blob: f11f3250568b662118d658388a2e4725357d7fd4 [file] [log] [blame]
import os, json, urllib, urlparse
def get_template(template_basename):
script_directory = os.path.dirname(os.path.abspath(__file__))
template_directory = os.path.abspath(os.path.join(script_directory,
"template"))
template_filename = os.path.join(template_directory, template_basename);
with open(template_filename, "r") as f:
return f.read()
def redirect(url, response):
response.add_required_headers = False
response.writer.write_status(301)
response.writer.write_header("access-control-allow-origin", "*")
response.writer.write_header("location", url)
response.writer.end_headers()
response.writer.write("")
# TODO(kristijanburnik): subdomain_prefix is a hardcoded value aligned with
# referrer-policy-test-case.js. The prefix should be configured in one place.
def __get_swapped_origin_netloc(netloc, subdomain_prefix = "www1."):
if netloc.startswith(subdomain_prefix):
return netloc[len(subdomain_prefix):]
else:
return subdomain_prefix + netloc
# Creates a URL (typically a redirect target URL) that is the same as the
# current request URL `request.url`, except for:
# - When `swap_scheme` or `swap_origin` is True, its scheme/origin is changed
# to the other one. (http <-> https, ws <-> wss, etc.)
# - `query_parameter_to_remove` parameter is removed from query part.
# Its default is "redirection" to avoid redirect loops.
def create_url(request, swap_scheme = False, swap_origin = False,
query_parameter_to_remove = "redirection"):
parsed = urlparse.urlsplit(request.url)
destination_netloc = parsed.netloc
scheme = parsed.scheme
if swap_scheme:
scheme = "http" if parsed.scheme == "https" else "https"
hostname = parsed.netloc.split(':')[0]
port = request.server.config["ports"][scheme][0]
destination_netloc = ":".join([hostname, str(port)])
if swap_origin:
destination_netloc = __get_swapped_origin_netloc(destination_netloc)
parsed_query = urlparse.parse_qsl(parsed.query, keep_blank_values=True)
parsed_query = filter(lambda x: x[0] != query_parameter_to_remove,
parsed_query)
destination_url = urlparse.urlunsplit(urlparse.SplitResult(
scheme = scheme,
netloc = destination_netloc,
path = parsed.path,
query = urllib.urlencode(parsed_query),
fragment = None))
return destination_url
def preprocess_redirection(request, response):
if "redirection" not in request.GET:
return False
redirection = request.GET["redirection"]
if redirection == "no-redirect":
return False
elif redirection == "keep-scheme-redirect":
redirect_url = create_url(request, swap_scheme=False)
elif redirection == "swap-scheme-redirect":
redirect_url = create_url(request, swap_scheme=True)
elif redirection == "keep-origin-redirect":
redirect_url = create_url(request, swap_origin=False)
elif redirection == "swap-origin-redirect":
redirect_url = create_url(request, swap_origin=True)
else:
raise ValueError("Invalid redirection type '%s'" % redirection)
redirect(redirect_url, response)
return True
def preprocess_stash_action(request, response):
if "action" not in request.GET:
return False
action = request.GET["action"]
key = request.GET["key"]
stash = request.server.stash
path = request.GET.get("path", request.url.split('?'))[0]
if action == "put":
value = request.GET["value"]
stash.take(key=key, path=path)
stash.put(key=key, value=value, path=path)
response_data = json.dumps({"status": "success", "result": key})
elif action == "purge":
value = stash.take(key=key, path=path)
return False
elif action == "take":
value = stash.take(key=key, path=path)
if value is None:
status = "allowed"
else:
status = "blocked"
response_data = json.dumps({"status": status, "result": value})
else:
return False
response.add_required_headers = False
response.writer.write_status(200)
response.writer.write_header("content-type", "text/javascript")
response.writer.write_header("cache-control", "no-cache; must-revalidate")
response.writer.end_headers()
response.writer.write(response_data)
return True
def __noop(request, response):
return ""
def respond(request,
response,
status_code = 200,
content_type = "text/html",
payload_generator = __noop,
cache_control = "no-cache; must-revalidate",
access_control_allow_origin = "*",
maybe_additional_headers = None):
if preprocess_redirection(request, response):
return
if preprocess_stash_action(request, response):
return
response.add_required_headers = False
response.writer.write_status(status_code)
if access_control_allow_origin != None:
response.writer.write_header("access-control-allow-origin",
access_control_allow_origin)
response.writer.write_header("content-type", content_type)
response.writer.write_header("cache-control", cache_control)
additional_headers = maybe_additional_headers or {}
for header, value in additional_headers.items():
response.writer.write_header(header, value)
response.writer.end_headers()
server_data = {"headers": json.dumps(request.headers, indent = 4)}
payload = payload_generator(server_data)
response.writer.write(payload)