import http.server import mimetypes import os import urllib.parse import pathlib import random import socketserver import sys import types # pip install voussoirkit from voussoirkit import bytestring from voussoirkit import pathclass from voussoirkit import ratelimiter FILE_READ_CHUNK = bytestring.MIBIBYTE RATELIMITER = ratelimiter.Ratelimiter(16 * bytestring.MIBIBYTE) # The paths which the user may access. # Attempting to access anything outside will 403. # These are convered to Path objects after that class definition. OKAY_PATHS = set(['files', 'favicon.ico']) OPENDIR_TEMPLATE = ''' {entries}
''' class Path(pathclass.Path): ''' Add some server-specific abilities to the Pathclass ''' def __init__(self, path): path = urllib.parse.unquote(path) path = path.strip('/') pathclass.Path.__init__(self, path) @property def allowed(self): return any(self in okay for okay in OKAY_PATHS) def anchor(self, display_name=None): self.correct_case() if display_name is None: display_name = self.basename if self.is_dir: # Folder emoji icon = '\U0001F4C1' else: # Diamond emoji, because there's not one for files. icon = '\U0001F48E' #print('anchor', path) if display_name.endswith('.placeholder'): a = '{icon} {display}' else: a = '{icon} {display}' a = a.format( full=self.url_path, icon=icon, display=display_name, ) return a def table_row(self, display_name=None, shaded=False): form = '{anchor}{size}' size = self.size if size is None: size = '' else: size = bytestring.bytestring(size) bg = 'ddd' if shaded else 'fff'; row = form.format( bg=bg, anchor=self.anchor(display_name=display_name), size=size, ) return row @property def url_path(self): url = self.relative_path url = url.replace(os.sep, '/') url = '/' + url url = urllib.parse.quote(url) return url OKAY_PATHS = set(Path(p) for p in OKAY_PATHS) class RequestHandler(http.server.BaseHTTPRequestHandler): def write(self, data): if isinstance(data, str): data = data.encode('utf-8') if isinstance(data, types.GeneratorType): for chunk in data: self.wfile.write(chunk) RATELIMITER.limit(len(chunk)) else: self.wfile.write(data) def read_filebytes(self, path, range_min=None, range_max=None): #print(path) if path.is_file: if range_min is None: range_min = 0 if range_max is None: range_max = path.size range_span = range_max - range_min #print('read span', range_min, range_max, range_span) f = open(path.absolute_path, 'rb') f.seek(range_min) sent_amount = 0 while sent_amount < range_span: chunk = f.read(FILE_READ_CHUNK) if len(chunk) == 0: break yield chunk sent_amount += len(chunk) #print('I read', len(fr)) f.close() elif path.is_dir: text = generate_opendir(path) text = text.encode('utf-8') yield text else: self.send_error(404) yield bytes() def do_GET(self): #print(dir(self)) path = Path(self.path) if self.send_path_validation_error(path): return range_min = None range_max = None status_code = 200 headers = {} if path.is_file: file_size = path.size if 'range' in self.headers: desired_range = self.headers['range'] desired_range = desired_range.lower() desired_range = desired_range.split('bytes=')[-1] helper = lambda x: int(x) if x and x.isdigit() else None if '-' in desired_range: (desired_min, desired_max) = desired_range.split('-') #print('desire', desired_min, desired_max) range_min = helper(desired_min) range_max = helper(desired_max) else: range_min = helper(desired_range) if range_min is None: range_min = 0 if range_max is None: range_max = file_size # because ranges are 0 indexed range_max = min(range_max, file_size - 1) range_min = max(range_min, 0) status_code = 206 range_header = 'bytes {min}-{max}/{outof}'.format( min=range_min, max=range_max, outof=file_size, ) headers['Content-Range'] = range_header headers['Accept-Ranges'] = 'bytes' content_length = (range_max - range_min) + 1 else: content_length = file_size headers['Content-length'] = content_length mime = mimetypes.guess_type(path.absolute_path)[0] if mime is not None: #print(mime) headers['Content-type'] = mime self.send_response(status_code) for (key, value) in headers.items(): self.send_header(key, value) d = self.read_filebytes(path, range_min=range_min, range_max=range_max) #print('write') self.end_headers() self.write(d) def do_HEAD(self): path = Path(self.path) if self.send_path_validation_error(path): return status_code = 200 self.send_response(status_code) if path.is_dir: mime = 'text/html' else: mime = mimetypes.guess_type(path.absolute_path)[0] self.send_header('Content-length', path.size) if mime is not None: self.send_header('Content-type', mime) self.end_headers() def send_path_validation_error(self, path): if not path.allowed: self.send_error(403, 'Stop that!') return True return False class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer): ''' Thanks root and twasbrillig http://stackoverflow.com/a/14089457 ''' pass def generate_opendir(path): #print('Listdir:', path) items = os.listdir(path.absolute_path) items = [os.path.join(path.absolute_path, f) for f in items] #print(items) # This places directories above files, each ordered alphabetically items.sort(key=str.lower) directories = [] files = [] for item in items: if os.path.isdir(item): directories.append(item) else: files.append(item) items = directories + files items = [Path(f) for f in items] entries = [] if any(path.absolute_path == okay.absolute_path for okay in OKAY_PATHS): # This is different than a permission check, we're seeing if they're # actually at the top, in which case they don't need an up button. pass else: entry = path.parent.table_row(display_name='up') entries.append(entry) shaded = True for item in items: entry = item.table_row(shaded=shaded) entries.append(entry) shaded = not shaded entries = '\n'.join(entries) text = OPENDIR_TEMPLATE.format(entries=entries) return text def generate_random_filename(original_filename='', length=8): import random bits = length * 44 bits = random.getrandbits(bits) identifier = '{:x}'.format(bits).rjust(length, '0') return identifier def main(): server = ThreadedServer(('', int(sys.argv[1] or 32768)), RequestHandler) print('server starting') server.serve_forever() if __name__ == '__main__': main()