| import os, json |
| from urllib.parse import parse_qsl, SplitResult, urlencode, urlsplit, urlunsplit |
| |
| from wptserve.utils import isomorphic_decode, isomorphic_encode |
| |
| def get_template(template_basename): |
| script_directory = os.path.dirname(os.path.abspath(isomorphic_decode(__file__))) |
| template_directory = os.path.abspath(os.path.join(script_directory, |
| u"template")) |
| template_filename = os.path.join(template_directory, template_basename) |
| |
| with open(template_filename, "r") as f: |
| return f.read() |
| |
| |
| def redirect(url, response): |
| response.add_required_headers = False |
| response.writer.write_status(301) |
| response.writer.write_header(b"access-control-allow-origin", b"*") |
| response.writer.write_header(b"location", isomorphic_encode(url)) |
| response.writer.end_headers() |
| response.writer.write(u"") |
| |
| |
| # TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with |
| # referrer-policy-test-case.js. The prefix should be configured in one place. |
| def __get_swapped_origin_netloc(netloc, subdomain_prefix = u"www1."): |
| if netloc.startswith(subdomain_prefix): |
| return netloc[len(subdomain_prefix):] |
| else: |
| return subdomain_prefix + netloc |
| |
| |
| # Creates a URL (typically a redirect target URL) that is the same as the |
| # current request URL `request.url`, except for: |
| # - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed |
| # to the other one. (http <-> https, ws <-> wss, etc.) |
| # - For `downgrade`, we redirect to a URL that would be successfully loaded |
| # if and only if upgrade-insecure-request is applied. |
| # - `query_parameter_to_remove` parameter is removed from query part. |
| # Its default is "redirection" to avoid redirect loops. |
| def create_url(request, |
| swap_scheme=False, |
| swap_origin=False, |
| downgrade=False, |
| query_parameter_to_remove=u"redirection"): |
| parsed = urlsplit(request.url) |
| destination_netloc = parsed.netloc |
| |
| scheme = parsed.scheme |
| if swap_scheme: |
| scheme = u"http" if parsed.scheme == u"https" else u"https" |
| hostname = parsed.netloc.split(u':')[0] |
| port = request.server.config[u"ports"][scheme][0] |
| destination_netloc = u":".join([hostname, str(port)]) |
| |
| if downgrade: |
| # These rely on some unintuitive cleverness due to WPT's test setup: |
| # 'Upgrade-Insecure-Requests' does not upgrade the port number, |
| # so we use URLs in the form `http://[domain]:[https-port]`, |
| # which will be upgraded to `https://[domain]:[https-port]`. |
| # If the upgrade fails, the load will fail, as we don't serve HTTP over |
| # the secure port. |
| if parsed.scheme == u"https": |
| scheme = u"http" |
| elif parsed.scheme == u"wss": |
| scheme = u"ws" |
| else: |
| raise ValueError(u"Downgrade redirection: Invalid scheme '%s'" % |
| parsed.scheme) |
| hostname = parsed.netloc.split(u':')[0] |
| port = request.server.config[u"ports"][parsed.scheme][0] |
| destination_netloc = u":".join([hostname, str(port)]) |
| |
| if swap_origin: |
| destination_netloc = __get_swapped_origin_netloc(destination_netloc) |
| |
| parsed_query = parse_qsl(parsed.query, keep_blank_values=True) |
| parsed_query = [x for x in parsed_query if x[0] != query_parameter_to_remove] |
| |
| destination_url = urlunsplit(SplitResult( |
| scheme = scheme, |
| netloc = destination_netloc, |
| path = parsed.path, |
| query = urlencode(parsed_query), |
| fragment = None)) |
| |
| return destination_url |
| |
| |
| def preprocess_redirection(request, response): |
| if b"redirection" not in request.GET: |
| return False |
| |
| redirection = request.GET[b"redirection"] |
| |
| if redirection == b"no-redirect": |
| return False |
| elif redirection == b"keep-scheme": |
| redirect_url = create_url(request, swap_scheme=False) |
| elif redirection == b"swap-scheme": |
| redirect_url = create_url(request, swap_scheme=True) |
| elif redirection == b"downgrade": |
| redirect_url = create_url(request, downgrade=True) |
| elif redirection == b"keep-origin": |
| redirect_url = create_url(request, swap_origin=False) |
| elif redirection == b"swap-origin": |
| redirect_url = create_url(request, swap_origin=True) |
| else: |
| raise ValueError(u"Invalid redirection type '%s'" % isomorphic_decode(redirection)) |
| |
| redirect(redirect_url, response) |
| return True |
| |
| |
| def preprocess_stash_action(request, response): |
| if b"action" not in request.GET: |
| return False |
| |
| action = request.GET[b"action"] |
| |
| key = request.GET[b"key"] |
| stash = request.server.stash |
| path = request.GET[b"path"] if b"path" in request.GET \ |
| else isomorphic_encode(request.url.split(u'?')[0]) |
| |
| if action == b"put": |
| value = isomorphic_decode(request.GET[b"value"]) |
| stash.take(key=key, path=path) |
| stash.put(key=key, value=value, path=path) |
| response_data = json.dumps({u"status": u"success", u"result": isomorphic_decode(key)}) |
| elif action == b"purge": |
| value = stash.take(key=key, path=path) |
| return False |
| elif action == b"take": |
| value = stash.take(key=key, path=path) |
| if value is None: |
| status = u"allowed" |
| else: |
| status = u"blocked" |
| response_data = json.dumps({u"status": status, u"result": value}) |
| else: |
| return False |
| |
| response.add_required_headers = False |
| response.writer.write_status(200) |
| response.writer.write_header(b"content-type", b"text/javascript") |
| response.writer.write_header(b"cache-control", b"no-cache; must-revalidate") |
| response.writer.end_headers() |
| response.writer.write(response_data) |
| return True |
| |
| |
| def __noop(request, response): |
| return u"" |
| |
| |
| def respond(request, |
| response, |
| status_code = 200, |
| content_type = b"text/html", |
| payload_generator = __noop, |
| cache_control = b"no-cache; must-revalidate", |
| access_control_allow_origin = b"*", |
| maybe_additional_headers = None): |
| if preprocess_redirection(request, response): |
| return |
| |
| if preprocess_stash_action(request, response): |
| return |
| |
| response.add_required_headers = False |
| response.writer.write_status(status_code) |
| |
| if access_control_allow_origin != None: |
| response.writer.write_header(b"access-control-allow-origin", |
| access_control_allow_origin) |
| response.writer.write_header(b"content-type", content_type) |
| response.writer.write_header(b"cache-control", cache_control) |
| |
| additional_headers = maybe_additional_headers or {} |
| for header, value in additional_headers.items(): |
| response.writer.write_header(header, value) |
| |
| response.writer.end_headers() |
| |
| new_headers = {} |
| new_val = [] |
| for key, val in request.headers.items(): |
| if len(val) == 1: |
| new_val = isomorphic_decode(val[0]) |
| else: |
| new_val = [isomorphic_decode(x) for x in val] |
| new_headers[isomorphic_decode(key)] = new_val |
| |
| server_data = {u"headers": json.dumps(new_headers, indent = 4)} |
| |
| payload = payload_generator(server_data) |
| response.writer.write(payload) |