blob: 859d07e8a594b2a80acae666949971f6ead90299 [file] [log] [blame]
import threading
import pytest
from tests.testserver.server import Server, consume_socket_content
import requests
from requests.compat import JSONDecodeError
from .utils import override_environ
def echo_response_handler(sock):
"""Simple handler that will take request and echo it back to requester."""
request_content = consume_socket_content(sock, timeout=0.5)
text_200 = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Length: %d\r\n\r\n"
b"%s"
) % (len(request_content), request_content)
sock.send(text_200)
def test_chunked_upload():
"""can safely send generators"""
close_server = threading.Event()
server = Server.basic_response_server(wait_to_close_event=close_server)
data = iter([b"a", b"b", b"c"])
with server as (host, port):
url = f"http://{host}:{port}/"
r = requests.post(url, data=data, stream=True)
close_server.set() # release server block
assert r.status_code == 200
assert r.request.headers["Transfer-Encoding"] == "chunked"
def test_chunked_encoding_error():
"""get a ChunkedEncodingError if the server returns a bad response"""
def incomplete_chunked_response_handler(sock):
request_content = consume_socket_content(sock, timeout=0.5)
# The server never ends the request and doesn't provide any valid chunks
sock.send(
b"HTTP/1.1 200 OK\r\n"
b"Transfer-Encoding: chunked\r\n"
)
return request_content
close_server = threading.Event()
server = Server(incomplete_chunked_response_handler)
with server as (host, port):
url = f"http://{host}:{port}/"
with pytest.raises(requests.exceptions.ChunkedEncodingError):
requests.get(url)
close_server.set() # release server block
def test_chunked_upload_uses_only_specified_host_header():
"""Ensure we use only the specified Host header for chunked requests."""
close_server = threading.Event()
server = Server(echo_response_handler, wait_to_close_event=close_server)
data = iter([b"a", b"b", b"c"])
custom_host = "sample-host"
with server as (host, port):
url = f"http://{host}:{port}/"
r = requests.post(url, data=data, headers={"Host": custom_host}, stream=True)
close_server.set() # release server block
expected_header = b"Host: %s\r\n" % custom_host.encode("utf-8")
assert expected_header in r.content
assert r.content.count(b"Host: ") == 1
def test_chunked_upload_doesnt_skip_host_header():
"""Ensure we don't omit all Host headers with chunked requests."""
close_server = threading.Event()
server = Server(echo_response_handler, wait_to_close_event=close_server)
data = iter([b"a", b"b", b"c"])
with server as (host, port):
expected_host = f"{host}:{port}"
url = f"http://{host}:{port}/"
r = requests.post(url, data=data, stream=True)
close_server.set() # release server block
expected_header = b"Host: %s\r\n" % expected_host.encode("utf-8")
assert expected_header in r.content
assert r.content.count(b"Host: ") == 1
def test_conflicting_content_lengths():
"""Ensure we correctly throw an InvalidHeader error if multiple
conflicting Content-Length headers are returned.
"""
def multiple_content_length_response_handler(sock):
request_content = consume_socket_content(sock, timeout=0.5)
response = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/plain\r\n"
b"Content-Length: 16\r\n"
b"Content-Length: 32\r\n\r\n"
b"-- Bad Actor -- Original Content\r\n"
)
sock.send(response)
return request_content
close_server = threading.Event()
server = Server(multiple_content_length_response_handler)
with server as (host, port):
url = f"http://{host}:{port}/"
with pytest.raises(requests.exceptions.InvalidHeader):
requests.get(url)
close_server.set()
def test_digestauth_401_count_reset_on_redirect():
"""Ensure we correctly reset num_401_calls after a successful digest auth,
followed by a 302 redirect to another digest auth prompt.
See https://github.com/psf/requests/issues/1979.
"""
text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n'
b'Content-Length: 0\r\n'
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
b', opaque="372825293d1c26955496c80ed6426e9e", '
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
text_302 = (b'HTTP/1.1 302 FOUND\r\n'
b'Content-Length: 0\r\n'
b'Location: /\r\n\r\n')
text_200 = (b'HTTP/1.1 200 OK\r\n'
b'Content-Length: 0\r\n\r\n')
expected_digest = (b'Authorization: Digest username="user", '
b'realm="me@kennethreitz.com", '
b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"')
auth = requests.auth.HTTPDigestAuth('user', 'pass')
def digest_response_handler(sock):
# Respond to initial GET with a challenge.
request_content = consume_socket_content(sock, timeout=0.5)
assert request_content.startswith(b"GET / HTTP/1.1")
sock.send(text_401)
# Verify we receive an Authorization header in response, then redirect.
request_content = consume_socket_content(sock, timeout=0.5)
assert expected_digest in request_content
sock.send(text_302)
# Verify Authorization isn't sent to the redirected host,
# then send another challenge.
request_content = consume_socket_content(sock, timeout=0.5)
assert b'Authorization:' not in request_content
sock.send(text_401)
# Verify Authorization is sent correctly again, and return 200 OK.
request_content = consume_socket_content(sock, timeout=0.5)
assert expected_digest in request_content
sock.send(text_200)
return request_content
close_server = threading.Event()
server = Server(digest_response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/'
r = requests.get(url, auth=auth)
# Verify server succeeded in authenticating.
assert r.status_code == 200
# Verify Authorization was sent in final request.
assert 'Authorization' in r.request.headers
assert r.request.headers['Authorization'].startswith('Digest ')
# Verify redirect happened as we expected.
assert r.history[0].status_code == 302
close_server.set()
def test_digestauth_401_only_sent_once():
"""Ensure we correctly respond to a 401 challenge once, and then
stop responding if challenged again.
"""
text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n'
b'Content-Length: 0\r\n'
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
b', opaque="372825293d1c26955496c80ed6426e9e", '
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
expected_digest = (b'Authorization: Digest username="user", '
b'realm="me@kennethreitz.com", '
b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"')
auth = requests.auth.HTTPDigestAuth('user', 'pass')
def digest_failed_response_handler(sock):
# Respond to initial GET with a challenge.
request_content = consume_socket_content(sock, timeout=0.5)
assert request_content.startswith(b"GET / HTTP/1.1")
sock.send(text_401)
# Verify we receive an Authorization header in response, then
# challenge again.
request_content = consume_socket_content(sock, timeout=0.5)
assert expected_digest in request_content
sock.send(text_401)
# Verify the client didn't respond to second challenge.
request_content = consume_socket_content(sock, timeout=0.5)
assert request_content == b''
return request_content
close_server = threading.Event()
server = Server(digest_failed_response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/'
r = requests.get(url, auth=auth)
# Verify server didn't authenticate us.
assert r.status_code == 401
assert r.history[0].status_code == 401
close_server.set()
def test_digestauth_only_on_4xx():
"""Ensure we only send digestauth on 4xx challenges.
See https://github.com/psf/requests/issues/3772.
"""
text_200_chal = (b'HTTP/1.1 200 OK\r\n'
b'Content-Length: 0\r\n'
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
b', opaque="372825293d1c26955496c80ed6426e9e", '
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
auth = requests.auth.HTTPDigestAuth('user', 'pass')
def digest_response_handler(sock):
# Respond to GET with a 200 containing www-authenticate header.
request_content = consume_socket_content(sock, timeout=0.5)
assert request_content.startswith(b"GET / HTTP/1.1")
sock.send(text_200_chal)
# Verify the client didn't respond with auth.
request_content = consume_socket_content(sock, timeout=0.5)
assert request_content == b''
return request_content
close_server = threading.Event()
server = Server(digest_response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/'
r = requests.get(url, auth=auth)
# Verify server didn't receive auth from us.
assert r.status_code == 200
assert len(r.history) == 0
close_server.set()
_schemes_by_var_prefix = [
('http', ['http']),
('https', ['https']),
('all', ['http', 'https']),
]
_proxy_combos = []
for prefix, schemes in _schemes_by_var_prefix:
for scheme in schemes:
_proxy_combos.append((f"{prefix}_proxy", scheme))
_proxy_combos += [(var.upper(), scheme) for var, scheme in _proxy_combos]
@pytest.mark.parametrize("var,scheme", _proxy_combos)
def test_use_proxy_from_environment(httpbin, var, scheme):
url = f"{scheme}://httpbin.org"
fake_proxy = Server() # do nothing with the requests; just close the socket
with fake_proxy as (host, port):
proxy_url = f"socks5://{host}:{port}"
kwargs = {var: proxy_url}
with override_environ(**kwargs):
# fake proxy's lack of response will cause a ConnectionError
with pytest.raises(requests.exceptions.ConnectionError):
requests.get(url)
# the fake proxy received a request
assert len(fake_proxy.handler_results) == 1
# it had actual content (not checking for SOCKS protocol for now)
assert len(fake_proxy.handler_results[0]) > 0
def test_redirect_rfc1808_to_non_ascii_location():
path = 'ลก'
expected_path = b'%C5%A1'
redirect_request = [] # stores the second request to the server
def redirect_resp_handler(sock):
consume_socket_content(sock, timeout=0.5)
location = f'//{host}:{port}/{path}'
sock.send(
(
b'HTTP/1.1 301 Moved Permanently\r\n'
b'Content-Length: 0\r\n'
b'Location: %s\r\n'
b'\r\n'
) % location.encode('utf8')
)
redirect_request.append(consume_socket_content(sock, timeout=0.5))
sock.send(b'HTTP/1.1 200 OK\r\n\r\n')
close_server = threading.Event()
server = Server(redirect_resp_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}'
r = requests.get(url=url, allow_redirects=True)
assert r.status_code == 200
assert len(r.history) == 1
assert r.history[0].status_code == 301
assert redirect_request[0].startswith(b'GET /' + expected_path + b' HTTP/1.1')
assert r.url == '{}/{}'.format(url, expected_path.decode('ascii'))
close_server.set()
def test_fragment_not_sent_with_request():
"""Verify that the fragment portion of a URI isn't sent to the server."""
close_server = threading.Event()
server = Server(echo_response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/path/to/thing/#view=edit&token=hunter2'
r = requests.get(url)
raw_request = r.content
assert r.status_code == 200
headers, body = raw_request.split(b'\r\n\r\n', 1)
status_line, headers = headers.split(b'\r\n', 1)
assert status_line == b'GET /path/to/thing/ HTTP/1.1'
for frag in (b'view', b'edit', b'token', b'hunter2'):
assert frag not in headers
assert frag not in body
close_server.set()
def test_fragment_update_on_redirect():
"""Verify we only append previous fragment if one doesn't exist on new
location. If a new fragment is encountered in a Location header, it should
be added to all subsequent requests.
"""
def response_handler(sock):
consume_socket_content(sock, timeout=0.5)
sock.send(
b'HTTP/1.1 302 FOUND\r\n'
b'Content-Length: 0\r\n'
b'Location: /get#relevant-section\r\n\r\n'
)
consume_socket_content(sock, timeout=0.5)
sock.send(
b'HTTP/1.1 302 FOUND\r\n'
b'Content-Length: 0\r\n'
b'Location: /final-url/\r\n\r\n'
)
consume_socket_content(sock, timeout=0.5)
sock.send(
b'HTTP/1.1 200 OK\r\n\r\n'
)
close_server = threading.Event()
server = Server(response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/path/to/thing/#view=edit&token=hunter2'
r = requests.get(url)
assert r.status_code == 200
assert len(r.history) == 2
assert r.history[0].request.url == url
# Verify we haven't overwritten the location with our previous fragment.
assert r.history[1].request.url == f'http://{host}:{port}/get#relevant-section'
# Verify previous fragment is used and not the original.
assert r.url == f'http://{host}:{port}/final-url/#relevant-section'
close_server.set()
def test_json_decode_compatibility_for_alt_utf_encodings():
def response_handler(sock):
consume_socket_content(sock, timeout=0.5)
sock.send(
b'HTTP/1.1 200 OK\r\n'
b'Content-Length: 18\r\n\r\n'
b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n'
)
close_server = threading.Event()
server = Server(response_handler, wait_to_close_event=close_server)
with server as (host, port):
url = f'http://{host}:{port}/'
r = requests.get(url)
r.encoding = None
with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo:
r.json()
assert isinstance(excinfo.value, requests.exceptions.RequestException)
assert isinstance(excinfo.value, JSONDecodeError)
assert r.text not in str(excinfo.value)