import argparse import base64 import cgi import http.cookies import http.server import math import mimetypes import os import sys import types import urllib.parse import zipstream from voussoirkit import betterhelp from voussoirkit import bytestring from voussoirkit import gentools from voussoirkit import passwordy from voussoirkit import pathclass from voussoirkit import ratelimiter CHUNK_SIZE = bytestring.MIBIBYTE OPENDIR_TEMPLATE = ''' {title} {table_rows}
''' PASSWORD_PROMPT_HTML = '''
''' ROOT_DIRECTORY = pathclass.Path(os.getcwd()) HIDDEN_FILENAMES = {'thumbs.db', 'desktop.ini', '$recycle.bin', 'system volume information'} TOKEN_COOKIE_NAME = 'simpleserver_token' # SERVER ########################################################################################### class RequestHandler(http.server.BaseHTTPRequestHandler): def __init__(self, request, client_info, server, individual_ratelimit): self.individual_ratelimit = ratelimiter.Ratelimiter(individual_ratelimit) super().__init__(request, client_info, server) @property def auth_cookie(self): cookie = self.headers.get('Cookie') if not cookie: return None cookie = http.cookies.SimpleCookie(cookie) token = cookie.get(TOKEN_COOKIE_NAME) if not token: return None return token @property def auth_header(self): authorization = self.headers.get('Authorization') if not authorization: return None (auth_type, authorization) = authorization.split(' ', 1) if auth_type != 'Basic': return None authorization = base64.b64decode(authorization).decode() (username, password) = authorization.split(':', 1) return password @property def remote_addr(self): return self.request.getpeername()[0] def check_password(self, attempt): if self.server.password is None: return True if attempt == self.server.password: return True return False def check_has_password(self): if self.server.password is None: return True if self.auth_header == self.server.password: return True if self.server.accepted_tokens is not None and self.auth_cookie in self.server.accepted_tokens: return True if self.server.accepted_ips is not None and self.remote_addr in self.server.accepted_ips: return True return False def write(self, data): if isinstance(data, str): data = data.encode('utf-8') if isinstance(data, bytes): databytes = data data = (databytes[i*CHUNK_SIZE:(i+1)*CHUNK_SIZE] for i in range(math.ceil(len(data)/CHUNK_SIZE))) for chunk in data: self.wfile.write(chunk) chunksize = len(chunk) self.server.overall_ratelimit.limit(chunksize) self.individual_ratelimit.limit(chunksize) def do_GET(self): if not self.check_has_password(): self.send_response(401) self.end_headers() self.write(PASSWORD_PROMPT_HTML.format(goto=self.path)) return path = url_to_path(self.path) if self.send_path_validation_error(path): return range_min = None range_max = None status_code = 200 headers = {} if path.is_file: file_size = path.size if 'range' in self.headers: desired_range = self.headers['range'] desired_range = desired_range.lower() desired_range = desired_range.split('bytes=')[-1] helper = lambda x: int(x) if x and x.isdigit() else None if '-' in desired_range: (desired_min, desired_max) = desired_range.split('-') range_min = helper(desired_min) range_max = helper(desired_max) else: range_min = helper(desired_range) if range_min is None: range_min = 0 if range_max is None: range_max = file_size # because ranges are 0 indexed range_max = min(range_max, file_size - 1) range_min = max(range_min, 0) status_code = 206 range_header = 'bytes {min}-{max}/{outof}'.format( min=range_min, max=range_max, outof=file_size, ) headers['Content-Range'] = range_header headers['Accept-Ranges'] = 'bytes' content_length = (range_max - range_min) + 1 else: content_length = file_size headers['Content-length'] = content_length if path.is_file: headers['Content-type'] = mimetypes.guess_type(path.absolute_path)[0] response = read_filebytes(path, range_min=range_min, range_max=range_max) elif path.is_dir: headers['Content-type'] = 'text/html' response = generate_opendir(path, enable_zip=self.server.enable_zip) response = response.encode('utf-8') elif self.path.endswith('.zip') and self.server.enable_zip: path = url_to_path(self.path.rsplit('.zip', 1)[0]) headers['Content-type'] = 'application/octet-stream' download_as = urllib.parse.quote(path.basename) download_as += '.zip' headers['Content-Disposition'] = f'attachment; filename*=UTF-8\'\'{download_as}' response = zip_directory(path) response = iter(response) # response = (print(chunk) or chunk for chunk in response) else: status_code = 404 self.send_error(status_code) response = bytes() self.send_response(status_code) for (key, value) in headers.items(): self.send_header(key, value) self.end_headers() self.write(response) def do_HEAD(self): if not self.check_has_password(): self.send_response(401) self.end_headers() return path = url_to_path(self.path) if self.send_path_validation_error(path): return status_code = 200 self.send_response(status_code) if path.is_dir: mime = 'text/html' else: mime = mimetypes.guess_type(path.absolute_path)[0] self.send_header('Content-length', path.size) if mime is not None: self.send_header('Content-type', mime) self.end_headers() def do_POST(self): (ctype, pdict) = cgi.parse_header(self.headers.get('content-type')) if ctype == 'multipart/form-data': form = cgi.parse_multipart(self.rfile, pdict) elif ctype == 'application/x-www-form-urlencoded': length = int(self.headers.get('content-length')) form = urllib.parse.parse_qs(self.rfile.read(length), keep_blank_values=1) else: form = {} if self.path == '/password': attempt = form.get(b'password')[0].decode('utf-8') goto = form.get(b'goto')[0].decode('utf-8') if self.check_password(attempt): self.send_response(302) if self.server.accepted_tokens is not None: cookie = http.cookies.SimpleCookie() token = passwordy.random_hex(32) cookie[TOKEN_COOKIE_NAME] = token self.server.accepted_tokens.add(token) self.send_header('Set-Cookie', cookie.output(header='', sep='')) if self.server.accepted_ips is not None: self.server.accepted_ips.add(self.remote_addr) self.send_header('Location', goto) else: self.send_response(401) elif not self.check_has_password(): self.send_response(401) self.end_headers() return else: self.send_response(400) self.end_headers() def send_path_validation_error(self, path): if not allowed(path): self.send_error(403, 'Stop that!') return True return False class SimpleServer: def __init__( self, port, *, password, authorize_by_ip, enable_zip, overall_ratelimit, individual_ratelimit, ): self.port = port self.password = password self.authorize_by_ip = authorize_by_ip self.enable_zip = enable_zip self.overall_ratelimit = ratelimiter.Ratelimiter(overall_ratelimit) self.individual_ratelimit = individual_ratelimit if authorize_by_ip: self.accepted_ips = set() self.accepted_tokens = None else: self.accepted_tokens = set() self.accepted_ips = None def make_request_handler(self, request, client_info, _server): # We ignore the given _server and use self instead because _server will # be the ThreadingHTTPServer instance. return RequestHandler(request, client_info, self, individual_ratelimit=self.individual_ratelimit) def start(self): server = http.server.ThreadingHTTPServer(('0.0.0.0', self.port), self.make_request_handler) print(f'Server starting on {self.port}, pid={os.getpid()}.') try: server.serve_forever() except KeyboardInterrupt: print('Goodbye.') server.shutdown() # HELPERS ########################################################################################## def allowed(path): return path == ROOT_DIRECTORY or path in ROOT_DIRECTORY def atag(path, display_name=None): if display_name is None: display_name = path.basename if path.is_dir: # Folder emoji icon = '\U0001F4C1' else: # Gift emoji icon = '\U0001F381' if display_name.endswith('.placeholder'): a = '{icon} {display}' else: a = '{icon} {display}' a = a.format( full=path_to_url(path), icon=icon, display=display_name, ) return a def generate_opendir(path, enable_zip): try: path.correct_case() items = path.listdir() except FileNotFoundError: items = [] # This places directories above files, each ordered alphabetically directories = [] files = [] for item in sorted(items, key=lambda p: p.basename.lower()): if item.basename.lower() in HIDDEN_FILENAMES: continue if item.is_dir: directories.append(item) else: files.append(item) items = directories + files table_rows = [] shaded = False is_root = path.absolute_path == ROOT_DIRECTORY.absolute_path if is_root: # This is different than a permission check, we're seeing if they're # actually at the top, in which case they don't need an up button. entry = table_row(path, display_name='.', shaded=shaded) table_rows.append(entry) shaded = not shaded else: entry = table_row(path.parent, display_name='up', shaded=shaded) table_rows.append(entry) shaded = not shaded for item in items: entry = table_row(item, shaded=shaded) table_rows.append(entry) shaded = not shaded if len(items) > 0 and enable_zip: entry = table_row(path.replace_extension('.zip'), display_name='zip', shaded=shaded) shaded = not shaded table_rows.append(entry) table_rows = '\n'.join(table_rows) title = '/' if is_root else path.basename text = OPENDIR_TEMPLATE.format(title=title, table_rows=table_rows) return text def read_filebytes(path, range_min=None, range_max=None): if range_min is None: range_min = 0 if range_max is None: range_max = path.size range_span = range_max - range_min f = path.open('rb') f.seek(range_min) sent_amount = 0 while sent_amount < range_span: chunk = f.read(CHUNK_SIZE) if len(chunk) == 0: break yield chunk sent_amount += len(chunk) f.close() def table_row(path, display_name=None, shaded=False): form = ''' {anchor} {size} '''.replace('\n', ' ') if path.is_file: size = bytestring.bytestring(path.size) else: size = '' bg = 'ddd' if shaded else 'fff' anchor = atag(path, display_name=display_name) row = form.format( bg=bg, anchor=anchor, size=size, ) return row def path_to_url(path): url = path.relative_to(ROOT_DIRECTORY, simple=True) url = url.replace(os.sep, '/') url = '/' + url url = urllib.parse.quote(url) return url def url_to_path(path): path = urllib.parse.unquote(path) path = path.strip('/') return pathclass.Path(path) def zip_directory(path): zipfile = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_STORED) for item in path.walk(): if item.is_dir: continue arcname = item.relative_to(path).lstrip('.' + os.sep) zipfile.write(filename=item.absolute_path, arcname=arcname) return zipfile # COMMAND LINE ##################################################################################### DOCSTRING = ''' simpleserver ============ Run a simple file server from your computer. > simpleserver port port: Port number, an integer. flags: --password X: A password string. The user will be prompted to enter it before proceeding to any URL. A token is stored in a cookie unless authorize_by_ip is used. --authorize_by_ip: After the user enters the password, their entire IP becomes authorized for all future requests. This reduces security, because a single IP can be home to many different people, but increases convenience, because the user can use download managers / scripts where adding auth is not convenient. --enable_zip: Add a 'zip' link to every directory and allow the user to download the entire directory as a zip file. --overall_ratelimit X: An integer number of bytes, the maximum bytes/sec of the server overall. --individual_ratelimit X: An integer number of bytes, the maximum bytes/sec for any single request. ''' def simpleserver_argparse(args): server = SimpleServer( port=args.port, password=args.password, authorize_by_ip=args.authorize_by_ip, enable_zip=args.enable_zip, overall_ratelimit=args.overall_ratelimit, individual_ratelimit=args.individual_ratelimit, ) server.start() def main(argv): parser = argparse.ArgumentParser(description=DOCSTRING) parser.add_argument('port', nargs='?', type=int, default=40000) parser.add_argument('--password', dest='password', default=None) parser.add_argument('--authorize_by_ip', '--authorize-by-ip', dest='authorize_by_ip', action='store_true') parser.add_argument('--enable_zip', '--enable-zip', dest='enable_zip', action='store_true') parser.add_argument('--overall_ratelimit', '--overall-ratelimit', type=bytestring.parsebytes, default=20*bytestring.MIBIBYTE) parser.add_argument('--individual_ratelimit', '--individual-ratelimit', type=bytestring.parsebytes, default=10*bytestring.MIBIBYTE) parser.set_defaults(func=simpleserver_argparse) return betterhelp.single_main(argv, parser, DOCSTRING) if __name__ == '__main__': raise SystemExit(main(sys.argv[1:]))