253 lines
9.3 KiB
Python
253 lines
9.3 KiB
Python
import flask; from flask import request
|
|
import functools
|
|
import gzip
|
|
import io
|
|
import json
|
|
import time
|
|
import werkzeug.wrappers
|
|
|
|
from voussoirkit import bytestring
|
|
from voussoirkit import dotdict
|
|
from voussoirkit import passwordy
|
|
from voussoirkit import sentinel
|
|
|
|
GZIP_MINIMUM_SIZE = 500 * bytestring.BYTE
|
|
GZIP_MAXIMUM_SIZE = 5 * bytestring.MIBIBYTE
|
|
GZIP_LEVEL = 3
|
|
|
|
REQUEST_TYPES = (flask.Request, werkzeug.wrappers.Request, werkzeug.local.LocalProxy)
|
|
RESPONSE_TYPES = (flask.Response, werkzeug.wrappers.Response)
|
|
|
|
NOT_CACHED = sentinel.Sentinel('not cached', truthyness=False)
|
|
|
|
def cached_endpoint(max_age):
|
|
'''
|
|
The cached_endpoint decorator can be used on slow endpoints that don't need
|
|
to be constantly updated or endpoints that produce large, static responses.
|
|
|
|
WARNING: The return value of the endpoint is shared with all users.
|
|
You should never use this cache on an endpoint that provides private
|
|
or personalized data, and you should not try to pass other headers through
|
|
the response.
|
|
|
|
When the function is run, its return value is stored and a random etag is
|
|
generated so that subsequent runs can respond with 304. This way, large
|
|
response bodies do not need to be transmitted often.
|
|
|
|
Given a nonzero max_age, the endpoint will only be run once per max_age
|
|
seconds on a global basis (not per-user). This way, you can prevent a slow
|
|
function from being run very often. In-between requests will just receive
|
|
the previous return value (still using 200 or 304 as appropriate for the
|
|
client's provided etag).
|
|
|
|
With max_age=0, the function will be run every time to check if the value
|
|
has changed, but if it hasn't changed then we can still send a 304 response,
|
|
saving bandwidth.
|
|
|
|
An example use case would be large-sized data dumps that don't need to be
|
|
precisely up to date every time.
|
|
'''
|
|
if max_age < 0:
|
|
raise ValueError(f'max_age should be positive, not {max_age}.')
|
|
|
|
state = dotdict.DotDict({
|
|
'max_age': max_age,
|
|
'stored_value': NOT_CACHED,
|
|
'stored_etag': None,
|
|
'headers': {'ETag': None, 'Cache-Control': f'max-age={max_age}'},
|
|
'last_run': 0,
|
|
})
|
|
|
|
def wrapper(function):
|
|
def get_value(*args, **kwargs):
|
|
can_bail = (
|
|
state.stored_value is not NOT_CACHED and
|
|
state.max_age != 0 and
|
|
(time.time() - state.last_run) < state.max_age
|
|
)
|
|
if can_bail:
|
|
return state.stored_value
|
|
|
|
value = function(*args, **kwargs)
|
|
if isinstance(value, flask.Response):
|
|
if value.headers.get('Content-Type'):
|
|
state.headers['Content-Type'] = value.headers.get('Content-Type')
|
|
value = value.response
|
|
|
|
if value != state.stored_value:
|
|
state.stored_value = value
|
|
state.stored_etag = passwordy.random_hex(20)
|
|
state.headers['ETag'] = state.stored_etag
|
|
|
|
state.last_run = time.time()
|
|
return value
|
|
|
|
@functools.wraps(function)
|
|
def wrapped(*args, **kwargs):
|
|
value = get_value(*args, **kwargs)
|
|
|
|
client_etag = request.headers.get('If-None-Match', None)
|
|
if client_etag == state.stored_etag:
|
|
response = flask.Response(status=304, headers=state.headers)
|
|
else:
|
|
response = flask.Response(value, status=200, headers=state.headers)
|
|
|
|
return response
|
|
return wrapped
|
|
return wrapper
|
|
|
|
def decorate_and_route(flask_app, decorators):
|
|
'''
|
|
Flask provides decorators for before_request and after_request, but not for
|
|
wrapping the whole request. Sometimes I want to wrap the whole request,
|
|
either to catch exceptions (which don't get passed through after_request)
|
|
or to maintain some state before running the function and adding it to the
|
|
response after.
|
|
|
|
Instead of pasting my decorators onto every single endpoint and forgetting
|
|
to keep up with them in the future, we can just hijack the decorator I know
|
|
every endpoint will have: route.
|
|
|
|
You should set:
|
|
flask_app.route = flasktools.decorate_and_route(flask_app, decorators[...])
|
|
|
|
So every time your route something, it will also get the other decorators.
|
|
'''
|
|
old_route = flask_app.route
|
|
@functools.wraps(old_route)
|
|
def new_route(*route_args, **route_kwargs):
|
|
def wrapper(endpoint):
|
|
# Since a single endpoint function can have multiple route
|
|
# decorators on it, we might see the same function come through
|
|
# here multiple times. We'll only do the user's decorators once.
|
|
if not hasattr(endpoint, '_fully_decorated'):
|
|
for decorator in decorators:
|
|
endpoint = decorator(endpoint)
|
|
endpoint._fully_decorated = True
|
|
|
|
endpoint = old_route(*route_args, **route_kwargs)(endpoint)
|
|
return endpoint
|
|
return wrapper
|
|
return new_route
|
|
|
|
def ensure_response_type(function):
|
|
@functools.wraps(function)
|
|
def wrapped(*args, **kwargs):
|
|
response = function(*args, **kwargs)
|
|
if not isinstance(response, RESPONSE_TYPES):
|
|
response = flask.Response(response)
|
|
return response
|
|
return wrapped
|
|
|
|
def give_theme_cookie(function, *, cookie_name, default_theme):
|
|
'''
|
|
This decorator is one component of a theming system, where the user gets a
|
|
CSS stylesheet based on the value of their theme cookie.
|
|
|
|
Add this decorator to your endpoint. Then, use request.cookies.get to
|
|
check what theme they want. This decorator will inject the cookie before
|
|
your function runs. Add the appropriate stylesheet to your response HTML.
|
|
|
|
The user can change their theme by adding ?theme=name to the end of any URL
|
|
which uses this decorator.
|
|
'''
|
|
@functools.wraps(function)
|
|
def wrapped(*args, **kwargs):
|
|
old_theme = request.cookies.get(cookie_name, None)
|
|
new_theme = request.args.get('theme', None)
|
|
theme = new_theme or old_theme or default_theme
|
|
|
|
# The original data structure for request.cookies is immutable and we
|
|
# must turn it into this multidict.
|
|
request.cookies = werkzeug.datastructures.MultiDict(request.cookies)
|
|
# By injecting the cookie here, we allow the endpoint function to check
|
|
# request.cookies even if the client didn't actually have one when they
|
|
# started the request.
|
|
request.cookies[cookie_name] = theme
|
|
|
|
response = function(*args, **kwargs)
|
|
|
|
if new_theme is None:
|
|
pass
|
|
elif new_theme == '':
|
|
response.set_cookie(cookie_name, value='', expires=0)
|
|
elif new_theme != old_theme:
|
|
response.set_cookie(cookie_name, value=new_theme, expires=2147483647)
|
|
|
|
return response
|
|
return wrapped
|
|
|
|
def gzip_response(request, response):
|
|
if response.direct_passthrough:
|
|
return response
|
|
|
|
accept_encoding = request.headers.get('Accept-Encoding', '')
|
|
if 'gzip' not in accept_encoding.lower():
|
|
return response
|
|
|
|
if 'Content-Encoding' in response.headers:
|
|
return response
|
|
|
|
content_type = response.headers.get('Content-Type', '')
|
|
if not (content_type.startswith('application/json') or content_type.startswith('text/')):
|
|
return response
|
|
|
|
if response.status_code < 200:
|
|
return response
|
|
|
|
if response.status_code >= 300:
|
|
return response
|
|
|
|
content_length = response.headers.get('Content-Length', None)
|
|
if content_length is not None and int(content_length) > GZIP_MAXIMUM_SIZE:
|
|
return response
|
|
|
|
if content_length is not None and int(content_length) < GZIP_MINIMUM_SIZE:
|
|
return response
|
|
|
|
gzip_buffer = io.BytesIO()
|
|
gzip_file = gzip.GzipFile(mode='wb', compresslevel=GZIP_LEVEL, fileobj=gzip_buffer)
|
|
gzip_file.write(response.get_data())
|
|
gzip_file.close()
|
|
response.set_data(gzip_buffer.getvalue())
|
|
response.headers['Content-Encoding'] = 'gzip'
|
|
response.headers['Content-Length'] = len(response.get_data())
|
|
|
|
return response
|
|
|
|
def json_response(j, *args, **kwargs):
|
|
dumped = json.dumps(j)
|
|
response = flask.Response(dumped, *args, **kwargs)
|
|
response.headers['Content-Type'] = 'application/json;charset=utf-8'
|
|
return response
|
|
|
|
make_json_response = json_response
|
|
|
|
def required_fields(fields, forbid_whitespace=False):
|
|
'''
|
|
Declare that the endpoint requires certain POST body fields. Without them,
|
|
we respond with 400 and a message.
|
|
|
|
forbid_whitespace:
|
|
If True, then providing the field is not good enough. It must also
|
|
contain at least some non-whitespace characters.
|
|
'''
|
|
def wrapper(function):
|
|
@functools.wraps(function)
|
|
def wrapped(*args, **kwargs):
|
|
for requirement in fields:
|
|
missing = (
|
|
requirement not in request.form or
|
|
(forbid_whitespace and request.form[requirement].strip() == '')
|
|
)
|
|
if missing:
|
|
response = {
|
|
'type': 'error',
|
|
'error_type': 'MISSING_FIELDS',
|
|
'error_message': 'Required fields: %s' % ', '.join(fields),
|
|
}
|
|
response = json_response(response, status=400)
|
|
return response
|
|
return function(*args, **kwargs)
|
|
return wrapped
|
|
return wrapper
|