Update to the simple server.

This commit is contained in:
Ethan Dalool 2019-06-11 22:29:46 -07:00
parent 0c561c05c9
commit f1f42ba39a

View file

@ -1,12 +1,14 @@
import http.server import http.server
import mimetypes import mimetypes
import os import os
import urllib.parse
import pathlib import pathlib
import random import random
import socketserver import socketserver
import sys import sys
import threading
import types import types
import urllib.parse
import zipstream
# pip install voussoirkit # pip install voussoirkit
from voussoirkit import bytestring from voussoirkit import bytestring
@ -16,86 +18,21 @@ from voussoirkit import ratelimiter
FILE_READ_CHUNK = bytestring.MIBIBYTE FILE_READ_CHUNK = bytestring.MIBIBYTE
RATELIMITER = ratelimiter.Ratelimiter(16 * bytestring.MIBIBYTE) RATELIMITER = ratelimiter.Ratelimiter(16 * bytestring.MIBIBYTE)
# The paths which the user may access.
# Attempting to access anything outside will 403.
# These are convered to Path objects after that class definition.
OKAY_PATHS = set(['files', 'favicon.ico'])
OPENDIR_TEMPLATE = ''' OPENDIR_TEMPLATE = '''
<html> <html>
<body> <body>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<style type="text/css">Body {{font-family:Consolas}}</style> <style type="text/css">Body {{font-family:Consolas}}</style>
<table style="width: 95%"> <table style="width: 100%">
{entries} {table_rows}
</table> </table>
</body> </body>
</html> </html>
''' '''
class Path(pathclass.Path): ROOT_DIRECTORY = pathclass.Path(os.getcwd())
'''
Add some server-specific abilities to the Pathclass
'''
def __init__(self, path):
path = urllib.parse.unquote(path)
path = path.strip('/')
pathclass.Path.__init__(self, path)
@property
def allowed(self):
return any(self in okay for okay in OKAY_PATHS)
def anchor(self, display_name=None):
self.correct_case()
if display_name is None:
display_name = self.basename
if self.is_dir:
# Folder emoji
icon = '\U0001F4C1'
else:
# Diamond emoji, because there's not one for files.
icon = '\U0001F48E'
#print('anchor', path)
if display_name.endswith('.placeholder'):
a = '<a>{icon} {display}</a>'
else:
a = '<a href="{full}">{icon} {display}</a>'
a = a.format(
full=self.url_path,
icon=icon,
display=display_name,
)
return a
def table_row(self, display_name=None, shaded=False):
form = '<tr style="background-color:#{bg}"><td style="width:90%">{anchor}</td><td>{size}</td></tr>'
size = self.size
if size is None:
size = ''
else:
size = bytestring.bytestring(size)
bg = 'ddd' if shaded else 'fff';
row = form.format(
bg=bg,
anchor=self.anchor(display_name=display_name),
size=size,
)
return row
@property
def url_path(self):
url = self.relative_path
url = url.replace(os.sep, '/')
url = '/' + url
url = urllib.parse.quote(url)
return url
OKAY_PATHS = set(Path(p) for p in OKAY_PATHS)
class RequestHandler(http.server.BaseHTTPRequestHandler): class RequestHandler(http.server.BaseHTTPRequestHandler):
def write(self, data): def write(self, data):
@ -108,45 +45,8 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
else: else:
self.wfile.write(data) self.wfile.write(data)
def read_filebytes(self, path, range_min=None, range_max=None):
#print(path)
if path.is_file:
if range_min is None:
range_min = 0
if range_max is None:
range_max = path.size
range_span = range_max - range_min
#print('read span', range_min, range_max, range_span)
f = open(path.absolute_path, 'rb')
f.seek(range_min)
sent_amount = 0
while sent_amount < range_span:
chunk = f.read(FILE_READ_CHUNK)
if len(chunk) == 0:
break
yield chunk
sent_amount += len(chunk)
#print('I read', len(fr))
f.close()
elif path.is_dir:
text = generate_opendir(path)
text = text.encode('utf-8')
yield text
else:
self.send_error(404)
yield bytes()
def do_GET(self): def do_GET(self):
#print(dir(self)) path = url_to_path(self.path)
path = Path(self.path)
if self.send_path_validation_error(path): if self.send_path_validation_error(path):
return return
@ -196,22 +96,38 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
headers['Content-length'] = content_length headers['Content-length'] = content_length
mime = mimetypes.guess_type(path.absolute_path)[0] if path.is_file:
if mime is not None: headers['Content-type'] = mimetypes.guess_type(path.absolute_path)[0]
#print(mime) response = read_filebytes(path, range_min=range_min, range_max=range_max)
headers['Content-type'] = mime
elif path.is_dir:
headers['Content-type'] = 'text/html'
response = generate_opendir(path)
response = response.encode('utf-8')
elif self.path.endswith('.zip'):
path = url_to_path(self.path.rsplit('.zip', 1)[0])
headers['Content-type'] = 'application/octet-stream'
headers['Content-Disposition'] = f'attachment; filename*=UTF-8\'\'{path.basename}.zip'
response = zip_directory(path)
response = iter(response)
# response = (print(chunk) or chunk for chunk in response)
else:
status_code = 404
self.send_error(status_code)
response = bytes()
self.send_response(status_code) self.send_response(status_code)
for (key, value) in headers.items(): for (key, value) in headers.items():
self.send_header(key, value) self.send_header(key, value)
d = self.read_filebytes(path, range_min=range_min, range_max=range_max)
#print('write')
self.end_headers() self.end_headers()
self.write(d) self.write(response)
return
def do_HEAD(self): def do_HEAD(self):
path = Path(self.path) path = url_to_path(self.path)
if self.send_path_validation_error(path): if self.send_path_validation_error(path):
return return
@ -230,55 +146,90 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
def send_path_validation_error(self, path): def send_path_validation_error(self, path):
if not path.allowed: if not allowed(path):
self.send_error(403, 'Stop that!') self.send_error(403, 'Stop that!')
return True return True
return False return False
class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer): # class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
''' # '''
Thanks root and twasbrillig http://stackoverflow.com/a/14089457 # Thanks root and twasbrillig http://stackoverflow.com/a/14089457
''' # '''
pass # pass
def allowed(path):
return path in ROOT_DIRECTORY
def anchor(path, display_name=None):
path.correct_case()
if display_name is None:
display_name = path.basename
if path.is_dir:
# Folder emoji
icon = '\U0001F4C1'
else:
# Diamond emoji
#icon = '\U0001F48E'
icon = '\U0001F381'
#print('anchor', path)
if display_name.endswith('.placeholder'):
a = '<a>{icon} {display}</a>'
else:
a = '<a href="{full}">{icon} {display}</a>'
a = a.format(
full=path_to_url(path),
icon=icon,
display=display_name,
)
return a
def generate_opendir(path): def generate_opendir(path):
#print('Listdir:', path) #print('Listdir:', path)
items = os.listdir(path.absolute_path)
items = [os.path.join(path.absolute_path, f) for f in items]
#print(items)
# This places directories above files, each ordered alphabetically # This places directories above files, each ordered alphabetically
items.sort(key=str.lower) try:
items = path.listdir()
except FileNotFoundError:
items = []
directories = [] directories = []
files = [] files = []
for item in items:
if os.path.isdir(item): for item in sorted(items, key=lambda p: p.basename.lower()):
if item.is_dir:
directories.append(item) directories.append(item)
else: else:
files.append(item) files.append(item)
items = directories + files items = directories + files
items = [Path(f) for f in items] table_rows = []
entries = []
if any(path.absolute_path == okay.absolute_path for okay in OKAY_PATHS): shaded = False
if path.absolute_path == ROOT_DIRECTORY.absolute_path:
# This is different than a permission check, we're seeing if they're # This is different than a permission check, we're seeing if they're
# actually at the top, in which case they don't need an up button. # actually at the top, in which case they don't need an up button.
pass pass
else: else:
entry = path.parent.table_row(display_name='up') entry = table_row(path.parent, display_name='up', shaded=shaded)
entries.append(entry) table_rows.append(entry)
shaded = True
for item in items:
entry = item.table_row(shaded=shaded)
entries.append(entry)
shaded = not shaded shaded = not shaded
entries = '\n'.join(entries) for item in items:
text = OPENDIR_TEMPLATE.format(entries=entries) entry = table_row(item, shaded=shaded)
table_rows.append(entry)
shaded = not shaded
if len(items) > 0:
entry = table_row(path.replace_extension('.zip'), display_name='zip', shaded=shaded)
shaded = not shaded
table_rows.append(entry)
table_rows = '\n'.join(table_rows)
text = OPENDIR_TEMPLATE.format(table_rows=table_rows)
return text return text
def generate_random_filename(original_filename='', length=8): def generate_random_filename(original_filename='', length=8):
@ -288,10 +239,87 @@ def generate_random_filename(original_filename='', length=8):
identifier = '{:x}'.format(bits).rjust(length, '0') identifier = '{:x}'.format(bits).rjust(length, '0')
return identifier return identifier
def read_filebytes(path, range_min=None, range_max=None):
#print(path)
if range_min is None:
range_min = 0
if range_max is None:
range_max = path.size
range_span = range_max - range_min
#print('read span', range_min, range_max, range_span)
f = open(path.absolute_path, 'rb')
f.seek(range_min)
sent_amount = 0
while sent_amount < range_span:
chunk = f.read(FILE_READ_CHUNK)
if len(chunk) == 0:
break
yield chunk
sent_amount += len(chunk)
#print('I read', len(fr))
f.close()
def table_row(path, display_name=None, shaded=False):
form = '<tr style="background-color:#{bg}"><td style="">{anchor}</td><td>{size}</td></tr>'
size = path.size
if size is None:
size = ''
else:
size = bytestring.bytestring(size)
bg = 'ddd' if shaded else 'fff';
row = form.format(
bg=bg,
anchor=anchor(path, display_name=display_name),
size=size,
)
return row
def path_to_url(path):
url = path.relative_path[2:]
url = url.replace(os.sep, '/')
url = '/' + url
url = urllib.parse.quote(url)
return url
def url_to_path(path):
path = urllib.parse.unquote(path)
path = path.strip('/')
return pathclass.Path(path)
def zip_directory(path):
zipfile = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_STORED)
for item in path.walk():
if item.is_dir:
continue
arcname = item.relative_to(path).lstrip('.' + os.sep)
zipfile.write(filename=item.absolute_path, arcname=arcname)
return zipfile
def main(): def main():
server = ThreadedServer(('', int(sys.argv[1] or 32768)), RequestHandler) try:
print('server starting') port = int(sys.argv[1])
server.serve_forever() except IndexError:
port = 40000
server = http.server.ThreadingHTTPServer(('', port), RequestHandler)
print(f'server starting on {port}')
try:
server.serve_forever()
except KeyboardInterrupt:
print('goodbye.')
t = threading.Thread(target=server.shutdown)
t.daemon = True
t.start()
server.shutdown()
print('really goodbye.')
return 0
if __name__ == '__main__': if __name__ == '__main__':
main() main()