| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| # |
| # |
| # This test checks that Chrome does not crash or otherwise wildly misbehave when |
| # it receives unexpected Chrome-Proxy header values and/or directives. This is |
| # done by fuzz testing which is configured through a dictionary object at the |
| # top of this file. |
| # |
| # The fuzz testing generates URLs which are requested from the Chrome Proxy test |
| # server which will modify its response based on the encoded data in the URL. |
| # |
| # The fuzz testing is configured in a dictionary-type object of the following |
| # format: |
| # { |
| # # This specifies a header that will be fuzzed. Only one header is fuzzed |
| # # per test. When a header is being fuzz tested, any entry it has in the |
| # # STATIC_RESPONSE_HEADERS object will be overwritten. |
| # "chrome-proxy": { |
| # # This specifies the directive key values that are possible to use in |
| # # fuzzing. |
| # "directive_keys: [ |
| # "page-policy", |
| # "foo", |
| # ], |
| # # This specifies the directive values that are possible to use in |
| # # fuzzing. Any one of these values may end up mapped to any one of the |
| # # directive_keys above. |
| # "directive_values": [ |
| # "empty-image", |
| # "bar", |
| # ], |
| # # The maximum number of directives to use in this header. If this value |
| # # is greater than the number of directives given above, that will be |
| # # used instead. |
| # "max_directives": 10, |
| # |
| # # The maximum number of directive values to use per directive. If this |
| # # value is greater than the number of directive values given above, that |
| # # will be used instead. |
| # "max_directive_values": 10, |
| # |
| # # This is used to join the directive values when multiple are used. |
| # "directive_value_joiner": "|", |
| # }, |
| # } |
| # |
| # If the above configuration was given, the following values would be generated |
| # for the chrome-proxy header: |
| # <empty-string> |
| # foo |
| # foo=bar |
| # foo=empty-image |
| # foo=bar|empty-image |
| # page_policies |
| # page_policies=bar |
| # page_policies=empty-image |
| # page_policies=bar|empty-image |
| # foo,page_policies |
| # foo=bar,page_policies=bar |
| # foo=empty-image,page_policies=empty-image |
| # foo=bar|empty-image,page_policies=bar|empty-image |
| # |
| # Randomly generated values are also supported in the fuzz_header field |
| # ("chrome-proxy" in the above example), directive_keys, and directive_values. |
| # Randomly generated values can be specified using these formats: |
| # {{RAND_STR(N)}} |
| # Creates a random string of lowercase and digit characters of length N |
| # |
| # {{RAND_INT(N)}} |
| # Creates a random integer in the range [0, 10^N+1) with leading zeros |
| # Example: "Num cookies: {{RAND_INT(4)}}" yields "Num cookies: 0123" |
| # |
| # {{RAND_DBL(P.Q)}} |
| # Creates a random double in the range [0, 10^P+1) with leading zeros and |
| # up to Q places after the decimal point |
| # Example: "My money = ${{RAND_DBL(3,2)}}" yields "My money = $001.53" |
| # |
| # {{RAND_BOOL}} |
| # Creates a random boolean, either 'true' or 'false' |
| # Example: "I am awesome: {{RAND_BOOL}}" yields "I am awesome: true" |
| |
| import BaseHTTPServer |
| import base64 |
| import itertools |
| import json |
| import random |
| import re |
| import string |
| |
| from common import TestDriver |
| from common import IntegrationTest |
| from decorators import Slow |
| |
| # This dict configures how the fuzzing will operate. See documentation above for |
| # more information. |
| FUZZ_HEADERS = { |
| "chrome-proxy": { |
| "directive_keys": [ |
| "{{RAND_STR(10)}}", |
| "{{RAND_STR(10)}}", |
| "page-policies", |
| ], |
| "directive_values": [ |
| "empty-image", |
| "{{RAND_INT(1)}}", |
| "{{RAND_INT(4)}}", |
| "{{RAND_STR(10)}}", |
| "{{RAND_STR(10)}}", |
| "{{RAND_STR(10)}}", |
| ], |
| "max_directives": 3, |
| "max_directive_values": 3, |
| "directive_value_joiner": "|", |
| }, |
| } |
| |
| TEST_SERVER = "chromeproxy-test.appspot.com" |
| |
| # These headers will be present in every test server response. If one of these |
| # entries is also a fuzzed header above, then the fuzzed value will take the |
| # place of the static one instead. |
| STATIC_RESPONSE_HEADERS = { |
| "content-type": ["text/html"], |
| "via": ["1.1 Chrome-Compression-Proxy"], |
| "cache-control": ["no-cache, no-store, must-revalidate"], |
| "pragma": ["no-cache"], |
| "expires": ["0"], |
| } |
| |
| # This string will be used as the response body in every test and will be |
| # checked for existence on the final loaded page. |
| STATIC_RESPONSE_BODY = 'ok' |
| |
| rand_str_re = re.compile(r'{{RAND_STR\((\d+)\)}}') |
| rand_int_re = re.compile(r'{{RAND_INT\((\d+)\)}}') |
| rand_dbl_re = re.compile(r'{{RAND_DBL\((\d+)\.(\d+)\)}}') |
| rand_bool_re = re.compile(r'{{RAND_BOOL}}') |
| |
| def ParseRand(key, val): |
| """This helper function parses the {{RAND}} expressions in the given values |
| and returns them with random values subsituted in place. |
| |
| Args: |
| key: the header key with 0 or more {{RAND}} expressions |
| val: the header value with 0 or more {{RAND}} expressions |
| Returns: |
| A key, value tuple with subsituted random values |
| """ |
| def GenerateRand(length, charset): |
| return ''.join(random.choice(charset) for _ in range(length)) |
| def _parse_rand(v): |
| result = v |
| had_match = True |
| while had_match: |
| had_match = False |
| str_match = rand_str_re.search(result) |
| if str_match: |
| had_match = True |
| mag = int(result[str_match.start(1):str_match.end(1)]) |
| rand_str = GenerateRand(mag, string.ascii_lowercase + string.digits) |
| result = (result[:str_match.start()] + rand_str |
| + result[str_match.end():]) |
| int_match = rand_int_re.search(result) |
| if int_match: |
| had_match = True |
| mag = int(result[int_match.start(1):int_match.end(1)]) |
| rand_int = GenerateRand(mag, string.digits) |
| result = (result[:int_match.start()] + rand_int |
| + result[int_match.end():]) |
| dbl_match = rand_dbl_re.search(result) |
| if dbl_match: |
| had_match = True |
| magN = int(result[dbl_match.start(1):dbl_match.end(1)]) |
| magD = int(result[dbl_match.start(2):dbl_match.end(2)]) |
| rand_dbl = GenerateRand(magN, string.digits) + '.' + GenerateRand(magD, |
| string.digits) |
| result = (result[:dbl_match.start()] + rand_dbl |
| + result[dbl_match.end():]) |
| bool_match = rand_bool_re.search(result) |
| if bool_match: |
| had_match = True |
| rand_bool = bool(random.getrandbits(1)) |
| result = (result[:bool_match.start()] + str(rand_bool).lower() |
| + result[bool_match.end():]) |
| return result |
| return (_parse_rand(key), _parse_rand(val)) |
| |
| def GenerateFuzzedHeaders(cfg=FUZZ_HEADERS): |
| """This function yields header key value pairs which can be used to update a |
| Python dict representing HTTP headers. See file level documentation for more |
| information. |
| |
| Args: |
| cfg: the configuration dict that specifies how to fuzz the proxy headers |
| Yields: |
| one header key value pair |
| """ |
| for header_key in cfg: |
| fuzz = cfg[header_key] |
| dirs = fuzz['directive_keys'] |
| vals = fuzz['directive_values'] |
| max_dirs = min(fuzz['max_directives'], len(dirs)) |
| max_vals = min(fuzz['max_directive_values'], len(vals)) |
| def GenerateFuzzedValues(): |
| for n in range(0, max_vals + 1): |
| for c in itertools.combinations(vals, n): |
| yield c |
| # Yield an empty header key,value pair before doing all the combinations. |
| yield (header_key, '') |
| for num_dirs in range(1, max_dirs + 1): |
| for directive_set in itertools.combinations(dirs, num_dirs): |
| for values in GenerateFuzzedValues(): |
| value_list = list(values) |
| if '' in value_list: |
| value_list.remove('') |
| value_str = fuzz['directive_value_joiner'].join(value_list) |
| header = [] |
| for directive in directive_set: |
| if len(value_str) == 0: |
| header.append(directive) |
| else: |
| header.append('%s=%s' % (directive, value_str)) |
| yield ParseRand(header_key, ','.join(header)) |
| |
| class FuzzUnitTests(IntegrationTest): |
| |
| def testParseRand(self): |
| tests = { |
| "{{RAND_STR(1)}": r"{{RAND_STR\(1\)}", |
| "{{RAND_INT(1)}": r"{{RAND_INT\(1\)}", |
| "{{RAND_DBL(1.1)}": r"{{RAND_DBL\(1\.1\)}", |
| "{{RAND_DBL(11)}}": r"{{RAND_DBL\(11\)}}", |
| "{{RAND_BOOL}": r"{{RAND_BOOL}", |
| "{{RAND_STR(0)}}": "", |
| "hi{{RAND_STR(0)}}": "hi", |
| "{{RAND_STR(0)}}there": "there", |
| "hi{{RAND_STR(0)}}there": "hithere", |
| "{{RAND_STR(3)}}": r"[a-z0-9][a-z0-9][a-z0-9]", |
| "{{RAND_STR(3)}}there": r"[a-z0-9][a-z0-9][a-z0-9]there", |
| "hi{{RAND_STR(3)}}": r"hi[a-z0-9][a-z0-9][a-z0-9]", |
| "hi{{RAND_STR(3)}}there": r"hi[a-z0-9][a-z0-9][a-z0-9]there", |
| "{{RAND_INT(0)}}": "", |
| "hi{{RAND_INT(0)}}": "hi", |
| "{{RAND_INT(0)}}there": "there", |
| "hi{{RAND_INT(0)}}there": "hithere", |
| "{{RAND_INT(3)}}": r"[0-9][0-9][0-9]", |
| "{{RAND_INT(3)}}there": r"[0-9][0-9][0-9]there", |
| "hi{{RAND_INT(3)}}": r"hi[0-9][0-9][0-9]", |
| "hi{{RAND_INT(3)}}there": r"hi[0-9][0-9][0-9]there", |
| "{{RAND_DBL(0.0)}}": r"\.", |
| "hi{{RAND_DBL(0.0)}}": r"hi\.", |
| "{{RAND_DBL(0.0)}}there": r"\.there", |
| "hi{{RAND_DBL(0.0)}}there": r"hi\.there", |
| "{{RAND_DBL(3.3)}}": r"[0-9][0-9][0-9]\.[0-9][0-9][0-9]", |
| "hi{{RAND_DBL(3.3)}}": r"hi[0-9][0-9][0-9]\.[0-9][0-9][0-9]", |
| "{{RAND_DBL(3.3)}}there": r"[0-9][0-9][0-9]\.[0-9][0-9][0-9]there", |
| "hi{{RAND_DBL(3.3)}}there": r"hi[0-9][0-9][0-9]\.[0-9][0-9][0-9]there", |
| "{{RAND_BOOL}}": r"(true|false)", |
| "{{RAND_BOOL}}there": r"(true|false)there", |
| "hi{{RAND_BOOL}}": r"hi(true|false)", |
| "hi{{RAND_BOOL}}there": r"hi(true|false)there", |
| "{{RAND_STR(1)}}{{RAND_STR(1)}}": r"[a-z0-9][a-z0-9]", |
| "{{RAND_STR(1)}}{{RAND_INT(1)}}": r"[a-z0-9][0-9]", |
| "{{RAND_STR(1)}}{{RAND_DBL(1.1)}}": r"[a-z0-9][0-9]\.[0-9]", |
| "{{RAND_STR(1)}}{{RAND_BOOL}}": r"[a-z0-9](true|false)", |
| "{{RAND_INT(1)}}{{RAND_STR(1)}}": r"[0-9][a-z0-9]", |
| "{{RAND_INT(1)}}{{RAND_INT(1)}}": r"[0-9][0-9]", |
| "{{RAND_INT(1)}}{{RAND_DBL(1.1)}}": r"[0-9][0-9]\.[0-9]", |
| "{{RAND_INT(1)}}{{RAND_BOOL}}": r"[0-9](true|false)", |
| "{{RAND_DBL(1.1)}}{{RAND_STR(1)}}": r"[0-9]\.[0-9][a-z0-9]", |
| "{{RAND_DBL(1.1)}}{{RAND_INT(1)}}": r"[0-9]\.[0-9][0-9]", |
| "{{RAND_DBL(1.1)}}{{RAND_DBL(1.1)}}": r"[0-9]\.[0-9][0-9]\.[0-9]", |
| "{{RAND_DBL(1.1)}}{{RAND_BOOL}}": r"[0-9]\.[0-9](true|false)", |
| "{{RAND_BOOL}}{{RAND_STR(1)}}": r"(true|false)[a-z0-9]", |
| "{{RAND_BOOL}}{{RAND_INT(1)}}": r"(true|false)[0-9]", |
| "{{RAND_BOOL}}{{RAND_DBL(1.1)}}": r"(true|false)[0-9]\.[0-9]", |
| "{{RAND_BOOL}}{{RAND_BOOL}}": r"(true|false)(true|false)", |
| } |
| for t in tests: |
| expected = re.compile('^' + tests[t] + '$') |
| gotK, gotV = ParseRand(t, t) |
| if not expected.match(gotK): |
| self.fail("%s doesn't match /%s/" % (gotK, tests[t])) |
| if not expected.match(gotV): |
| self.fail("%s doesn't match /%s/" % (gotK, tests[t])) |
| |
| def testGenerator(self): |
| test_cfg = { |
| "chrome-proxy": { |
| "directive_keys": [ |
| "foo", |
| "page_policies", |
| ], |
| "directive_values": [ |
| "bar", |
| "empty-image", |
| ], |
| "max_directives": 10, |
| "max_directive_values": 10, |
| "directive_value_joiner": "|", |
| }, |
| } |
| expected_headers = ['', 'foo', 'foo=bar', 'foo=empty-image', |
| 'foo=bar|empty-image', 'page_policies', 'page_policies=bar', |
| 'page_policies=empty-image', 'page_policies=bar|empty-image', |
| 'foo,page_policies', 'foo=bar,page_policies=bar', |
| 'foo=empty-image,page_policies=empty-image', |
| 'foo=bar|empty-image,page_policies=bar|empty-image', |
| ] |
| actual_headers = [] |
| for h in GenerateFuzzedHeaders(cfg=test_cfg): |
| actual_headers.append(h[1]) |
| expected_headers.sort() |
| actual_headers.sort() |
| self.assertEqual(expected_headers, actual_headers) |
| |
| class ProtocolFuzzer(IntegrationTest): |
| |
| def GenerateTestURLs(self): |
| """This function yields test URLs which will cause the test server to |
| respond with the given given headers and body. |
| |
| Yields: |
| URLs suitable for testing fuzzed response headers |
| """ |
| for fz_key, fz_val in GenerateFuzzedHeaders(): |
| headers = {} |
| headers.update(STATIC_RESPONSE_HEADERS) |
| headers.update({fz_key: [fz_val]}) |
| json_headers = json.dumps(headers) |
| b64_headers = base64.b64encode(json_headers) |
| url = "http://%s/default?respBody=%s&respHeader=%s" % (TEST_SERVER, |
| base64.b64encode(STATIC_RESPONSE_BODY), b64_headers) |
| yield (json_headers, url) |
| |
| @Slow |
| def testFuzzing(self): |
| with TestDriver() as t: |
| t.AddChromeArg('--enable-spdy-proxy-auth') |
| t.AddChromeArg('--data-reduction-proxy-http-proxies=' |
| 'https://chromeproxy-test.appspot.com') |
| for headers, url in self.GenerateTestURLs(): |
| try: |
| t.LoadURL(url) |
| # The main test is to make sure Chrome doesn't crash after loading a |
| # page with fuzzed headers, which would be raised as a ChromeDriver |
| # exception. Otherwise, we'll do a simple check and make sure the page |
| # body is correct and Chrome isn't displaying some kind of error page. |
| body = t.ExecuteJavascriptStatement('document.body.innerHTML') |
| self.assertEqual(body, STATIC_RESPONSE_BODY) |
| except Exception as e: |
| print 'Response headers: ' + headers |
| print 'URL: ' + url |
| raise e |
| |
| if __name__ == '__main__': |
| IntegrationTest.RunAllTests() |