Simpleserver update.

This commit is contained in:
Ethan Dalool 2020-01-19 21:19:45 -08:00
parent 6d3f196214
commit 0965384c7f

View file

@ -1,4 +1,8 @@
import argparse
import cgi
import http.cookies
import http.server import http.server
import math
import mimetypes import mimetypes
import os import os
import pathlib import pathlib
@ -32,9 +36,55 @@ OPENDIR_TEMPLATE = '''
</html> </html>
''' '''
PASSWORD_PROMPT_HTML = '''
<html>
<body>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<style type="text/css">Body {{font-family:Consolas}}</style>
<form action="/password" method="post">
<input type="text" name="password" placeholder="password"/>
<input type="hidden" name="goto" value="{goto}"/>
<input type="submit" value="Submit"/>
</form>
</body>
</html>
'''
ROOT_DIRECTORY = pathclass.Path(os.getcwd()) ROOT_DIRECTORY = pathclass.Path(os.getcwd())
class RequestHandler(http.server.BaseHTTPRequestHandler): class RequestHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, *args, passw=None, accepted_tokens=None, **kwargs):
self.accepted_tokens = accepted_tokens
self.password = passw
super().__init__(*args, **kwargs)
def check_password(self, attempt):
if self.password is None:
return True
if attempt == self.password:
return True
return False
def check_has_password(self):
if self.password is None:
return True
if self.headers.get('password', None) == self.password:
return True
if self.headers.get('Cookie'):
cookie = http.cookies.SimpleCookie()
cookie.load(self.headers.get('Cookie'))
token = cookie.get('token')
if token and token.value in self.accepted_tokens:
return True
return False
def write(self, data): def write(self, data):
if isinstance(data, str): if isinstance(data, str):
data = data.encode('utf-8') data = data.encode('utf-8')
@ -46,6 +96,12 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(data) self.wfile.write(data)
def do_GET(self): def do_GET(self):
if not self.check_has_password():
self.send_response(401)
self.end_headers()
self.write(PASSWORD_PROMPT_HTML.format(goto=self.path))
return
path = url_to_path(self.path) path = url_to_path(self.path)
if self.send_path_validation_error(path): if self.send_path_validation_error(path):
return return
@ -108,7 +164,9 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
elif self.path.endswith('.zip'): elif self.path.endswith('.zip'):
path = url_to_path(self.path.rsplit('.zip', 1)[0]) path = url_to_path(self.path.rsplit('.zip', 1)[0])
headers['Content-type'] = 'application/octet-stream' headers['Content-type'] = 'application/octet-stream'
headers['Content-Disposition'] = f'attachment; filename*=UTF-8\'\'{path.basename}.zip' download_as = urllib.parse.quote(path.basename)
download_as += '.zip'
headers['Content-Disposition'] = f'attachment; filename*=UTF-8\'\'{download_as}'
response = zip_directory(path) response = zip_directory(path)
response = iter(response) response = iter(response)
# response = (print(chunk) or chunk for chunk in response) # response = (print(chunk) or chunk for chunk in response)
@ -124,9 +182,13 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.write(response) self.write(response)
return
def do_HEAD(self): def do_HEAD(self):
if not self.check_has_password():
self.send_response(401)
self.end_headers()
return
path = url_to_path(self.path) path = url_to_path(self.path)
if self.send_path_validation_error(path): if self.send_path_validation_error(path):
return return
@ -145,6 +207,34 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
def do_POST(self):
ctype, pdict = cgi.parse_header(self.headers.get('content-type'))
if ctype == 'multipart/form-data':
form = cgi.parse_multipart(self.rfile, pdict)
elif ctype == 'application/x-www-form-urlencoded':
length = int(self.headers.get('content-length'))
form = urllib.parse.parse_qs(self.rfile.read(length), keep_blank_values=1)
else:
form = {}
if self.path == '/password':
attempt = form.get(b'password')[0].decode('utf-8')
goto = form.get(b'goto')[0].decode('utf-8')
if self.check_password(attempt):
cookie = http.cookies.SimpleCookie()
token = random_hex(32)
cookie['token'] = token
self.accepted_tokens.add(token)
self.send_response(302)
self.send_header('Set-Cookie', cookie.output(header='', sep=''))
self.send_header('Location', goto)
else:
self.send_response(401)
else:
self.send_response(400)
self.end_headers()
def send_path_validation_error(self, path): def send_path_validation_error(self, path):
if not allowed(path): if not allowed(path):
self.send_error(403, 'Stop that!') self.send_error(403, 'Stop that!')
@ -152,12 +242,6 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
return False return False
# class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
# '''
# Thanks root and twasbrillig http://stackoverflow.com/a/14089457
# '''
# pass
def allowed(path): def allowed(path):
return path == ROOT_DIRECTORY or path in ROOT_DIRECTORY return path == ROOT_DIRECTORY or path in ROOT_DIRECTORY
@ -229,10 +313,10 @@ def generate_opendir(path):
table_rows.append(entry) table_rows.append(entry)
shaded = not shaded shaded = not shaded
# if len(items) > 0: if len(items) > 0:
# entry = table_row(path.replace_extension('.zip'), display_name='zip', shaded=shaded) entry = table_row(path.replace_extension('.zip'), display_name='zip', shaded=shaded)
# shaded = not shaded shaded = not shaded
# table_rows.append(entry) table_rows.append(entry)
table_rows = '\n'.join(table_rows) table_rows = '\n'.join(table_rows)
text = OPENDIR_TEMPLATE.format(table_rows=table_rows) text = OPENDIR_TEMPLATE.format(table_rows=table_rows)
@ -245,6 +329,12 @@ def generate_random_filename(original_filename='', length=8):
identifier = '{:x}'.format(bits).rjust(length, '0') identifier = '{:x}'.format(bits).rjust(length, '0')
return identifier return identifier
def random_hex(length=12):
randbytes = os.urandom(math.ceil(length / 2))
token = ''.join('{:02x}'.format(x) for x in randbytes)
token = token[:length]
return token
def read_filebytes(path, range_min=None, range_max=None): def read_filebytes(path, range_min=None, range_max=None):
#print(path) #print(path)
if range_min is None: if range_min is None:
@ -312,12 +402,16 @@ def zip_directory(path):
return zipfile return zipfile
def main(): def RRR(password=None):
try: accepted_tokens = set()
port = int(sys.argv[1]) def R(*args, **kwargs):
except IndexError: handler = RequestHandler(passw=password, accepted_tokens=accepted_tokens, *args, **kwargs)
port = 40000 return handler
server = http.server.ThreadingHTTPServer(('', port), RequestHandler)
return R
def simpleserver(port, password=None):
server = http.server.ThreadingHTTPServer(('', port), RRR(password=password))
print(f'server starting on {port}') print(f'server starting on {port}')
try: try:
server.serve_forever() server.serve_forever()
@ -330,5 +424,21 @@ def main():
print('really goodbye.') print('really goodbye.')
return 0 return 0
def simpleserver_argparse(args):
return simpleserver(
port=args.port,
password=args.password,
)
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('port', nargs='?', type=int, default=40000)
parser.add_argument('--password', dest='password', default=None)
parser.set_defaults(func=simpleserver_argparse)
args = parser.parse_args(argv)
return args.func(args)
if __name__ == '__main__': if __name__ == '__main__':
main() raise SystemExit(main(sys.argv[1:]))