| #!/usr/bin/env python |
| # Copyright 2011 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import calendar |
| import email.utils |
| import httparchive |
| import unittest |
| |
| |
def create_request(headers):
  """Build an archived GET request for 'www.test.com' + '/' with *headers*.

  Args:
    headers: request headers, passed through to ArchivedHttpRequest as-is.

  Returns:
    The httparchive.ArchivedHttpRequest built from those fixed values.
  """
  method, host, path = 'GET', 'www.test.com', '/'
  return httparchive.ArchivedHttpRequest(method, host, path, None, headers)
| |
def create_response(headers):
  """Build an archived 200 'OK' response carrying *headers* and an empty body.

  Args:
    headers: response headers, passed through to ArchivedHttpResponse as-is.

  Returns:
    The httparchive.ArchivedHttpResponse built from those fixed values.
  """
  status, reason = 200, 'OK'
  # NOTE(review): 11 is presumably the HTTP version (1.1) -- confirm against
  # httparchive.ArchivedHttpResponse.
  return httparchive.ArchivedHttpResponse(11, status, reason, headers, '')
| |
| |
class HttpArchiveTest(unittest.TestCase):
  """Tests for HttpArchive lookups and archived request/response helpers.

  setUp stores RESPONSE in a fresh archive under both a GET and a POST
  request for www.test.com/; the conditional-header tests query that archive.
  """

  REQUEST_HEADERS = {}
  REQUEST = create_request(REQUEST_HEADERS)

  # Used for if-(un)modified-since checks
  DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  DATE_INVALID = 'This is an invalid date!!'

  # etag values
  ETAG_VALID = 'etag'
  ETAG_INVALID = 'This is an invalid etag value!!'

  RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  RESPONSE = create_response(RESPONSE_HEADERS)

  def setUp(self):
    """Create an archive holding RESPONSE for a GET and a POST of '/'."""
    self.archive = httparchive.HttpArchive()
    self.archive[self.REQUEST] = self.RESPONSE

    # Also add an identical POST request for testing
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
    self.archive[request] = self.RESPONSE

  def test_init(self):
    """A newly constructed archive is empty."""
    archive = httparchive.HttpArchive()
    self.assertEqual(len(archive), 0)

  def test_request__TrimHeaders(self):
    """Request header trimming drops some headers, normalizes others."""
    request = httparchive.ArchivedHttpRequest
    header1 = {'accept-encoding': 'gzip,deflate'}
    self.assertEqual(request._TrimHeaders(header1), list(header1.items()))

    header2 = {'referer': 'www.google.com'}
    self.assertEqual(request._TrimHeaders(header2), [])

    header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
               'hello': 'world'}
    self.assertEqual(request._TrimHeaders(header3), [('hello', 'world')])

    # Tests that spaces and trailing comma get stripped.
    header4 = {'accept-encoding': 'gzip, deflate,, '}
    self.assertEqual(request._TrimHeaders(header4),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that 'lzma' gets stripped.
    header5 = {'accept-encoding': 'gzip, deflate, lzma'}
    self.assertEqual(request._TrimHeaders(header5),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that x-client-data gets stripped.
    header6 = {'x-client-data': 'testdata'}
    self.assertEqual(request._TrimHeaders(header6), [])

  def test_matches(self):
    """matches() honours use_query and accepts None for host or path."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

    # Same path, different query: only a match when the query is ignored.
    self.assertFalse(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=False))

    # A None path or host is expected to match.
    self.assertTrue(request1.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, None, request2.full_path, use_query=False))

    # A request built entirely from None values matches neither pattern.
    empty_request = httparchive.ArchivedHttpRequest(
        None, None, None, None, headers)
    self.assertFalse(empty_request.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertFalse(empty_request.matches(
        request2.command, None, request2.full_path, use_query=False))

  def setup_find_closest_request(self):
    """Build the fixture shared by the find_closest_request tests.

    Returns:
      A tuple (archive, request1, request2, request3, request4) in which only
      request2 ('/a?foo=bar') and request3 ('/b?hello=world') are stored in
      the archive; request1 ('/a?hello=world') and request4
      ('/c?hello=world') are lookup probes.
    """
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?foo=bar', None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/b?hello=world', None, headers)
    request4 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/c?hello=world', None, headers)

    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    return archive, request1, request2, request3, request4

  def test_find_closest_request(self):
    """Closest-match lookup prefers equal paths; use_path makes it strict."""
    archive, request1, request2, request3, request4 = (
        self.setup_find_closest_request())

    # Always favor requests with same paths, even if use_path=False.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    # If we match strictly on path, request2 is the only match
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))
    # request4 can be matched with request3, if use_path=False
    self.assertEqual(
        request3, archive.find_closest_request(request4, use_path=False))
    # ...but None, if use_path=True
    self.assertEqual(
        None, archive.find_closest_request(request4, use_path=True))

  def test_find_closest_request_delete_simple(self):
    """Deleting request3 leaves request2 as the match for request1."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request3]
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_delete_complex(self):
    """Deleting the same-path request2 leaves only a loose match."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request2]
    self.assertEqual(
        request3, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        None, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_timestamp(self):
    """A differing timestamp value must not outweigh other query params."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=100000000&important=true',
        None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=99999999&important=true',
        None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=10000000&important=false',
        None, headers)
    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    # Although request3 is lexicographically closer, request2 is semantically
    # more similar.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_get_cmp_seq(self):
    """_GetCmpSeq yields sorted query pairs followed by sorted headers."""
    # The order of key-value pairs in query and header respectively should not
    # matter.
    headers = {'k2': 'v2', 'k1': 'v1'}
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
    self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
                      ('k1', 'v1'), ('k2', 'v2')],
                     request._GetCmpSeq('c=d&a=b;e=f'))

  def test_get_simple(self):
    """get() returns the stored response, or the default when absent."""
    request = self.REQUEST
    response = self.RESPONSE
    archive = self.archive

    self.assertEqual(archive.get(request), response)

    false_request_headers = {'foo': 'bar'}
    false_request = create_request(false_request_headers)
    self.assertEqual(archive.get(false_request, default=None), None)

  def test_get_modified_headers(self):
    """if-modified-since yields 304 only for GETs dated at/after the stamp."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Fail check and return response again
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid date, fail check and return response again
    request_headers = {'if-modified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # fail check since the request is not a GET or HEAD request (as per RFC)
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_unmodified_headers(self):
    """if-unmodified-since handling for past, present, future, bad dates."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Fail check since the request is not a GET or HEAD request (as per RFC)
    # NOTE(review): this case sends 'if-modified-since' although the test
    # targets 'if-unmodified-since' -- possibly a copy/paste slip; confirm the
    # intended header before changing it.
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_etags(self):
    """if-match / if-none-match against the stored etag."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    request_headers = {'if-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {'if-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # if-none-match headers
    request_headers = {'if-none-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {'if-none-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_multiple_match_headers(self):
    """if-match combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    # If the request would, without the If-Match header field,
    # result in anything other than a 2xx or 412 status,
    # then the If-Match header MUST be ignored.

    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

  def test_get_multiple_none_match_headers(self):
    """if-none-match combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # if-none-match headers
    # If the request would, without the If-None-Match header field,
    # result in anything other than a 2xx or 304 status,
    # then the If-None-Match header MUST be ignored.

    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # etag matches and the date check succeeds: 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # etag differs, but the date check alone still expects 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # etag differs and the date is unparseable: full response
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_response__TrimHeaders(self):
    """Response trimming removes content-security-policy and nothing else."""
    response = httparchive.ArchivedHttpResponse
    header1 = [('access-control-allow-origin', '*'),
               ('content-type', 'image/jpeg'),
               ('content-length', 2878)]
    self.assertEqual(response._TrimHeaders(header1), header1)

    header2 = [('content-type', 'text/javascript; charset=utf-8'),
               ('connection', 'keep-alive'),
               ('cache-control', 'private, must-revalidate, max-age=0'),
               ('content-encoding', 'gzip')]
    self.assertEqual(response._TrimHeaders(header2), header2)

    # The policy literals below are continued at column 0 on purpose: leading
    # whitespace on those lines would become part of the header value.
    header3 = [('content-security-policy', """\
default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
*.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
font-src 'self' *; connect-src 'self' *"""),
               ('access-control-allow-origin', '*'),
               ('content-type', 'text/html; charset=utf-8'),
               ('content-encoding', 'gzip')]
    self.assertEqual(response._TrimHeaders(header3), [
        ('access-control-allow-origin', '*'),
        ('content-type', 'text/html; charset=utf-8'),
        ('content-encoding', 'gzip')
    ])

    header4 = [('content-security-policy', """\
default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
*.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
*.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
*.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
*.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
blob: 127.0.0.1:* *.liverail.com""")]
    self.assertEqual(response._TrimHeaders(header4), [])
| |
| |
class ArchivedHttpResponse(unittest.TestCase):
  """Tests update_date() on a response whose 'date' header is PAST_DATE_A."""

  PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
  PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
  NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
  NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
  # NOW_DATE_A expressed as seconds since the epoch (UTC).
  NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))

  def setUp(self):
    """Create a response carrying a single 'date' header of PAST_DATE_A."""
    date_header = ('date', self.PAST_DATE_A)
    self.response = create_response([date_header])

  def _updated(self, date_string):
    """Return update_date(date_string) evaluated with now=NOW_SECONDS."""
    return self.response.update_date(date_string, now=self.NOW_SECONDS)

  def test_update_date_same_date(self):
    """The response's own date maps onto NOW_DATE_A."""
    result = self._updated(self.PAST_DATE_A)
    self.assertEqual(result, self.NOW_DATE_A)

  def test_update_date_before_date(self):
    """A date one hour earlier maps onto one hour before now."""
    result = self._updated(self.PAST_DATE_B)
    self.assertEqual(result, self.NOW_DATE_B)

  def test_update_date_after_date(self):
    """A date one hour later maps onto one hour after now."""
    result = self._updated(self.PAST_DATE_C)
    self.assertEqual(result, self.NOW_DATE_C)

  def test_update_date_bad_date_param(self):
    """An unparseable argument is handed back unchanged."""
    unparseable = 'garbage date'
    result = self._updated(unparseable)
    self.assertEqual(result, 'garbage date')

  def test_update_date_bad_date_header(self):
    """With an unparseable 'date' header the argument comes back unchanged."""
    self.response.set_header('date', 'garbage date')
    result = self._updated(self.PAST_DATE_B)
    self.assertEqual(result, self.PAST_DATE_B)
| |
| |
# Run every TestCase defined in this module when executed as a script.
if __name__ == '__main__':
  unittest.main()