| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| |
| """Tests for Requests.""" |
| |
| from __future__ import division |
| import json |
| import os |
| import unittest |
| import pickle |
| |
| import requests |
| from requests.auth import HTTPDigestAuth |
| from requests.adapters import HTTPAdapter |
| from requests.compat import str, cookielib |
| from requests.cookies import cookiejar_from_dict |
| from requests.exceptions import InvalidURL, MissingSchema |
| from requests.structures import CaseInsensitiveDict |
| |
| try: |
| import StringIO |
| except ImportError: |
| import io as StringIO |
| |
# Base URL of the httpbin service the tests talk to. It can be pointed at a
# local instance through the HTTPBIN_URL environment variable.
HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/')
# httpbin() appends path segments by plain concatenation, so make sure the
# base ends in exactly one slash even when HTTPBIN_URL was given without one
# (e.g. "http://localhost:5000").
HTTPBIN = HTTPBIN.rstrip('/') + '/'


def httpbin(*suffix):
    """Return the URL of an HTTPBIN resource.

    :param suffix: path segments; they are joined with ``/`` and appended
        to ``HTTPBIN``. With no segments the base URL itself is returned.
    :returns: the resulting URL as a string.
    """
    return HTTPBIN + '/'.join(suffix)
| |
| |
class RequestsTestCase(unittest.TestCase):
    """Tests for the public ``requests`` API.

    Most tests issue real HTTP requests, either against the httpbin service
    addressed through ``httpbin()`` / ``HTTPBIN`` or against hard-coded
    public hosts, so they need network access.
    """

    # NOTE(review): presumably the flag read by nose's multiprocess plugin
    # to allow running these tests in parallel -- confirm.
    _multiprocess_can_split_ = True

    def setUp(self):
        """No per-test fixtures are required."""
        pass

    def tearDown(self):
        """Nothing to clean up."""
        pass

    def test_entry_points(self):
        """The top-level entry points exist (attribute access must not raise)."""
        requests.session
        requests.session().get
        requests.session().head
        requests.get
        requests.head
        requests.put
        requests.patch
        requests.post

    def test_invalid_url(self):
        """Malformed URLs are rejected with the dedicated exceptions."""
        self.assertRaises(MissingSchema, requests.get, 'hiwpefhipowhefopw')
        self.assertRaises(InvalidURL, requests.get, 'http://')

    def test_basic_building(self):
        """A Request populated via attributes prepares its URL and form body."""
        req = requests.Request()
        req.url = 'http://kennethreitz.org/'
        req.data = {'life': '42'}

        pr = req.prepare()
        assert pr.url == req.url
        assert pr.body == 'life=42'

    def test_no_content_length(self):
        """Body-less GET and HEAD requests carry no Content-Length header."""
        get_req = requests.Request('GET', httpbin('get')).prepare()
        self.assertTrue('Content-Length' not in get_req.headers)
        head_req = requests.Request('HEAD', httpbin('head')).prepare()
        self.assertTrue('Content-Length' not in head_req.headers)

    def test_path_is_not_double_encoded(self):
        """A space in the path is percent-encoded exactly once."""
        request = requests.Request(
            'GET', "http://0.0.0.0/get/test case").prepare()

        self.assertEqual(request.path_url, "/get/test%20case")

    def test_params_are_added_before_fragment(self):
        """Query parameters are inserted before the URL fragment."""
        request = requests.Request(
            'GET',
            "http://example.com/path#fragment",
            params={"a": "b"}).prepare()
        self.assertEqual(request.url,
                         "http://example.com/path?a=b#fragment")

        request = requests.Request(
            'GET',
            "http://example.com/path?key=value#fragment",
            params={"a": "b"}).prepare()
        self.assertEqual(request.url,
                         "http://example.com/path?key=value&a=b#fragment")

    def test_mixed_case_scheme_acceptable(self):
        """The URL scheme is accepted regardless of its letter case."""
        schemes = ('http://', 'HTTP://', 'hTTp://', 'HttP://',
                   'https://', 'HTTPS://', 'hTTps://', 'HttPs://')
        s = requests.Session()
        for scheme in schemes:
            url = scheme + 'httpbin.org/get'
            r = s.send(requests.Request('GET', url).prepare())
            # Name the failing scheme so a failure inside the loop is
            # attributable.
            self.assertEqual(r.status_code, 200,
                             'unexpected status for %r' % url)

    def test_HTTP_200_OK_GET_ALTERNATIVE(self):
        """A prepared GET sent through a Session returns 200."""
        r = requests.Request('GET', httpbin('get'))
        s = requests.Session()

        r = s.send(r.prepare())

        self.assertEqual(r.status_code, 200)

    def test_HTTP_302_ALLOW_REDIRECT_GET(self):
        """GET follows a redirect and ends on a 200 response."""
        r = requests.get(httpbin('redirect', '1'))
        self.assertEqual(r.status_code, 200)

    # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
    #     r = requests.post(httpbin('status', '302'), data={'some': 'data'})
    #     self.assertEqual(r.status_code, 200)

    def test_HTTP_200_OK_GET_WITH_PARAMS(self):
        """A custom User-agent header is echoed back by httpbin."""
        heads = {'User-agent': 'Mozilla/5.0'}

        r = requests.get(httpbin('user-agent'), headers=heads)

        self.assertTrue(heads['User-agent'] in r.text)
        self.assertEqual(r.status_code, 200)

    def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
        """Params in the URL and in ``params=`` can be combined."""
        heads = {'User-agent': 'Mozilla/5.0'}

        r = requests.get(httpbin('get') + '?test=true',
                         params={'q': 'test'}, headers=heads)
        self.assertEqual(r.status_code, 200)

    def test_set_cookie_on_301(self):
        """A cookie set by the cookies/set endpoint ends up in the session."""
        s = requests.session()
        url = httpbin('cookies/set?foo=bar')
        s.get(url)
        self.assertTrue(s.cookies['foo'] == 'bar')

    def test_cookie_sent_on_redirect(self):
        """Session cookies are sent on the request that follows a redirect."""
        s = requests.session()
        s.get(httpbin('cookies/set?foo=bar'))
        r = s.get(httpbin('redirect/1'))  # redirects to httpbin('get')
        self.assertTrue("Cookie" in r.json()["headers"])

    def test_cookie_removed_on_expire(self):
        """A Set-Cookie with a past expiry removes the session cookie."""
        s = requests.session()
        s.get(httpbin('cookies/set?foo=bar'))
        self.assertTrue(s.cookies['foo'] == 'bar')
        s.get(
            httpbin('response-headers'),
            params={
                'Set-Cookie':
                    'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
            }
        )
        assert 'foo' not in s.cookies

    def test_request_cookie_overrides_session_cookie(self):
        """A per-request cookie wins over the session cookie of that name."""
        s = requests.session()
        s.cookies['foo'] = 'bar'
        r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
        assert r.json()['cookies']['foo'] == 'baz'
        # Session cookie should not be modified
        assert s.cookies['foo'] == 'bar'

    def test_generic_cookiejar_works(self):
        """A plain ``cookielib.CookieJar`` can serve as the session jar."""
        cj = cookielib.CookieJar()
        cookiejar_from_dict({'foo': 'bar'}, cj)
        s = requests.session()
        s.cookies = cj
        r = s.get(httpbin('cookies'))
        # Make sure the cookie was sent
        assert r.json()['cookies']['foo'] == 'bar'
        # Make sure the session cj is still the custom one
        assert s.cookies is cj

    def test_requests_in_history_are_not_overridden(self):
        """Each response in ``history`` keeps the request that produced it."""
        resp = requests.get(httpbin('redirect/3'))
        urls = [r.url for r in resp.history]
        req_urls = [r.request.url for r in resp.history]
        self.assertEqual(urls, req_urls)

    def test_user_agent_transfers(self):
        """The User-agent header is sent whatever the case of its name."""
        heads = {
            'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
        }

        r = requests.get(httpbin('user-agent'), headers=heads)
        self.assertTrue(heads['User-agent'] in r.text)

        heads = {
            'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
        }

        r = requests.get(httpbin('user-agent'), headers=heads)
        self.assertTrue(heads['user-agent'] in r.text)

    def test_HTTP_200_OK_HEAD(self):
        """HEAD returns 200."""
        r = requests.head(httpbin('get'))
        self.assertEqual(r.status_code, 200)

    def test_HTTP_200_OK_PUT(self):
        """PUT returns 200."""
        r = requests.put(httpbin('put'))
        self.assertEqual(r.status_code, 200)

    def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
        """Basic auth works per request and per session; without it: 401."""
        auth = ('user', 'pass')
        url = httpbin('basic-auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        self.assertEqual(r.status_code, 200)

        r = requests.get(url)
        self.assertEqual(r.status_code, 401)

        s = requests.session()
        s.auth = auth
        r = s.get(url)
        self.assertEqual(r.status_code, 200)

    def test_DIGEST_HTTP_200_OK_GET(self):
        """Digest auth works per request and per session; without it: 401."""
        auth = HTTPDigestAuth('user', 'pass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        self.assertEqual(r.status_code, 200)

        r = requests.get(url)
        self.assertEqual(r.status_code, 401)

        s = requests.session()
        s.auth = auth
        r = s.get(url)
        self.assertEqual(r.status_code, 200)

    def test_DIGEST_STREAM(self):
        """With digest auth, ``stream`` decides whether ``raw`` is still unread."""
        auth = HTTPDigestAuth('user', 'pass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth, stream=True)
        self.assertNotEqual(r.raw.read(), b'')

        r = requests.get(url, auth=auth, stream=False)
        self.assertEqual(r.raw.read(), b'')

    def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
        """Digest auth with a wrong password yields 401 everywhere."""
        auth = HTTPDigestAuth('user', 'wrongpass')
        url = httpbin('digest-auth', 'auth', 'user', 'pass')

        r = requests.get(url, auth=auth)
        self.assertEqual(r.status_code, 401)

        r = requests.get(url)
        self.assertEqual(r.status_code, 401)

        s = requests.session()
        s.auth = auth
        r = s.get(url)
        self.assertEqual(r.status_code, 401)

    def test_POSTBIN_GET_POST_FILES(self):
        """POST works with no body, form data, a file and a raw string."""
        url = httpbin('post')
        requests.post(url).raise_for_status()

        post1 = requests.post(url, data={'some': 'data'})
        self.assertEqual(post1.status_code, 200)

        with open('requirements.txt') as f:
            post2 = requests.post(url, files={'some': f})
        self.assertEqual(post2.status_code, 200)

        post4 = requests.post(url, data='[{"some": "json"}]')
        self.assertEqual(post4.status_code, 200)

        # Assert the error instead of merely tolerating it, so the check
        # fails if malformed ``files`` ever stops being rejected.
        self.assertRaises(ValueError, requests.post, url,
                          files=['bad file data'])

    def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
        """POST works when form data and a file are sent together."""
        url = httpbin('post')
        requests.post(url).raise_for_status()

        post1 = requests.post(url, data={'some': 'data'})
        self.assertEqual(post1.status_code, 200)

        with open('requirements.txt') as f:
            post2 = requests.post(url, data={'some': 'data'},
                                  files={'some': f})
        self.assertEqual(post2.status_code, 200)

        post4 = requests.post(url, data='[{"some": "json"}]')
        self.assertEqual(post4.status_code, 200)

        # See test_POSTBIN_GET_POST_FILES: the error must actually occur.
        self.assertRaises(ValueError, requests.post, url,
                          files=['bad file data'])

    def test_request_ok_set(self):
        """``Response.ok`` is False for a 404."""
        r = requests.get(httpbin('status', '404'))
        self.assertEqual(r.ok, False)

    def test_status_raising(self):
        """``raise_for_status`` raises on 404; ``ok`` is False on 500."""
        r = requests.get(httpbin('status', '404'))
        self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)

        r = requests.get(httpbin('status', '500'))
        self.assertFalse(r.ok)

    def test_decompress_gzip(self):
        """The content of a gzip response decodes as ASCII."""
        r = requests.get(httpbin('gzip'))
        r.content.decode('ascii')

    def test_unicode_get(self):
        """Non-ASCII params and paths can be sent without raising."""
        # No leading slash: httpbin() already joins onto a base URL that
        # ends in "/", so '/get' would produce a double slash.
        url = httpbin('get')
        requests.get(url, params={'foo': 'føø'})
        requests.get(url, params={'føø': 'føø'})
        requests.get(url, params={'føø': 'føø'})
        requests.get(url, params={'foo': 'foo'})
        requests.get(httpbin('ø'), params={'foo': 'foo'})

    def test_unicode_header_name(self):
        """A header name built with ``compat.str`` works with a non-ASCII body."""
        requests.put(
            httpbin('put'),
            headers={str('Content-Type'): 'application/octet-stream'},
            data='\xff')  # compat.str is unicode.

    def test_urlencoded_get_query_multivalued_param(self):
        """A list-valued param is encoded as a repeated query key."""
        r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
        self.assertEqual(r.status_code, 200)
        self.assertEqual(r.url, httpbin('get?test=foo&test=baz'))

    def test_different_encodings_dont_break_post(self):
        """Form data, query params and a file can be combined in one POST."""
        with open(__file__, 'rb') as f:
            r = requests.post(httpbin('post'),
                              data={'stuff': json.dumps({'a': 123})},
                              params={'blah': 'asdf1234'},
                              files={'file': ('test_requests.py', f)})
        self.assertEqual(r.status_code, 200)

    def test_unicode_multipart_post(self):
        """Multipart POST accepts text and byte field values."""
        values = (
            u'ëlïxr',
            u'ëlïxr'.encode('utf-8'),
            'elixr',
            'elixr'.encode('utf-8'),
        )
        for stuff in values:
            # Reopen the file for every request and close it afterwards.
            with open(__file__, 'rb') as f:
                r = requests.post(httpbin('post'),
                                  data={'stuff': stuff},
                                  files={'file': ('test_requests.py', f)})
            self.assertEqual(r.status_code, 200)

    def test_unicode_multipart_post_fieldnames(self):
        """A bytes field name is written as plain text into the body."""
        filename = os.path.splitext(__file__)[0] + '.py'
        # Keep the file open until prepare() has built the body.
        with open(filename, 'rb') as f:
            r = requests.Request(method='POST',
                                 url=httpbin('post'),
                                 data={'stuff'.encode('utf-8'): 'elixr'},
                                 files={'file': ('test_requests.py', f)})
            prep = r.prepare()
        self.assertTrue(b'name="stuff"' in prep.body)
        self.assertFalse(b'name="b\'stuff\'"' in prep.body)

    def test_custom_content_type(self):
        """A per-file content type given as third tuple item is sent."""
        # Nested ``with`` blocks rather than one multi-item ``with``.
        with open(__file__, 'rb') as f1:
            with open(__file__, 'rb') as f2:
                r = requests.post(
                    httpbin('post'),
                    data={'stuff': json.dumps({'a': 123})},
                    files={'file1': ('test_requests.py', f1),
                           'file2': ('test_requests', f2,
                                     'text/py-content-type')})
        self.assertEqual(r.status_code, 200)
        self.assertTrue(b"text/py-content-type" in r.request.body)

    def test_hook_receives_request_arguments(self):
        """A response hook gets the response plus non-empty keyword args."""
        calls = []

        def hook(resp, **kwargs):
            calls.append(kwargs)
            assert resp is not None
            assert kwargs != {}

        # The request must actually be sent: constructing a Request alone
        # never invokes the hook, so its assertions would never run.
        req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
        requests.Session().send(req.prepare())

        self.assertTrue(calls)

    def test_prepared_request_hook(self):
        """Hooks on a Request still fire when its prepared form is sent."""
        def hook(resp, **kwargs):
            resp.hook_working = True
            return resp

        req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
        prep = req.prepare()

        s = requests.Session()
        resp = s.send(prep)

        self.assertTrue(hasattr(resp, 'hook_working'))

    def test_links(self):
        """``Response.links`` exposes the Link header keyed by ``rel``."""
        r = requests.Response()
        r.headers = {
            'cache-control': 'public, max-age=60, s-maxage=60',
            'connection': 'keep-alive',
            'content-encoding': 'gzip',
            'content-type': 'application/json; charset=utf-8',
            'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
            'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
            'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
            'link': ('<https://api.github.com/users/kennethreitz/repos?'
                     'page=2&per_page=10>; rel="next", <https://api.github.'
                     'com/users/kennethreitz/repos?page=7&per_page=10>; '
                     ' rel="last"'),
            'server': 'GitHub.com',
            'status': '200 OK',
            'vary': 'Accept',
            'x-content-type-options': 'nosniff',
            'x-github-media-type': 'github.beta',
            'x-ratelimit-limit': '60',
            'x-ratelimit-remaining': '57'
        }
        self.assertEqual(r.links['next']['rel'], 'next')

    def test_cookie_parameters(self):
        """``RequestsCookieJar.set`` stores the given cookie attributes."""
        key = 'some_cookie'
        value = 'some_value'
        secure = True
        domain = 'test.com'
        rest = {'HttpOnly': True}

        jar = requests.cookies.RequestsCookieJar()
        jar.set(key, value, secure=secure, domain=domain, rest=rest)

        self.assertEqual(len(jar), 1)
        self.assertTrue('some_cookie' in jar)

        cookie = list(jar)[0]
        self.assertEqual(cookie.secure, secure)
        self.assertEqual(cookie.domain, domain)
        self.assertEqual(cookie._rest['HttpOnly'], rest['HttpOnly'])

    def test_time_elapsed_blank(self):
        """``Response.elapsed`` is a positive time span."""
        r = requests.get(httpbin('get'))
        td = r.elapsed
        # Total seconds computed by hand from the timedelta fields instead
        # of calling timedelta.total_seconds().
        total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
                          * 10**6) / 10**6)
        self.assertTrue(total_seconds > 0.0)

    def test_response_is_iterable(self):
        """Iterating a Response yields data read from ``raw``."""
        r = requests.Response()
        stream = StringIO.StringIO('abc')
        original_read = stream.read

        # Wrapper that accepts (and ignores) a ``decode_content`` keyword,
        # which the plain StringIO ``read`` would not take.
        def read_mock(amt, decode_content=None):
            return original_read(amt)
        setattr(stream, 'read', read_mock)
        r.raw = stream
        self.assertTrue(next(iter(r)))
        stream.close()

    def test_get_auth_from_url(self):
        """Credentials embedded in a URL are extracted as a tuple."""
        url = 'http://user:pass@complex.url.com/path?query=yes'
        self.assertEqual(('user', 'pass'),
                         requests.utils.get_auth_from_url(url))

    def test_cannot_send_unprepared_requests(self):
        """``Session.send`` rejects a Request that was not prepared."""
        r = requests.Request(url=HTTPBIN)
        self.assertRaises(ValueError, requests.Session().send, r)

    def test_http_error(self):
        """``HTTPError`` stores the optional ``response`` and the message."""
        error = requests.exceptions.HTTPError()
        self.assertEqual(error.response, None)
        response = requests.Response()
        error = requests.exceptions.HTTPError(response=response)
        self.assertEqual(error.response, response)
        error = requests.exceptions.HTTPError('message', response=response)
        self.assertEqual(str(error), 'message')
        self.assertEqual(error.response, response)

    def test_session_pickling(self):
        """A Session survives a pickle round trip and can still send."""
        r = requests.Request('GET', httpbin('get'))
        s = requests.Session()

        s = pickle.loads(pickle.dumps(s))

        r = s.send(r.prepare())
        self.assertEqual(r.status_code, 200)

    def test_fixes_1329(self):
        """
        Ensure that header updates are done case-insensitively.
        """
        s = requests.Session()
        s.headers.update({'ACCEPT': 'BOGUS'})
        s.headers.update({'accept': 'application/json'})
        r = s.get(httpbin('get'))
        headers = r.request.headers
        # ASCII encode because of key comparison changes in py3
        self.assertEqual(
            headers['accept'.encode('ascii')],
            'application/json'
        )
        self.assertEqual(
            headers['Accept'.encode('ascii')],
            'application/json'
        )
        self.assertEqual(
            headers['ACCEPT'.encode('ascii')],
            'application/json'
        )

    def test_uppercase_scheme(self):
        """An upper-case scheme is accepted by ``requests.get``."""
        r = requests.get('HTTP://example.com/')
        self.assertEqual(r.status_code, 200)

    def test_uppercase_scheme_redirect(self):
        """A redirect target with an upper-case scheme is followed."""
        r = requests.get(httpbin('redirect-to'),
                         params={'url': 'HTTP://example.com/'})
        self.assertEqual(r.status_code, 200)

    def test_transport_adapter_ordering(self):
        """Mounted adapters are ordered with the longest prefixes first."""
        s = requests.Session()
        order = ['https://', 'http://']
        self.assertEqual(order, list(s.adapters))
        s.mount('http://git', HTTPAdapter())
        s.mount('http://github', HTTPAdapter())
        s.mount('http://github.com', HTTPAdapter())
        s.mount('http://github.com/about/', HTTPAdapter())
        order = [
            'http://github.com/about/',
            'http://github.com',
            'http://github',
            'http://git',
            'https://',
            'http://',
        ]
        self.assertEqual(order, list(s.adapters))
        s.mount('http://gittip', HTTPAdapter())
        s.mount('http://gittip.com', HTTPAdapter())
        s.mount('http://gittip.com/about/', HTTPAdapter())
        order = [
            'http://github.com/about/',
            'http://gittip.com/about/',
            'http://github.com',
            'http://gittip.com',
            'http://github',
            'http://gittip',
            'http://git',
            'https://',
            'http://',
        ]
        self.assertEqual(order, list(s.adapters))
        # mount() must also work after ``adapters`` was replaced by a
        # plain dict.
        s2 = requests.Session()
        s2.adapters = {'http://': HTTPAdapter()}
        s2.mount('https://', HTTPAdapter())
        self.assertTrue('http://' in s2.adapters)
        self.assertTrue('https://' in s2.adapters)

    def test_header_remove_is_case_insensitive(self):
        """A ``None`` request header removes the session header of any case."""
        # From issue #1321
        s = requests.Session()
        s.headers['foo'] = 'bar'
        r = s.get(httpbin('get'), headers={'FOO': None})
        assert 'foo' not in r.request.headers

    def test_params_are_merged_case_sensitive(self):
        """Session and request params differing only in case are both kept."""
        s = requests.Session()
        s.params['foo'] = 'bar'
        r = s.get(httpbin('get'), params={'FOO': 'bar'})
        assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}

    def test_long_authinfo_in_url(self):
        """A URL with long credentials and a 63-char host is left unchanged."""
        url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
            'E8A3BE87-9E3F-4620-8858-95478E385B5B',
            'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
            'exactly-------------sixty-----------three------------characters',
        )
        r = requests.Request('GET', url).prepare()
        self.assertEqual(r.url, url)
| |
| |
class TestCaseInsensitiveDict(unittest.TestCase):
    """Unit tests for ``requests.structures.CaseInsensitiveDict``.

    The tests cover case-insensitive lookup and the casing reported for
    keys on iteration.
    """

    def test_mapping_init(self):
        """Construction from a mapping."""
        cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        self.assertEqual(len(cid), 2)
        self.assertTrue('foo' in cid)
        self.assertTrue('bar' in cid)

    def test_iterable_init(self):
        """Construction from an iterable of key/value pairs."""
        cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')])
        self.assertEqual(len(cid), 2)
        self.assertTrue('foo' in cid)
        self.assertTrue('bar' in cid)

    def test_kwargs_init(self):
        """Construction from keyword arguments."""
        cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
        self.assertEqual(len(cid), 2)
        self.assertTrue('foo' in cid)
        self.assertTrue('bar' in cid)

    def test_docstring_example(self):
        """Lookup ignores case; iteration yields the key as it was set."""
        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        self.assertEqual(cid['aCCEPT'], 'application/json')
        self.assertEqual(list(cid), ['Accept'])

    def test_len(self):
        """Setting a key that differs only in case adds no entry."""
        cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        cid['A'] = 'a'
        self.assertEqual(len(cid), 2)

    def test_getitem(self):
        """``__getitem__`` ignores the case of the key."""
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        self.assertEqual(cid['spam'], 'blueval')
        self.assertEqual(cid['SPAM'], 'blueval')

    def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['Spam'] = 'twoval'
        cid['sPAM'] = 'redval'
        cid['SPAM'] = 'blueval'
        self.assertEqual(cid['spam'], 'blueval')
        self.assertEqual(cid['SPAM'], 'blueval')
        self.assertEqual(list(cid.keys()), ['SPAM'])

    def test_delitem(self):
        """``del`` removes the entry whatever the case of the key."""
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        del cid['sPam']
        self.assertFalse('spam' in cid)
        self.assertEqual(len(cid), 0)

    def test_contains(self):
        """``in`` ignores case and is False for absent keys."""
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        self.assertTrue('Spam' in cid)
        self.assertTrue('spam' in cid)
        self.assertTrue('SPAM' in cid)
        self.assertTrue('sPam' in cid)
        self.assertFalse('notspam' in cid)

    def test_get(self):
        """``get`` ignores case and returns the default for absent keys."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['SPAM'] = 'blueval'
        self.assertEqual(cid.get('spam'), 'blueval')
        self.assertEqual(cid.get('SPAM'), 'blueval')
        self.assertEqual(cid.get('sPam'), 'blueval')
        self.assertEqual(cid.get('notspam', 'default'), 'default')

    def test_update(self):
        """``update`` overwrites entries whose keys differ only in case."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'blueval'
        cid.update({'sPam': 'notblueval'})
        self.assertEqual(cid['spam'], 'notblueval')
        cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
        self.assertEqual(len(cid), 2)
        self.assertEqual(cid['foo'], 'anotherfoo')
        self.assertEqual(cid['bar'], 'anotherbar')

    def test_update_retains_unchanged(self):
        """``update`` leaves keys it does not mention untouched."""
        cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
        cid.update({'foo': 'newfoo'})
        self.assertEqual(cid['bar'], 'bar')

    def test_iter(self):
        """Iteration yields the keys with their original casing."""
        cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
        keys = frozenset(['Spam', 'Eggs'])
        self.assertEqual(frozenset(iter(cid)), keys)

    def test_equality(self):
        """Equality ignores key case, also against a plain dict."""
        cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
        othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
        self.assertEqual(cid, othercid)
        del othercid['spam']
        self.assertNotEqual(cid, othercid)
        self.assertEqual(cid, {'spam': 'blueval', 'eggs': 'redval'})

    def test_setdefault(self):
        """``setdefault`` returns the stored value, else the default."""
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        self.assertEqual(
            cid.setdefault('spam', 'notblueval'),
            'blueval'
        )
        self.assertEqual(
            cid.setdefault('notspam', 'notblueval'),
            'notblueval'
        )

    def test_lower_items(self):
        """``lower_items`` yields the keys lower-cased."""
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
        lowerkeyset = frozenset(['accept', 'user-agent'])
        self.assertEqual(keyset, lowerkeyset)

    def test_preserve_key_case(self):
        """``items``, ``keys`` and iteration report the original casing."""
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(['Accept', 'user-Agent'])
        self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
        self.assertEqual(frozenset(cid.keys()), keyset)
        self.assertEqual(frozenset(cid), keyset)

    def test_preserve_last_key_case(self):
        """After an overwrite, the casing of the last-set key is reported."""
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        cid.update({'ACCEPT': 'application/json'})
        cid['USER-AGENT'] = 'requests'
        keyset = frozenset(['ACCEPT', 'USER-AGENT'])
        self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
        self.assertEqual(frozenset(cid.keys()), keyset)
        self.assertEqual(frozenset(cid), keyset)
| |
| |
# Run the whole suite when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()