blob: b22dadada3beb8b9c689c4ac645431d6e8aba4ee [file] [log] [blame]
#!/usr/bin/env python
import datetime
import json
import time
from base64 import b64decode
# Lower-cased response header names whose values are recorded in the
# per-request state kept in the stash.
NOTEHDRS = {
    'content-type',
    'access-control-allow-origin',
    'last-modified',
    'etag',
}

# Status codes for which an empty response body is returned.
NOBODYSTATUS = {204, 304}

# Lower-cased header names whose configured value is rewritten into a URL
# based on the current request URL ("magic locations").
LOCATIONHDRS = {'location', 'content-location'}

# Lower-cased header names whose integer values are treated as an offset in
# seconds from "now" and rendered as an HTTP date ("magic dates").
DATEHDRS = {'date', 'expires', 'last-modified'}
def _reply_not_found(response, message):
    """Turn *response* into a plain-text 404 and return *message* as its body."""
    response.status = (404, "Not Found")
    response.headers.set("Content-Type", "text/plain")
    return message


def main(request, response):
    """Entry point: route the request according to its query string.

    The ``uuid`` query parameter identifies the test; ``dispatch`` selects
    the action: ``test`` is passed to handle_test and ``state`` to
    handle_state. A missing ``uuid`` or an unrecognised ``dispatch`` value
    yields a plain-text 404.
    """
    dispatch = request.GET.first("dispatch", None)
    uuid = request.GET.first("uuid", None)

    if not uuid:
        return _reply_not_found(response, "UUID not found")

    if dispatch == 'test':
        return handle_test(uuid, request, response)
    if dispatch == 'state':
        return handle_state(uuid, request, response)

    return _reply_not_found(response, "Fallthrough")
def handle_state(uuid, request, response):
    """Return, as JSON text, whatever is stored in the stash under ``uuid``.

    The value is fetched with ``stash.take``; the response is labelled as
    plain text.
    """
    response.headers.set("Content-Type", "text/plain")
    recorded = request.server.stash.take(uuid)
    return json.dumps(recorded)
def handle_test(uuid, request, response):
    """Send the next scripted response for test ``uuid`` and record the request.

    The ``Test-Requests`` request header carries a base64-encoded JSON list
    of response configs. The number of requests already recorded in the
    stash under ``uuid`` is the index of the config used for this request.

    Config keys read here: ``response_headers`` (list of ``[name, value]``
    pairs), ``response_status`` (``[code, phrase]``, default 200 OK),
    ``expected_type`` and ``response_body`` (default: the uuid).

    Returns the response body, which is empty for statuses in NOBODYSTATUS.
    Replies 400 if the header is missing or cannot be decoded, and 404 if
    there is no usable config at the current index.
    """
    server_state = request.server.stash.take(uuid) or []

    try:
        requests = json.loads(b64decode(request.headers.get('Test-Requests', "")))
    except (TypeError, ValueError):
        # Malformed base64 or JSON surfaces as one of these; anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        response.status = (400, "Bad Request")
        response.headers.set("Content-Type", "text/plain")
        return "No or bad Test-Requests request header"

    try:
        config = requests[len(server_state)]
    except (IndexError, KeyError, TypeError):
        # More requests arrived than were scripted, or the decoded payload
        # cannot be indexed by position; treat both as "no config".
        config = None
    if not config:
        response.status = (404, "Not Found")
        response.headers.set("Content-Type", "text/plain")
        return "Config not found"

    noted_headers = {}
    now = time.time()
    for header in config.get('response_headers', []):
        name, value = header[0], header[1]
        lowered = name.lower()
        if lowered in LOCATIONHDRS:  # magic locations
            value = "%s&target=%s" % (request.url, value)
        if lowered in DATEHDRS and isinstance(value, int):  # magic dates
            value = http_date(now, value)
        response.headers.set(name, value)
        if lowered in NOTEHDRS:
            noted_headers[lowered] = value

    # Record what was received and what was sent, then store the history
    # back under the same key because stash.take() removed it above.
    server_state.append({
        'now': now,
        'request_method': request.method,
        'request_headers': {h.lower(): request.headers[h] for h in request.headers},
        'response_headers': noted_headers,
    })
    request.server.stash.put(uuid, server_state)

    # Supply defaults only for headers the config did not set itself.
    if "access-control-allow-origin" not in noted_headers:
        response.headers.set("Access-Control-Allow-Origin", "*")
    if "content-type" not in noted_headers:
        response.headers.set("Content-Type", "text/plain")
    response.headers.set("Server-Request-Count", len(server_state))

    code, phrase = config.get("response_status", [200, "OK"])
    if config.get("expected_type", "").endswith('validated'):
        # Validators are compared against the headers noted for the first
        # recorded response of this test.
        ref_hdrs = server_state[0]['response_headers']
        previous_lm = ref_hdrs.get('last-modified', False)
        if previous_lm and request.headers.get("If-Modified-Since", False) == previous_lm:
            code, phrase = 304, "Not Modified"
        previous_etag = ref_hdrs.get('etag', False)
        if previous_etag and request.headers.get("If-None-Match", False) == previous_etag:
            code, phrase = 304, "Not Modified"
        if code != 304:
            # Neither validator matched: reply with status 999 instead of
            # the configured status.
            code, phrase = 999, '304 Not Generated'
    response.status = (code, phrase)

    content = config.get("response_body", uuid)
    if code in NOBODYSTATUS:
        return ""
    return content
def get_header(headers, header_name):
    """Return the value of the last entry in ``headers`` named ``header_name``.

    ``headers`` is an iterable of indexable entries whose item 0 is the name
    and item 1 the value. Names are compared case-insensitively. Returns
    None when no entry matches.
    """
    matches = [
        entry[1]
        for entry in headers
        if entry[0].lower() == header_name.lower()
    ]
    return matches[-1] if matches else None
# Day abbreviations indexed by datetime.weekday() (Monday is 0).
WEEKDAYS = [
    'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun',
]

# Month abbreviations indexed by datetime.month; slot 0 is a placeholder so
# the 1-based month number can be used as the index directly.
MONTHS = [
    None,
    'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
]


def http_date(now, delta_secs=0):
    """Format ``now + delta_secs`` as a date string ending in ``GMT``.

    ``now`` is a timestamp in seconds since the epoch and ``delta_secs`` an
    offset in seconds added to it. The result looks like
    ``Thu, 01 Jan 1970 00:00:00 GMT``.
    """
    moment = datetime.datetime.utcfromtimestamp(now + delta_secs)
    day_name = WEEKDAYS[moment.weekday()]
    month_name = MONTHS[moment.month]
    fields = (
        day_name,
        moment.day,
        month_name,
        moment.year,
        moment.hour,
        moment.minute,
        moment.second,
    )
    return "%s, %.2d %s %.4d %.2d:%.2d:%.2d GMT" % fields