blob: 11ebf617263cc09731b30666483fb3c5f7757f3d [file] [log] [blame]
# -*- coding: utf-8 -*-
from io import BytesIO
import pytest
from requests import compat
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from requests.utils import (
address_in_network, dotted_netmask,
get_auth_from_url, get_encoding_from_headers,
get_encodings_from_content, get_environ_proxies,
guess_filename, guess_json_utf, is_ipv4_address,
is_valid_cidr, iter_slices, parse_dict_header,
parse_header_links, prepend_scheme_if_needed,
requote_uri, select_proxy, should_bypass_proxies, super_len,
to_key_val_list, to_native_string,
unquote_header_value, unquote_unreserved,
urldefragauth, add_dict_to_cookiejar)
from requests._internal_utils import unicode_is_ascii
from .compat import StringIO, cStringIO
class TestSuperLen:
'stream, value', (
(StringIO.StringIO, 'Test'),
(BytesIO, b'Test'),
pytest.mark.skipif('cStringIO is None')((cStringIO, 'Test')),
def test_io_streams(self, stream, value):
"""Ensures that we properly deal with different kinds of IO streams."""
assert super_len(stream()) == 0
assert super_len(stream(value)) == 4
def test_super_len_correctly_calculates_len_of_partially_read_file(self):
"""Ensure that we handle partially consumed file like objects."""
s = StringIO.StringIO()
assert super_len(s) == 0
@pytest.mark.parametrize('error', [IOError, OSError])
def test_super_len_handles_files_raising_weird_errors_in_tell(self, error):
"""If tell() raises errors, assume the cursor is at position zero."""
class BoomFile(object):
def __len__(self):
return 5
def tell(self):
raise error()
assert super_len(BoomFile()) == 0
@pytest.mark.parametrize('error', [IOError, OSError])
def test_super_len_tell_ioerror(self, error):
"""Ensure that if tell gives an IOError super_len doesn't fail"""
class NoLenBoomFile(object):
def tell(self):
raise error()
def seek(self, offset, whence):
assert super_len(NoLenBoomFile()) == 0
def test_string(self):
assert super_len('Test') == 4
'mode, warnings_num', (
('r', 1),
('rb', 0),
def test_file(self, tmpdir, mode, warnings_num, recwarn):
file_obj = tmpdir.join('test.txt')
with as fd:
assert super_len(fd) == 4
assert len(recwarn) == warnings_num
def test_super_len_with__len__(self):
foo = [1,2,3,4]
len_foo = super_len(foo)
assert len_foo == 4
def test_super_len_with_no__len__(self):
class LenFile(object):
def __init__(self):
self.len = 5
assert super_len(LenFile()) == 5
def test_super_len_with_tell(self):
foo = StringIO.StringIO('12345')
assert super_len(foo) == 5
assert super_len(foo) == 3
def test_super_len_with_fileno(self):
with open(__file__, 'rb') as f:
length = super_len(f)
file_data =
assert length == len(file_data)
def test_super_len_with_no_matches(self):
"""Ensure that objects without any length methods default to 0"""
assert super_len(object()) == 0
class TestToKeyValList:
'value, expected', (
([('key', 'val')], [('key', 'val')]),
((('key', 'val'), ), [('key', 'val')]),
({'key': 'val'}, [('key', 'val')]),
(None, None)
def test_valid(self, value, expected):
assert to_key_val_list(value) == expected
def test_invalid(self):
with pytest.raises(ValueError):
class TestUnquoteHeaderValue:
'value, expected', (
(None, None),
('Test', 'Test'),
('"Test"', 'Test'),
('"Test\\\\"', 'Test\\'),
('"\\\\Comp\\Res"', '\\Comp\\Res'),
def test_valid(self, value, expected):
assert unquote_header_value(value) == expected
def test_is_filename(self):
assert unquote_header_value('"\\\\Comp\\Res"', True) == '\\\\Comp\\Res'
class TestGetEnvironProxies:
"""Ensures that IP addresses are correctly matches with ranges
in no_proxy variable.
@pytest.fixture(autouse=True, params=['no_proxy', 'NO_PROXY'])
def no_proxy(self, request, monkeypatch):
monkeypatch.setenv(request.param, ',,localhost.localdomain,')
'url', (
def test_bypass(self, url):
assert get_environ_proxies(url, no_proxy=None) == {}
'url', (
def test_not_bypass(self, url):
assert get_environ_proxies(url, no_proxy=None) != {}
'url', (
def test_bypass_no_proxy_keyword(self, url):
no_proxy = ','
assert get_environ_proxies(url, no_proxy=no_proxy) == {}
'url', (
def test_not_bypass_no_proxy_keyword(self, url, monkeypatch):
# This is testing that the 'no_proxy' argument overrides the
# environment variable 'no_proxy'
monkeypatch.setenv('http_proxy', '')
no_proxy = ','
assert get_environ_proxies(url, no_proxy=no_proxy) != {}
class TestIsIPv4Address:
def test_valid(self):
assert is_ipv4_address('')
@pytest.mark.parametrize('value', ('', 'localhost.localdomain'))
def test_invalid(self, value):
assert not is_ipv4_address(value)
class TestIsValidCIDR:
def test_valid(self):
assert is_valid_cidr('')
'value', (
def test_invalid(self, value):
assert not is_valid_cidr(value)
class TestAddressInNetwork:
def test_valid(self):
assert address_in_network('', '')
def test_invalid(self):
assert not address_in_network('', '')
class TestGuessFilename:
'value', (1, type('Fake', (object,), {'name': 1})()),
def test_guess_filename_invalid(self, value):
assert guess_filename(value) is None
'value, expected_type', (
(b'value', compat.bytes),
(b'value'.decode('utf-8'), compat.str)
def test_guess_filename_valid(self, value, expected_type):
obj = type('Fake', (object,), {'name': value})()
result = guess_filename(obj)
assert result == value
assert isinstance(result, expected_type)
class TestContentEncodingDetection:
def test_none(self):
encodings = get_encodings_from_content('')
assert not len(encodings)
'content', (
# HTML5 meta charset attribute
'<meta charset="UTF-8">',
# HTML4 pragma directive
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8">',
# XHTML 1.x served with text/html MIME type
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />',
# XHTML 1.x served as XML
'<?xml version="1.0" encoding="UTF-8"?>',
def test_pragmas(self, content):
encodings = get_encodings_from_content(content)
assert len(encodings) == 1
assert encodings[0] == 'UTF-8'
def test_precedence(self):
content = '''
<?xml version="1.0" encoding="XML"?>
<meta charset="HTML5">
<meta http-equiv="Content-type" content="text/html;charset=HTML4" />
assert get_encodings_from_content(content) == ['HTML5', 'HTML4', 'XML']
class TestGuessJSONUTF:
'encoding', (
'utf-32', 'utf-8-sig', 'utf-16', 'utf-8', 'utf-16-be', 'utf-16-le',
'utf-32-be', 'utf-32-le'
def test_encoded(self, encoding):
data = '{}'.encode(encoding)
assert guess_json_utf(data) == encoding
def test_bad_utf_like_encoding(self):
assert guess_json_utf(b'\x00\x00\x00\x00') is None
('encoding', 'expected'), (
('utf-16-be', 'utf-16'),
('utf-16-le', 'utf-16'),
('utf-32-be', 'utf-32'),
('utf-32-le', 'utf-32')
def test_guess_by_bom(self, encoding, expected):
data = u'\ufeff{}'.encode(encoding)
assert guess_json_utf(data) == expected
USER = PASSWORD = "%!*'();:@&=+$,/?#[] "
ENCODED_USER = compat.quote(USER, '')
'url, auth', (
'http://' + ENCODED_USER + ':' + ENCODED_PASSWORD + '@' +
('user', 'pass')
('user', 'pass pass')
('user', 'pass pass')
('user%user', 'pass')
('user', 'pass#pass')
('', '')
def test_get_auth_from_url(url, auth):
assert get_auth_from_url(url) == auth
'uri, expected', (
# Ensure requoting doesn't break expectations
# Ensure we handle unquoted percent signs in redirects
def test_requote_uri_with_unquoted_percents(uri, expected):
assert requote_uri(uri) == expected
'uri, expected', (
# Illegal bytes
# Reserved characters
def test_unquote_unreserved(uri, expected):
assert unquote_unreserved(uri) == expected
'mask, expected', (
(8, ''),
(24, ''),
(25, ''),
def test_dotted_netmask(mask, expected):
assert dotted_netmask(mask) == expected
http_proxies = {'http': 'http://http.proxy',
'': ''}
all_proxies = {'all': 'socks5://http.proxy',
'all://': 'socks5://'}
mixed_proxies = {'http': 'http://http.proxy',
'': '',
'all': 'socks5://http.proxy'}
'url, expected, proxies', (
('hTTp://u:p@Some.Host/path', '', http_proxies),
('hTTp://u:p@Other.Host/path', 'http://http.proxy', http_proxies),
('hTTp:///path', 'http://http.proxy', http_proxies),
('hTTps://Other.Host', None, http_proxies),
('file:///etc/motd', None, http_proxies),
('hTTp://u:p@Some.Host/path', 'socks5://', all_proxies),
('hTTp://u:p@Other.Host/path', 'socks5://http.proxy', all_proxies),
('hTTp:///path', 'socks5://http.proxy', all_proxies),
('hTTps://Other.Host', 'socks5://http.proxy', all_proxies),
('', 'http://http.proxy', mixed_proxies),
('', '', mixed_proxies),
('', 'socks5://http.proxy', mixed_proxies),
('', 'socks5://http.proxy', mixed_proxies),
('https://', 'socks5://http.proxy', mixed_proxies),
# XXX: unsure whether this is reasonable behavior
('file:///etc/motd', 'socks5://http.proxy', all_proxies),
def test_select_proxies(url, expected, proxies):
"""Make sure we can select per-host proxies correctly."""
assert select_proxy(url, proxies) == expected
'value, expected', (
('foo="is a fish", bar="as well"', {'foo': 'is a fish', 'bar': 'as well'}),
('key_without_value', {'key_without_value': None})
def test_parse_dict_header(value, expected):
assert parse_dict_header(value) == expected
'value, expected', (
CaseInsensitiveDict({'content-type': 'application/json; charset=utf-8'}),
CaseInsensitiveDict({'content-type': 'text/plain'}),
def test_get_encoding_from_headers(value, expected):
assert get_encoding_from_headers(value) == expected
'value, length', (
('', 0),
('T', 1),
('Test', 4),
('Cont', 0),
('Other', -5),
('Content', None),
def test_iter_slices(value, length):
if length is None or (length <= 0 and len(value) > 0):
# Reads all content at once
assert len(list(iter_slices(value, length))) == 1
assert len(list(iter_slices(value, 1))) == length
'value, expected', (
'<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
[{'url': 'http:/.../front.jpeg', 'rel': 'front', 'type': 'image/jpeg'}]
[{'url': 'http:/.../front.jpeg'}]
[{'url': 'http:/.../front.jpeg'}]
'<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
{'url': 'http:/.../front.jpeg', 'type': 'image/jpeg'},
{'url': 'http://.../back.jpeg'}
def test_parse_header_links(value, expected):
assert parse_header_links(value) == expected
'value, expected', (
('', ''),
('//', ''),
def test_prepend_scheme_if_needed(value, expected):
assert prepend_scheme_if_needed(value, 'http') == expected
'value, expected', (
('T', 'T'),
(b'T', 'T'),
(u'T', 'T'),
def test_to_native_string(value, expected):
assert to_native_string(value) == expected
'url, expected', (
('', ''),
('', ''),
('//', '//'),
('//', '//'),
('', '//'),
('', 'scheme://'),
def test_urldefragauth(url, expected):
assert urldefragauth(url) == expected
'url, expected', (
('', True),
('', True),
('', True),
('', True),
('http://localhost.localdomain:5000/v1.0/', True),
('', False),
('', False),
('', False),
def test_should_bypass_proxies(url, expected, monkeypatch):
"""Tests for function should_bypass_proxies to check if proxy
can be bypassed or not
monkeypatch.setenv('no_proxy', ',,localhost.localdomain,')
monkeypatch.setenv('NO_PROXY', ',,localhost.localdomain,')
assert should_bypass_proxies(url, no_proxy=None) == expected
'cookiejar', (
def test_add_dict_to_cookiejar(cookiejar):
"""Ensure add_dict_to_cookiejar works for
non-RequestsCookieJar CookieJars
cookiedict = {'test': 'cookies',
'good': 'cookies'}
cj = add_dict_to_cookiejar(cookiejar, cookiedict)
cookies = dict((, cookie.value) for cookie in cj)
assert cookiedict == cookies
'value, expected', (
(u'test', True),
(u'æíöû', False),
(u'ジェーピーニック', False),
def test_unicode_is_ascii(value, expected):
assert unicode_is_ascii(value) is expected
'url, expected', (
('', True),
('', True),
('', True),
('', True),
('http://localhost.localdomain:5000/v1.0/', True),
('', False),
('', False),
('', False),
def test_should_bypass_proxies_no_proxy(
url, expected, monkeypatch):
"""Tests for function should_bypass_proxies to check if proxy
can be bypassed or not using the 'no_proxy' argument
no_proxy = ',,localhost.localdomain,'
# Test 'no_proxy' argument
assert should_bypass_proxies(url, no_proxy=no_proxy) == expected