commit b4b51e3604df35a15cf08c5a99052b89d3e8ff73 Author: Ethan Dalool Date: Mon Dec 17 22:10:00 2018 -0800 Initial migratory commit. diff --git a/README.md b/README.md new file mode 100644 index 0000000..e15bb92 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +voussoirkit +=========== + +This is a collection of tools that I use often and import into my other projects. diff --git a/phase2.py b/phase2.py new file mode 100644 index 0000000..77db5ec --- /dev/null +++ b/phase2.py @@ -0,0 +1,11 @@ +import shutil +import os + +def delete(folder): + try: + shutil.rmtree(folder) + except: + pass + +delete('dist') +delete('voussoirkit.egg-info') diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..79df38c --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +import setuptools + +setuptools.setup( + name='voussoirkit', + packages=setuptools.find_packages(), + version='0.0.26', + author='voussoir', + author_email='ethan@voussoir.net', + description='voussoir\'s toolkit', + url='https://github.com/voussoir/voussoirkit', + install_requires=['pyperclip'] +) diff --git a/voussoirkit.bat b/voussoirkit.bat new file mode 100644 index 0000000..44acee3 --- /dev/null +++ b/voussoirkit.bat @@ -0,0 +1,3 @@ +rem py setup.py register -r https://upload.pypi.org/legacy/ +py setup.py sdist upload -r https://upload.pypi.org/legacy/ +phase2 diff --git a/voussoirkit/__init__.py b/voussoirkit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/voussoirkit/basenumber.py b/voussoirkit/basenumber.py new file mode 100644 index 0000000..814087c --- /dev/null +++ b/voussoirkit/basenumber.py @@ -0,0 +1,81 @@ +import string + +ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' + +def from_base(number, base, alphabet=None): + if base < 2: + raise ValueError('base must be >= 2.') + if not isinstance(base, int): + raise TypeError('base must be an int.') + + if base == 10: + return int(number) + + if alphabet is None: + alphabet = ALPHABET + number = str(number) 
+ alphabet = alphabet[:base] + + if number.count('.') > 1: + raise ValueError('Too many decimal points') + + mixed_case = any(c in string.ascii_uppercase for c in alphabet) and \ + any(c in string.ascii_lowercase for c in alphabet) + if not mixed_case: + alphabet = alphabet.upper() + number = number.upper() + + char_set = set(number.replace('.', '', 1)) + alpha_set = set(alphabet) + differences = char_set.difference(alpha_set) + if len(differences) > 0: + raise ValueError('Unknown characters for base', base, differences) + alpha_dict = {character:index for (index, character) in enumerate(alphabet)} + + try: + decimal_pos = number.index('.') + except ValueError: + decimal_pos = len(number) + + result = 0 + for (index, character) in enumerate(number): + if index == decimal_pos: + continue + power = (decimal_pos - index) + if index < decimal_pos: + power -= 1 + value = alpha_dict[character] * (base ** power) + #print(value) + result += value + return result + +def to_base(number, base, decimal_places=10, alphabet=None): + if base < 2: + raise ValueError('base must be >= 2.') + if not isinstance(base, int): + raise TypeError('base must be an int.') + + if base == 10: + return str(number) + + if alphabet is None: + alphabet = ALPHABET + + if base > len(alphabet): + raise ValueError('Not enough symbols in alphabet for base %d' % base) + + result = '' + whole_portion = int(number) + float_portion = number - whole_portion + while whole_portion > 0: + (whole_portion, remainder) = divmod(whole_portion, base) + result = alphabet[remainder] + result + if float_portion != 0: + result += '.' 
+ for x in range(decimal_places): + float_portion *= base + whole = int(float_portion) + float_portion -= whole + result += alphabet[whole] + + return result diff --git a/voussoirkit/bytestring.py b/voussoirkit/bytestring.py new file mode 100644 index 0000000..04ed511 --- /dev/null +++ b/voussoirkit/bytestring.py @@ -0,0 +1,141 @@ +import re +import sys + +from voussoirkit import clipext + + +__VERSION__ = '0.0.1' + +BYTE = 1 +KIBIBYTE = 1024 * BYTE +MIBIBYTE = 1024 * KIBIBYTE +GIBIBYTE = 1024 * MIBIBYTE +TEBIBYTE = 1024 * GIBIBYTE +PEBIBYTE = 1024 * TEBIBYTE +EXIBYTE = 1024 * PEBIBYTE +ZEBIBYTE = 1024 * EXIBYTE +YOBIBYTE = 1024 * ZEBIBYTE + +BYTE_STRING = 'b' +KIBIBYTE_STRING = 'KiB' +MIBIBYTE_STRING = 'MiB' +GIBIBYTE_STRING = 'GiB' +TEBIBYTE_STRING = 'TiB' +PEBIBYTE_STRING = 'PiB' +EXIBYTE_STRING = 'EiB' +ZEBIBYTE_STRING = 'ZiB' +YOBIBYTE_STRING = 'YiB' + +UNIT_STRINGS = { + BYTE: BYTE_STRING, + KIBIBYTE: KIBIBYTE_STRING, + MIBIBYTE: MIBIBYTE_STRING, + GIBIBYTE: GIBIBYTE_STRING, + TEBIBYTE: TEBIBYTE_STRING, + PEBIBYTE: PEBIBYTE_STRING, + EXIBYTE: EXIBYTE_STRING, + ZEBIBYTE: ZEBIBYTE_STRING, + YOBIBYTE: YOBIBYTE_STRING, +} +REVERSED_UNIT_STRINGS = {value: key for (key, value) in UNIT_STRINGS.items()} +UNIT_SIZES = sorted(UNIT_STRINGS.keys(), reverse=True) + + +def bytestring(size, decimal_places=3, force_unit=None): + ''' + Convert a number into string. + + force_unit: + If None, an appropriate size unit is chosen automatically. + Otherwise, you can provide one of the size constants to force that divisor. 
+ ''' + if force_unit is None: + divisor = get_appropriate_divisor(size) + else: + if isinstance(force_unit, str): + force_unit = normalize_unit_string(force_unit) + force_unit = REVERSED_UNIT_STRINGS[force_unit] + divisor = force_unit + + size_unit_string = UNIT_STRINGS[divisor] + + size_string = '{number:.0{decimal_places}f} {unit}' + size_string = size_string.format( + decimal_places=decimal_places, + number=size/divisor, + unit=size_unit_string, + ) + return size_string + +def get_appropriate_divisor(size): + ''' + Return the divisor that would be appropriate for displaying this byte size. + For example: + 1000 => 1 to display 1,000 b + 1024 => 1024 to display 1 KiB + 123456789 => 1048576 to display 117.738 MiB + ''' + size = abs(size) + for unit in UNIT_SIZES: + if size >= unit: + appropriate_unit = unit + break + else: + appropriate_unit = 1 + return appropriate_unit + +def normalize_unit_string(string): + ''' + Given a string "k" or "kb" or "kib" in any case, return "KiB", etc. + ''' + string = string.lower() + for (size, unit_string) in UNIT_STRINGS.items(): + unit_string_l = unit_string.lower() + if string in (unit_string_l, unit_string_l[0], unit_string_l.replace('i', '')): + return unit_string + raise ValueError('Unrecognized unit string "%s"' % string) + +def parsebytes(string): + ''' + Given a string like "100 kib", return the appropriate integer value. + Accepts "k", "kb", "kib" in any casing. + ''' + string = string.lower().strip() + string = string.replace(' ', '').replace(',', '') + + matches = re.findall('((\\.|-|\\d)+)', string) + if len(matches) == 0: + raise ValueError('No numbers found') + if len(matches) > 1: + raise ValueError('Too many numbers found') + byte_value = matches[0][0] + + if not string.startswith(byte_value): + raise ValueError('Number is not at start of string') + + + # if the string has no text besides the number, just return that int. 
+ string = string.replace(byte_value, '') + byte_value = float(byte_value) + if string == '': + return int(byte_value) + + unit_string = normalize_unit_string(string) + multiplier = REVERSED_UNIT_STRINGS[unit_string] + + return int(byte_value * multiplier) + +def main(args=None): + if args is None: + args = sys.argv[1:] + + if len(args) != 1: + print('Usage: bytestring.py ') + return 1 + number = clipext.resolve(sys.argv[1]) + n = int(number) + print(bytestring(n)) + return 0 + +if __name__ == '__main__': + sys.exit(main(sys.argv[1:])) diff --git a/voussoirkit/cacheclass.py b/voussoirkit/cacheclass.py new file mode 100644 index 0000000..bde8de0 --- /dev/null +++ b/voussoirkit/cacheclass.py @@ -0,0 +1,43 @@ +import collections + +class Cache: + def __init__(self, maxlen): + self.maxlen = maxlen + self.cache = collections.OrderedDict() + + def __contains__(self, key): + return key in self.cache + + def __getitem__(self, key): + value = self.cache.pop(key) + self.cache[key] = value + return value + + def __len__(self): + return len(self.cache) + + def __setitem__(self, key, value): + try: + self.cache.pop(key) + except KeyError: + if len(self.cache) >= self.maxlen: + self.cache.popitem(last=False) + self.cache[key] = value + + def clear(self): + self.cache.clear() + + def get(self, key, fallback=None): + try: + return self[key] + except KeyError: + return fallback + + def pop(self, key): + return self.cache.pop(key) + + def remove(self, key): + try: + self.pop(key) + except KeyError: + pass diff --git a/voussoirkit/clipext.py b/voussoirkit/clipext.py new file mode 100644 index 0000000..1e5161b --- /dev/null +++ b/voussoirkit/clipext.py @@ -0,0 +1,44 @@ +import pyperclip + +CLIPBOARD_STRINGS = ['!c', '!clip', '!clipboard'] +INPUT_STRINGS = ['!i', '!in', '!input', '!stdin'] +EOF = '\x1a' + +def _input_lines(): + while True: + try: + additional = input() + except EOFError: + # If you enter nothing but ctrl-z + additional = EOF + + additional = additional.split(EOF) + 
has_eof = len(additional) > 1 + additional = additional[0] + + yield additional + + if has_eof: + break + +def multi_line_input(split_lines=False): + generator = _input_lines() + if split_lines: + return generator + else: + return '\n'.join(generator) + +def resolve(arg, split_lines=False): + lowered = arg.lower() + if lowered in INPUT_STRINGS: + return multi_line_input(split_lines=split_lines) + elif lowered in CLIPBOARD_STRINGS: + text = pyperclip.paste() + else: + text = arg + + if split_lines: + lines = text.splitlines() + return lines + else: + return text diff --git a/voussoirkit/downloady.py b/voussoirkit/downloady.py new file mode 100644 index 0000000..b5bbe50 --- /dev/null +++ b/voussoirkit/downloady.py @@ -0,0 +1,468 @@ +import argparse +import os +import pyperclip +import requests +import sys +import time +import urllib +import warnings + +# pip install voussoirkit +from voussoirkit import bytestring +from voussoirkit import ratelimiter +from voussoirkit import clipext + +warnings.simplefilter('ignore') + +HEADERS = { +'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36' +} + +FILENAME_BADCHARS = '*?"<>|\r\n' + +last_request = 0 +CHUNKSIZE = 4 * bytestring.KIBIBYTE +TIMEOUT = 60 +TEMP_EXTENSION = '.downloadytemp' + +PRINT_LIMITER = ratelimiter.Ratelimiter(allowance=5, mode='reject') + +class NotEnoughBytes(Exception): + pass + +def download_file( + url, + localname=None, + auth=None, + bytespersecond=None, + callback_progress=None, + do_head=True, + headers=None, + overwrite=False, + raise_for_undersized=True, + timeout=None, + verbose=False, + verify_ssl=True, + ): + headers = headers or {} + + url = sanitize_url(url) + if localname in [None, '']: + localname = basename_from_url(url) + if os.path.isdir(localname): + localname = os.path.join(localname, basename_from_url(url)) + localname = sanitize_filename(localname) + if localname != os.devnull: + localname = 
os.path.abspath(localname) + + if verbose: + safeprint(' URL:', url) + safeprint('File:', localname) + + plan = prepare_plan( + url, + localname, + auth=auth, + bytespersecond=bytespersecond, + callback_progress=callback_progress, + do_head=do_head, + headers=headers, + overwrite=overwrite, + raise_for_undersized=raise_for_undersized, + timeout=timeout, + verify_ssl=verify_ssl, + ) + #print(plan) + if plan is None: + return + + return download_plan(plan) + +def download_plan(plan): + localname = plan['download_into'] + directory = os.path.split(localname)[0] + if directory != '': + os.makedirs(directory, exist_ok=True) + touch(localname) + file_handle = open(localname, 'r+b') + file_handle.seek(plan['seek_to']) + + if plan['header_range_min'] is not None: + plan['headers']['range'] = 'bytes={min}-{max}'.format( + min=plan['header_range_min'], + max=plan['header_range_max'], + ) + + if plan['plan_type'] == 'resume': + bytes_downloaded = plan['seek_to'] + elif plan['plan_type'] == 'partial': + bytes_downloaded = plan['seek_to'] + else: + bytes_downloaded = 0 + + download_stream = request( + 'get', + plan['url'], + stream=True, + auth=plan['auth'], + headers=plan['headers'], + timeout=plan['timeout'], + verify_ssl=plan['verify_ssl'], + ) + + if plan['remote_total_bytes'] is None: + # Since we didn't do a head, let's fill this in now. 
+ plan['remote_total_bytes'] = int(download_stream.headers.get('Content-Length', 0)) + + callback_progress = plan['callback_progress'] + if callback_progress is not None: + callback_progress = callback_progress(plan['remote_total_bytes']) + + for chunk in download_stream.iter_content(chunk_size=CHUNKSIZE): + bytes_downloaded += len(chunk) + file_handle.write(chunk) + if callback_progress is not None: + callback_progress.step(bytes_downloaded) + + if plan['limiter'] is not None and bytes_downloaded < plan['remote_total_bytes']: + plan['limiter'].limit(len(chunk)) + + file_handle.close() + + # Don't try to rename /dev/null + if os.devnull not in [localname, plan['real_localname']]: + localsize = os.path.getsize(localname) + undersized = plan['plan_type'] != 'partial' and localsize < plan['remote_total_bytes'] + if plan['raise_for_undersized'] and undersized: + message = 'File does not contain expected number of bytes. Received {size} / {total}' + message = message.format(size=localsize, total=plan['remote_total_bytes']) + raise NotEnoughBytes(message) + + if localname != plan['real_localname']: + os.rename(localname, plan['real_localname']) + + return plan['real_localname'] + +def prepare_plan( + url, + localname, + auth=None, + bytespersecond=None, + callback_progress=None, + do_head=True, + headers=None, + overwrite=False, + raise_for_undersized=True, + timeout=TIMEOUT, + verify_ssl=True, + ): + # Chapter 1: File existence + headers = headers or {} + user_provided_range = 'range' in headers + real_localname = localname + temp_localname = localname + TEMP_EXTENSION + real_exists = os.path.exists(real_localname) + + if real_exists and overwrite is False and not user_provided_range: + print('File exists and overwrite is off. 
Nothing to do.') + return None + temp_exists = os.path.exists(temp_localname) + real_localsize = int(real_exists and os.path.getsize(real_localname)) + temp_localsize = int(temp_exists and os.path.getsize(temp_localname)) + + # Chapter 2: Ratelimiting + if bytespersecond is None: + limiter = None + elif isinstance(bytespersecond, ratelimiter.Ratelimiter): + limiter = bytespersecond + else: + limiter = ratelimiter.Ratelimiter(allowance=bytespersecond) + + # Chapter 3: Extracting range + if user_provided_range: + user_range_min = int(headers['range'].split('bytes=')[1].split('-')[0]) + user_range_max = headers['range'].split('-')[1] + if user_range_max != '': + user_range_max = int(user_range_max) + else: + user_range_min = None + user_range_max = None + + # Chapter 4: Server range support + # Always include a range on the first request to figure out whether the + # server supports it. Use 0- to get correct remote_total_bytes + temp_headers = headers + temp_headers.update({'range': 'bytes=0-'}) + + if do_head: + # I'm using a GET instead of an actual HEAD here because some servers respond + # differently, even though they're not supposed to. 
+ head = request('get', url, stream=True, headers=temp_headers, auth=auth) + remote_total_bytes = int(head.headers.get('content-length', 0)) + server_respects_range = (head.status_code == 206 and 'content-range' in head.headers) + head.connection.close() + else: + remote_total_bytes = None + server_respects_range = False + + if user_provided_range and not server_respects_range: + if not do_head: + raise Exception('Cannot determine range support without the head request') + else: + raise Exception('Server did not respect your range header') + + # Chapter 5: Plan definitions + plan_base = { + 'url': url, + 'auth': auth, + 'callback_progress': callback_progress, + 'limiter': limiter, + 'headers': headers, + 'real_localname': real_localname, + 'raise_for_undersized': raise_for_undersized, + 'remote_total_bytes': remote_total_bytes, + 'timeout': timeout, + 'verify_ssl': verify_ssl, + } + plan_fulldownload = dict( + plan_base, + download_into=temp_localname, + header_range_min=None, + header_range_max=None, + plan_type='fulldownload', + seek_to=0, + ) + plan_resume = dict( + plan_base, + download_into=temp_localname, + header_range_min=temp_localsize, + header_range_max='', + plan_type='resume', + seek_to=temp_localsize, + ) + plan_partial = dict( + plan_base, + download_into=real_localname, + header_range_min=user_range_min, + header_range_max=user_range_max, + plan_type='partial', + seek_to=user_range_min, + ) + + # Chapter 6: Redeem your meal vouchers here + if real_exists: + if overwrite: + os.remove(real_localname) + + if user_provided_range: + return plan_partial + + return plan_fulldownload + + elif temp_exists and temp_localsize > 0: + if overwrite: + return plan_fulldownload + + if user_provided_range: + return plan_partial + + if server_respects_range: + print('Resume from byte %d' % plan_resume['seek_to']) + return plan_resume + + else: + if user_provided_range: + return plan_partial + + return plan_fulldownload + + raise Exception('No plan was chosen?') + + 
+class Progress1: + def __init__(self, total_bytes): + self.limiter = ratelimiter.Ratelimiter(allowance=8, mode='reject') + self.limiter.balance = 1 + self.total_bytes = max(1, total_bytes) + self.divisor = bytestring.get_appropriate_divisor(total_bytes) + self.total_format = bytestring.bytestring(total_bytes, force_unit=self.divisor) + self.downloaded_format = '{:>%d}' % len(self.total_format) + self.blank_char = ' ' + self.solid_char = '█' + + def step(self, bytes_downloaded): + #print(self.limiter.balance) + percent = bytes_downloaded / self.total_bytes + percent = min(1, percent) + if self.limiter.limit(1) is False and percent < 1: + return + + downloaded_string = bytestring.bytestring(bytes_downloaded, force_unit=self.divisor) + downloaded_string = self.downloaded_format.format(downloaded_string) + block_count = 50 + solid_blocks = self.solid_char * int(block_count * percent) + statusbar = solid_blocks.ljust(block_count, self.blank_char) + statusbar = self.solid_char + statusbar + self.solid_char + + end = '\n' if percent == 1 else '' + message = '\r{bytes_downloaded} {statusbar} {total_bytes}' + message = message.format( + bytes_downloaded=downloaded_string, + total_bytes=self.total_format, + statusbar=statusbar, + ) + print(message, end=end, flush=True) + + +class Progress2: + def __init__(self, total_bytes): + self.total_bytes = max(1, total_bytes) + self.limiter = ratelimiter.Ratelimiter(allowance=8, mode='reject') + self.limiter.balance = 1 + self.total_bytes_string = '{:,}'.format(self.total_bytes) + self.bytes_downloaded_string = '{:%d,}' % len(self.total_bytes_string) + + def step(self, bytes_downloaded): + percent = (bytes_downloaded * 100) / self.total_bytes + percent = min(100, percent) + if self.limiter.limit(1) is False and percent < 100: + return + + percent_string = '%08.4f' % percent + bytes_downloaded_string = self.bytes_downloaded_string.format(bytes_downloaded) + + end = '\n' if percent == 100 else '' + message = '\r{bytes_downloaded} / 
{total_bytes} / {percent}%' + message = message.format( + bytes_downloaded=bytes_downloaded_string, + total_bytes=self.total_bytes_string, + percent=percent_string, + ) + print(message, end=end, flush=True) + + +def basename_from_url(url): + ''' + Determine the local filename appropriate for a URL. + ''' + localname = urllib.parse.unquote(url) + localname = localname.rstrip('/') + localname = localname.split('?')[0] + localname = localname.split('/')[-1] + return localname + +def get_permission(prompt='y/n\n>', affirmative=['y', 'yes']): + permission = input(prompt) + return permission.lower() in affirmative + +def request(method, url, stream=False, headers=None, timeout=TIMEOUT, verify_ssl=True, **kwargs): + if headers is None: + headers = {} + for (key, value) in HEADERS.items(): + headers.setdefault(key, value) + session = requests.Session() + a = requests.adapters.HTTPAdapter(max_retries=30) + b = requests.adapters.HTTPAdapter(max_retries=30) + session.mount('http://', a) + session.mount('https://', b) + session.max_redirects = 40 + + method = { + 'get': session.get, + 'head': session.head, + 'post': session.post, + }[method] + req = method(url, stream=stream, headers=headers, timeout=timeout, verify=verify_ssl, **kwargs) + req.raise_for_status() + return req + +def safeprint(*texts, **kwargs): + texts = [str(text).encode('ascii', 'replace').decode() for text in texts] + print(*texts, **kwargs) + +def sanitize_filename(text, exclusions=''): + to_remove = FILENAME_BADCHARS + for exclude in exclusions: + to_remove = to_remove.replace(exclude, '') + + for char in to_remove: + text = text.replace(char, '') + + (drive, path) = os.path.splitdrive(text) + path = path.replace(':', '') + text = drive + path + + return text + +def sanitize_url(url): + url = url.replace('%3A//', '://') + return url + +def touch(filename): + f = open(filename, 'ab') + f.close() + return + + +def download_argparse(args): + url = args.url + + url = clipext.resolve(url) + callback = { + None: 
Progress1, + '1': Progress1, + '2': Progress2, + }.get(args.callback, args.callback) + + bytespersecond = args.bytespersecond + if bytespersecond is not None: + bytespersecond = bytestring.parsebytes(bytespersecond) + + headers = {} + if args.range is not None: + headers['range'] = 'bytes=%s' % args.range + + retry = args.retry + if not retry: + retry = 1 + + while retry != 0: + # Negative numbers permit infinite retries. + try: + download_file( + url=url, + localname=args.localname, + bytespersecond=bytespersecond, + callback_progress=callback, + do_head=args.no_head is False, + headers=headers, + overwrite=args.overwrite, + timeout=args.timeout, + verbose=True, + verify_ssl=args.no_ssl is False, + ) + except (NotEnoughBytes, requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError): + retry -= 1 + if retry == 0: + raise + else: + break + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument('url') + parser.add_argument('localname', nargs='?', default=None) + parser.add_argument('-c', '--callback', dest='callback', default=Progress1) + parser.add_argument('-bps', '--bytespersecond', dest='bytespersecond', default=None) + parser.add_argument('-ow', '--overwrite', dest='overwrite', action='store_true') + parser.add_argument('-r', '--range', dest='range', default=None) + parser.add_argument('--timeout', dest='timeout', type=int, default=TIMEOUT) + parser.add_argument('--retry', dest='retry', const=-1, nargs='?', type=int, default=1) + parser.add_argument('--no-head', dest='no_head', action='store_true') + parser.add_argument('--no-ssl', dest='no_ssl', action='store_true') + parser.set_defaults(func=download_argparse) + + args = parser.parse_args() + args.func(args) diff --git a/voussoirkit/eternalseptember.py b/voussoirkit/eternalseptember.py new file mode 100644 index 0000000..4322fa5 --- /dev/null +++ b/voussoirkit/eternalseptember.py @@ -0,0 +1,50 @@ +import datetime +import time + +EPOCH = 
datetime.datetime( + year=1993, + month=9, + day=1, + tzinfo=datetime.timezone.utc, +) + +def normalize_date(date): + if isinstance(date, datetime.datetime): + pass + elif isinstance(date, (int, float)): + date = datetime.datetime.utcfromtimestamp(date) + date = date.replace(tzinfo=datetime.timezone.utc) + else: + raise TypeError('Unrecognized date type.') + + return date + +def now(): + return datetime.datetime.now(datetime.timezone.utc) + +def september_day(date): + ''' + Return the ES day of the month for this date. + ''' + date = normalize_date(date) + diff = date - EPOCH + days = diff.days + 1 + return days + +def september_string(date, strftime): + ''' + Return the ES formatted string for this date. + ''' + date = normalize_date(date) + day = str(september_day(date)) + + strftime = strftime.replace('%a', date.strftime('%a')) + strftime = strftime.replace('%A', date.strftime('%A')) + strftime = strftime.replace('%d', day) + strftime = strftime.replace('%-d', day) + + date = date.replace(month=EPOCH.month, year=EPOCH.year) + return date.strftime(strftime) + +if __name__ == '__main__': + print(september_string(now(), '%Y %B %d %H:%M:%S')) diff --git a/voussoirkit/expressionmatch.py b/voussoirkit/expressionmatch.py new file mode 100644 index 0000000..fcebd15 --- /dev/null +++ b/voussoirkit/expressionmatch.py @@ -0,0 +1,548 @@ +import time +ESCAPE_SEQUENCES = { + '\\': '\\', + '"': '"', +} + +BINARY_OPERATORS = {'AND', 'OR', 'XOR'} +UNARY_OPERATORS = {'NOT'} +PRECEDENCE = ['NOT', 'AND', 'XOR', 'OR'] +OPERATORS = BINARY_OPERATORS | UNARY_OPERATORS + +# Sentinel values used for breaking up the tokens, so we dont' have to use +# strings '(' and ')' which can get confused with user input. +PAREN_OPEN = object() +PAREN_CLOSE = object() + +DEFAULT_MATCH_FUNCTION = str.__contains__ + +MESSAGE_WRITE_YOUR_OWN_MATCHER = ''' +The default match function is {function}. +Consider passing your own `match_function`, which accepts two +positional arguments: +1. 
The object being tested. +2. The Expression token, a string. +'''.strip() + +def func_and(values): + return all(values) + +def func_or(values): + return any(values) + +def func_xor(values): + values = list(values) + return values.count(True) % 2 == 1 + +def func_not(value): + value = list(value) + if len(value) != 1: + raise ValueError('NOT only takes 1 value') + return not value[0] + +OPERATOR_FUNCTIONS = { + 'AND': func_and, + 'OR': func_or, + 'XOR': func_xor, + 'NOT': func_not, +} + +class NoTokens(Exception): + pass + +class ExpressionTree: + def __init__(self, token, parent=None): + self.children = [] + self.parent = parent + self.token = token + + def __str__(self): + if self.token is None: + return '""' + + if self.token not in OPERATORS: + t = self.token + t = t.replace('"', '\\"') + t = t.replace('(', '\\(') + t = t.replace(')', '\\)') + if ' ' in t: + t = '"%s"' % t + return t + + if len(self.children) == 1: + child = self.children[0] + childstring = str(child) + if child.token in OPERATORS: + childstring = '(%s)' % childstring + return '%s%s' % (self.token, childstring) + return '%s %s' % (self.token, childstring) + + children = [] + for child in self.children: + childstring = str(child) + if child.token in OPERATORS: + childstring = '(%s)' % childstring + children.append(childstring) + #children = [str(child) for child in self.children] + + if len(children) == 1: + return '%s %s' % (self.token, children[0]) + + s = ' %s ' % self.token + s = s.join(children) + return s + + @classmethod + def parse(cls, tokens, spaces=0): + if isinstance(tokens, str): + tokens = tokenize(tokens) + + if tokens == []: + raise NoTokens() + + if isinstance(tokens[0], list): + current = cls.parse(tokens[0], spaces=spaces+1) + else: + current = cls(token=tokens[0]) + + for token in tokens[1:]: + ##print(' '*spaces, 'cur', current, current.token) + if isinstance(token, list): + new = cls.parse(token, spaces=spaces+1) + else: + new = cls(token=token) + ##print(' '*spaces, 'new', 
new) + + if 0 == 1: + pass + + elif current.token not in OPERATORS: + if new.token in BINARY_OPERATORS: + if len(new.children) == 0: + new.children.append(current) + current.parent = new + current = new + else: + raise Exception('Expected binary operator, got %s.' % new.token) + + elif current.token in BINARY_OPERATORS: + if new.token in BINARY_OPERATORS: + if new.token == current.token: + for child in new.children: + child.parent = current + current.children.extend(new.children) + else: + if len(new.children) == 0: + new.children.append(current) + current.parent = new + current = new + else: + current.children.append(new) + new.parent = current + + elif new.token in UNARY_OPERATORS: + if len(new.children) == 0: + current.children.append(new) + new.parent = current + current = new + else: + current.children.append(new) + new.parent = current + + elif new.token not in OPERATORS: + if len(current.children) > 0: + current.children.append(new) + new.parent = current + else: + raise Exception('Expected current children > 0.') + + elif current.token in UNARY_OPERATORS: + if len(current.children) == 0: + current.children.append(new) + new.parent = current + if current.parent is not None: + current = current.parent + elif new.token in BINARY_OPERATORS: + if len(new.children) == 0: + new.children.append(current) + current.parent = new + current = new + else: + current.children.append(new) + new.parent = current + if current.parent is not None: + current = current.parent + else: + raise Exception('Expected new to be my operand or parent binary.') + + ##print(' '*spaces, 'fin:', current.rootmost(), '\n') + + current = current.rootmost() + ##print('---', current) + return current + + def _evaluate(self, text, match_function=None): + if self.token not in OPERATORS: + if match_function is None: + match_function = DEFAULT_MATCH_FUNCTION + + value = match_function(text, self.token) + #print(self.token, value) + return value + + operator_function = OPERATOR_FUNCTIONS[self.token] + 
children = (child.evaluate(text, match_function=match_function) for child in self.children) + return operator_function(children) + + def diagram(self): + if self.token is None: + return '""' + t = self.token + if ' ' in t: + t = '"%s"' % t + + output = t + indent = 1 + for child in self.children: + child = child.diagram() + for line in child.splitlines(): + output += (' ' * indent) + output += line + '\n' + indent = len(t) + 1 + output = output.strip() + + return output + + def evaluate(self, text, match_function=None): + if match_function is None: + match_function = DEFAULT_MATCH_FUNCTION + + try: + return self._evaluate(text, match_function) + except Exception as e: + if match_function is DEFAULT_MATCH_FUNCTION: + message = MESSAGE_WRITE_YOUR_OWN_MATCHER.format(function=DEFAULT_MATCH_FUNCTION) + override = Exception(message) + raise override from e + raise e + + @property + def is_leaf(self): + return self.token not in OPERATORS + + def map(self, function): + ''' + Apply this function to all of the operands. + ''' + for node in self.walk_leaves(): + node.token = function(node.token) + + def prune(self): + ''' + Remove any nodes where `token` is None. + ''' + self.children = [child for child in self.children if child.token is not None] + + for child in self.children: + child.prune() + + if self.token in OPERATORS and len(self.children) == 0: + self.token = None + if self.parent is not None: + self.parent.children.remove(self) + + def rootmost(self): + current = self + while current.parent is not None: + current = current.parent + return current + + def walk(self): + yield self + for child in self.children: + yield from child.walk() + + def walk_leaves(self): + for node in self.walk(): + if node.is_leaf: + yield node + + +def implied_tokens(tokens): + ''' + 1. If two operands are directly next to each other, or an operand is followed + by a unary operator, it is implied that there is an AND between them. + '1 2' -> '1 AND 2' + '1 NOT 2' -> '1 AND NOT 2' + + 2. 
If an expression begins or ends with an invalid operator, remove it. + 'AND 2' -> '2' + '2 AND' -> '2' + + 3. If a parenthetical term contains only 1 item, the parentheses can be removed. + '(a)' -> 'a' + '(NOT a)' -> 'NOT a' + '(a OR)' -> '(a)' (by rule 2) -> 'a' + + 4. If two operators are next to each other, except for binary-unary, + keep only the first. + '1 OR AND 2' -> '1 OR 2' + '1 NOT AND 2' -> '1 AND NOT AND 2' (by rule 1) -> '1 AND NOT 2' + 'NOT NOT 1' -> 'NOT 1' + '1 AND NOT NOT 2' -> '1 AND NOT 2' + ''' + final_tokens = [] + has_operand = False + has_binary_operator = False + has_unary_operator = False + + if len(tokens) == 1 and not isinstance(tokens[0], str): + # [['A' 'AND' 'B']] -> ['A' 'AND' 'B'] + tokens = tokens[0] + + for token in tokens: + skip_this = False + while isinstance(token, (list, tuple)): + if len(token) == 0: + # Delete empty parentheses. + skip_this = True + break + if len(token) == 1: + # Take singular terms out of their parentheses. + token = token[0] + else: + previous = token + token = implied_tokens(token) + if previous == token: + break + + if skip_this: + continue + + #print('tk:', token, 'hu:', has_unary_operator, 'hb:', has_binary_operator, 'ho:', has_operand) + if isinstance(token, str) and token in OPERATORS: + this_binary = token in BINARY_OPERATORS + this_unary = not this_binary + + # 'NOT AND' and 'AND AND' are malformed... + if this_binary and (has_binary_operator or has_unary_operator): + continue + # ...'NOT NOT' is malformed... + if this_unary and has_unary_operator: + continue + # ...but AND NOT is okay. 
def order_operations(tokens):
    '''
    Group the flat token list into nested sublists so that operators with
    higher precedence (lower index in the module-level PRECEDENCE list)
    bind more tightly to their operands.

    Operates on `tokens` in place and also returns it.
    '''
    # Recurse into parenthesized groups first so they are ordered
    # before we consider the current level.
    for (index, token) in enumerate(tokens):
        if isinstance(token, list):
            tokens[index] = order_operations(token)

    # With fewer than 5 tokens there is at most one binary operation
    # (operand, operator, operand, operator, operand), so nothing to group.
    if len(tokens) < 5:
        return tokens

    index = 0
    slice_start = None
    slice_end = None
    precedence_stack = []
    while index < len(tokens):
        #time.sleep(0.1)
        token = tokens[index]
        try:
            precedence = PRECEDENCE.index(token)
        except ValueError:
            # Not an operator -- an operand or a sublist.
            precedence = None

        if precedence is None:
            index += 1
            continue
        precedence_stack.append(precedence)


        if token in UNARY_OPERATORS:
            # A unary operator binds only the single operand after it:
            # [operator, operand].
            slice_start = index
            slice_end = index + 2

        elif len(precedence_stack) > 1:
            if precedence_stack[-1] < precedence_stack[-2]:
                # Tighter than the previous operator: open a group starting
                # at the operand on our left.
                slice_start = index - 1
                slice_end = None
            elif precedence_stack[-2] < precedence_stack[-1]:
                # Looser than the previous operator: close the open group
                # just before us.
                slice_end = index

        #print(tokens, index, token, precedence_stack, slice_start, slice_end, sep=' || ')

        if slice_start is None or slice_end is None:
            index += 1
            continue

        # Replace the grouped span with a single nested sublist.
        tokens[slice_start:slice_end] = [tokens[slice_start:slice_end]]
        slice_start = None
        slice_end = None
        # Rewind the index and drop the stack entries for the operators
        # that were just swallowed into the sublist.
        for x in range(2):
            if not precedence_stack:
                break

            delete = precedence_stack[-1]
            while precedence_stack and precedence_stack[-1] == delete:
                index -= 1
                precedence_stack.pop(-1)

        index += 1

    # A group left open runs through the end of the token list.
    if slice_start is not None:
        slice_end = len(tokens)
        tokens[slice_start:slice_end] = [tokens[slice_start:slice_end]]

    return tokens
tokens, replace parentheses with actual sublists. + ['1', 'AND', '(', '3', 'OR', '4', ')'] -> + ['1', 'AND', ['3', 'OR', '4']] + + Unclosed parentheses are automatically closed at the end. + ''' + final_tokens = [] + index = _from_index + while index < len(tokens): + token = tokens[index] + #print(index, token) + index += 1 + if token is PAREN_OPEN: + (token, index) = sublist_tokens(tokens, _from_index=index, depth=depth+1) + if token is PAREN_CLOSE: + break + final_tokens.append(token) + if _from_index == 0: + return final_tokens + else: + return (final_tokens, index) + +def tokenize(expression): + ''' + Break the string into a list of tokens. Spaces are the delimiter unless + they are inside quotation marks. + + Quotation marks and parentheses can be escaped by preceeding with a backslash '\\' + + Opening and closing parentheses are put into their own token unless + escaped / quoted. + + Extraneous closing parentheses are ignored completely. + + '1 AND(4 OR "5 6") OR \\(test\\)' -> + ['1', 'AND', '(', '4', 'OR', '5 6', ')', 'OR', '\\(test\\)'] + ''' + current_word = [] + in_escape = False + in_quotes = False + paren_depth = 0 + tokens = [] + for character in expression: + if in_escape: + #character = ESCAPE_SEQUENCES.get(character, '\\'+character) + in_escape = False + + elif character in {'(', ')'} and not in_quotes: + if character == '(': + sentinel = PAREN_OPEN + paren_depth += 1 + elif character == ')': + sentinel = PAREN_CLOSE + paren_depth -= 1 + + if paren_depth >= 0: + tokens.append(''.join(current_word)) + tokens.append(sentinel) + current_word.clear() + continue + else: + continue + + elif character == '\\': + in_escape = True + continue + + elif character == '"': + in_quotes = not in_quotes + continue + + elif character.isspace() and not in_quotes: + tokens.append(''.join(current_word)) + current_word.clear() + continue + + current_word.append(character) + + tokens.append(''.join(current_word)) + tokens = [w for w in tokens if w != ''] + tokens = 
class Landmark:
    '''
    Describes one bracket syntax in a fusker pattern: the character that
    opens it, the character that closes it, and the parser applied to the
    characters collected between the two.
    '''
    def __init__(self, opener, closer, parser):
        (self.opener, self.closer, self.parser) = (opener, closer, parser)
def fusk_join(items):
    '''
    Expand a mixed list of literal strings and alternative-groups into
    every possible concatenation. Plain strings become fixed text in a
    template; each non-string item is a group of alternatives, and every
    output picks one member from each group.
    '''
    template_parts = []
    groups = []
    for item in items:
        if isinstance(item, str):
            template_parts.append(item)
        else:
            template_parts.append('{}')
            groups.append(item)
    template = ''.join(template_parts)
    return [template.format(*combo) for combo in itertools.product(*groups)]
character) + escaped = False + elif character == '\\': + escaped = True + elif landmark and character == landmark.closer: + buff = [landmark.parser(buff)] + break + elif character in landmarks: + subtotal = fusker(fstring, landmark=landmarks[character]) + buff.extend(subtotal) + else: + buff.append(character) + if not landmark: + buff = parse_spinner(buff) + return buff + return result + +if __name__ == '__main__': + pattern = sys.argv[1] + fusk = fusker(pattern) + for result in fusk: + print(result) diff --git a/voussoirkit/passwordy.py b/voussoirkit/passwordy.py new file mode 100644 index 0000000..1839ba5 --- /dev/null +++ b/voussoirkit/passwordy.py @@ -0,0 +1,185 @@ +import string +import random +import sys + +DEFAULT_LENGTH = 32 +DEFAULT_SENTENCE = 5 +HELP_MESSAGE = ''' +=============================================================================== +Generates a randomized password. + +> passwordy [length] [options] + + length: How many characters. Default %03d. + options: + h : consist entirely of hexadecimal characters. + b : consist entirely of binary characters. + dd : consist entirely of decimal characters. + default : consist entirely of upper+lower letters. + + p : allow punctuation in conjunction with above. + d : allow digits in conjunction with above. + + l : convert to lowercase. + u : convert to uppercase. + nd : no duplicates. Each character can only appear once. + +Examples: +> passwordy 32 h l +98f17b6016cf08cc00f2aeecc8d8afeb + +> passwordy 32 h u +2AA706866BF7A5C18328BF866136A261 + +> passwordy 32 u +JHEPTKCEFZRFXILMASHNPSTFFNWQHTTN + +> passwordy 32 p +Q+:iSKX!Nt)ewUvlE*!+^D}hp+| passwordy 32 l p +m*'otz/"!qo?-^wwdu@fasf:|ldkosi` + +=============================================================================== + +Generates a randomized sentence of words. + +> passwordy sent [length] [join] + + length : How many words. Default %03d. + join : The character that will join words together. + Default space. 
def make_password(length=None, passtype='standard'):
    '''
    Return a random string of `length` characters (DEFAULT_LENGTH if None).

    `passtype` is a flag string choosing the alphabet:
    base: 'standard' (letters), 'digit_only', 'hex', 'binary';
    modifiers: '+digits', '+punctuation', '+lowercase', '+uppercase',
    '+noduplicates' (each character may appear at most once).
    '''
    if length is None:
        length = DEFAULT_LENGTH

    # Select the base alphabet, then apply the modifier flags.
    alphabet = ''
    if 'standard' in passtype:
        alphabet = string.ascii_letters
    elif 'digit_only' in passtype:
        alphabet = string.digits
    elif 'hex' in passtype:
        alphabet = '0123456789abcdef'
    elif 'binary' in passtype:
        alphabet = '01'

    if '+digits' in passtype:
        alphabet += string.digits
    if '+punctuation' in passtype:
        alphabet += string.punctuation
    if '+lowercase' in passtype:
        alphabet = alphabet.lower()
    elif '+uppercase' in passtype:
        alphabet = alphabet.upper()

    # Collapse duplicates the modifiers may have introduced.
    pool = list(set(alphabet))

    if '+noduplicates' not in passtype:
        return ''.join(random.choice(pool) for _ in range(length))

    if len(pool) < length:
        message = 'Alphabet "%s" is not long enough to support no-dupe password of length %d'
        message = message % (pool, length)
        raise Exception(message)
    # random.sample draws `length` distinct characters uniformly, the same
    # distribution as repeatedly shuffling and popping.
    return ''.join(random.sample(pool, length))
+ ''' + import dictionary.common as common + if length is None: + length = DEFAULT_LENGTH + words = [random.choice(common.words) for x in range(length)] + words = [w.replace(' ', joiner) for w in words] + result = joiner.join(words) + return result + +if __name__ == '__main__': + args = sys.argv[1:] + argc = len(args) + + mode = listget(args, 0, 'password') + if 'help' in mode: + print(HELP_MESSAGE) + quit() + + if 'sent' not in mode: + length = listget(args, 0, str(DEFAULT_LENGTH)) + options = [a.lower() for a in args[1:]] + + if '-' in length: + length = length.replace(' ', '') + length = [int(x) for x in length.split('-', 1)] + length = random.randint(*length) + + elif not length.isdigit() and options == []: + options = [length] + length = DEFAULT_LENGTH + + length = int(length) + + passtype = 'standard' + if 'dd' in options: + passtype = 'digit_only' + if 'b' in options: + passtype = 'binary' + if 'h' in options: + passtype = 'hex' + + if 'l' in options: + passtype += '+lowercase' + elif 'u' in options: + passtype += '+uppercase' + if 'p' in options: + passtype += '+punctuation' + if 'd' in options: + passtype += '+digits' + if 'nd' in options: + passtype += '+noduplicates' + + print(make_password(length, passtype=passtype)) + + else: + length = listget(args, 1, str(DEFAULT_SENTENCE)) + joiner = listget(args, 2, ' ') + + if not length.isdigit(): + joiner = length + length = DEFAULT_SENTENCE + + length = int(length) + + print(make_sentence(length, joiner)) \ No newline at end of file diff --git a/voussoirkit/pathclass.py b/voussoirkit/pathclass.py new file mode 100644 index 0000000..6e0b6fc --- /dev/null +++ b/voussoirkit/pathclass.py @@ -0,0 +1,267 @@ +import glob +import os +import re + + +class PathclassException(Exception): + pass + + +class NotDirectory(PathclassException): + pass + + +class NotFile(PathclassException): + pass + + +class Path: + ''' + I started to use pathlib.Path, but it was too much of a pain. 
+ ''' + def __init__(self, path): + if isinstance(path, Path): + self.absolute_path = path.absolute_path + else: + path = path.strip() + if re.search('[A-Za-z]:$', path): + # Bare Windows drive letter. + path += os.sep + path = normalize_sep(path) + path = os.path.normpath(path) + path = os.path.abspath(path) + self.absolute_path = path + + def __contains__(self, other): + if isinstance(other, Path): + other = other.normcase + return other.startswith(self.normcase) + + def __eq__(self, other): + if not hasattr(other, 'absolute_path'): + return False + return self.normcase == other.normcase + + def __hash__(self): + return hash(self.normcase) + + def __repr__(self): + return '{c}({path})'.format(c=self.__class__.__name__, path=repr(self.absolute_path)) + + def assert_is_file(self): + if not self.is_file: + raise NotFile(self) + + def assert_is_directory(self): + if not self.is_dir: + raise NotDirectory(self) + + @property + def basename(self): + return os.path.basename(self.absolute_path) + + def correct_case(self): + self.absolute_path = get_path_casing(self.absolute_path) + return self.absolute_path + + @property + def depth(self): + return len(self.absolute_path.split(os.sep)) + + @property + def exists(self): + return os.path.exists(self.absolute_path) + + @property + def extension(self): + return os.path.splitext(self.absolute_path)[1].lstrip('.') + + @property + def is_dir(self): + return os.path.isdir(self.absolute_path) + + @property + def is_file(self): + return os.path.isfile(self.absolute_path) + + @property + def is_link(self): + return os.path.islink(self.absolute_path) + + def join(self, subpath): + if not isinstance(subpath, str): + raise TypeError('subpath must be a string') + return Path(os.path.join(self.absolute_path, subpath)) + + def listdir(self): + children = os.listdir(self.absolute_path) + children = [self.with_child(child) for child in children] + return children + + @property + def normcase(self): + return 
os.path.normcase(self.absolute_path) + + @property + def parent(self): + parent = os.path.dirname(self.absolute_path) + parent = self.__class__(parent) + return parent + + @property + def relative_path(self): + return self.relative_to(os.getcwd()) + + def relative_to(self, other): + other = Path(other) + other.correct_case() + self.correct_case() + + if self == other: + return '.' + + if self in other: + return self.absolute_path.replace(other.absolute_path, '.') + + common = common_path([other.absolute_path, self.absolute_path], fallback=None) + print(common) + if common is None: + return self.absolute_path + backsteps = other.depth - common.depth + backsteps = os.sep.join('..' for x in range(backsteps)) + return self.absolute_path.replace(common.absolute_path, backsteps) + + def replace_extension(self, extension): + extension = extension.rsplit('.', 1)[-1] + base = os.path.splitext(self.absolute_path)[0] + + if extension == '': + return Path(base) + + return Path(base + '.' + extension) + + @property + def size(self): + if self.is_file: + return os.path.getsize(self.absolute_path) + else: + return None + + @property + def stat(self): + return os.stat(self.absolute_path) + + def with_child(self, basename): + return self.join(os.path.basename(basename)) + + +def common_path(paths, fallback): + ''' + Given a list of file paths, determine the deepest path which all + have in common. + ''' + if isinstance(paths, (str, Path)): + raise TypeError('`paths` must be a collection') + paths = [Path(f) for f in paths] + + if len(paths) == 0: + raise ValueError('Empty list') + + if hasattr(paths, 'pop'): + model = paths.pop() + else: + model = paths[0] + paths = paths[1:] + + while True: + if all(f in model for f in paths): + return model + parent = model.parent + if parent == model: + # We just processed the root, and now we're stuck at the root. + # Which means there was no common path. 
def get_path_casing(path):
    '''
    Take what is perhaps incorrectly cased input and get the path's actual
    casing according to the filesystem.

    Thank you:
    Ethan Furman http://stackoverflow.com/a/7133137/5430534
    xvorsx http://stackoverflow.com/a/14742779/5430534
    '''
    if not isinstance(path, Path):
        path = Path(path)

    # Nonexistent paths don't glob correctly. If the input is a nonexistent
    # subpath of an existing path, we have to glob the existing portion first,
    # and then attach the fake portion again at the end.
    input_path = path
    while not path.exists:
        parent = path.parent
        if path == parent:
            # We're stuck at a fake root.
            return input_path.absolute_path
        path = parent

    path = path.absolute_path

    # Split off the drive letter; only the subpath components are globbed.
    (drive, subpath) = os.path.splitdrive(path)
    drive = drive.upper()
    subpath = subpath.lstrip(os.sep)

    # Wrap each component in a single-character glob group (see
    # glob_patternize) so glob must resolve the real on-disk casing.
    pattern = [glob_patternize(piece) for piece in subpath.split(os.sep)]
    pattern = os.sep.join(pattern)
    pattern = drive + os.sep + pattern

    try:
        cased = glob.glob(pattern)[0]
    except IndexError:
        # Nothing matched; fall back to the caller's original casing.
        return input_path.absolute_path

    # Re-attach the nonexistent tail that the loop above stripped off.
    imaginary_portion = input_path.absolute_path
    imaginary_portion = imaginary_portion[len(cased):]
    #real_portion = os.path.normcase(cased)
    #imaginary_portion = imaginary_portion.replace(real_portion, '')
    imaginary_portion = imaginary_portion.lstrip(os.sep)
    cased = os.path.join(cased, imaginary_portion)
    cased = cased.rstrip(os.sep)
    if not os.sep in cased:
        # No separator left -- presumably a bare drive root; give it back
        # its trailing separator. (TODO confirm: only reachable on Windows.)
        cased += os.sep
    return cased
def normalize_sep(path):
    '''
    Replace every forward-slash and back-slash in `path` with the
    separator native to the current operating system.
    '''
    foreign_seps = (sep for sep in ('\\', '/') if sep != os.sep)
    for sep in foreign_seps:
        path = path.replace(sep, os.sep)
    return path
chunk_size=chunk_size, hash=hasher.hexdigest()) + +def quickid_file(filename, *args, **kwargs): + filename = os.path.abspath(filename) + with open(filename, 'rb') as handle: + return quickid(handle, *args, **kwargs) + +def main(argv): + print(quickid_file(argv[0])) + +if __name__ == '__main__': + raise SystemExit(main(sys.argv[1:])) diff --git a/voussoirkit/ratelimiter.py b/voussoirkit/ratelimiter.py new file mode 100644 index 0000000..a2ccc35 --- /dev/null +++ b/voussoirkit/ratelimiter.py @@ -0,0 +1,66 @@ +import time + + +class Ratelimiter: + def __init__(self, allowance, period=1, operation_cost=1, mode='sleep'): + ''' + allowance: + Our spending balance per `period` seconds. + + period: + The number of seconds over which we can perform `allowance` operations. + + operation_cost: + The default amount to remove from our balance after each operation. + Pass a `cost` parameter to `self.limit` to use a nondefault value. + + mode: + 'sleep': + If we do not have the balance for an operation, sleep until we do. + Return True every time. + + 'reject': + If we do not have the balance for an operation, return False. + The cost is not subtracted, so hopefully we have enough next time. + ''' + if mode not in ('sleep', 'reject'): + raise ValueError('Invalid mode %s' % repr(mode)) + + self.allowance = allowance + self.period = period + self.operation_cost = operation_cost + self.mode = mode + + self.last_operation = time.time() + self.balance = 0 + + @property + def gain_rate(self): + return self.allowance / self.period + + def limit(self, cost=None): + ''' + See the main class docstring for info about cost and mode behavior. 
+ ''' + if cost is None: + cost = self.operation_cost + + time_diff = time.time() - self.last_operation + self.balance += time_diff * self.gain_rate + self.balance = min(self.balance, self.allowance) + + if self.balance >= cost: + self.balance -= cost + succesful = True + else: + if self.mode == 'reject': + succesful = False + else: + deficit = cost - self.balance + time_needed = deficit / self.gain_rate + time.sleep(time_needed) + self.balance = 0 + succesful = True + + self.last_operation = time.time() + return succesful diff --git a/voussoirkit/ratemeter.py b/voussoirkit/ratemeter.py new file mode 100644 index 0000000..4b4b58d --- /dev/null +++ b/voussoirkit/ratemeter.py @@ -0,0 +1,64 @@ +import collections +import math +import time + +class RateMeter: + def __init__(self, span): + ''' + This class is used to calculate a rolling average of + units per second over `span` seconds. + + Set `span` to None to calculate unit/s over the lifetime of the object + after the first digest, rather than over a span. + This saves the effort of tracking timestamps. Don't just use a large number! + ''' + self.sum = 0 + self.span = span + + self.tracking = collections.deque() + self.first_digest = None + + def digest(self, value): + now = time.time() + self.sum += value + + if self.span is None: + if self.first_digest is None: + self.first_digest = now + return + + earlier = now - self.span + while len(self.tracking) > 0 and self.tracking[0][0] < earlier: + (timestamp, pop_value) = self.tracking.popleft() + self.sum -= pop_value + + if len(self.tracking) == 0 or self.tracking[-1] != now: + self.tracking.append([now, value]) + else: + self.tracking[-1][1] += value + + def report(self): + ''' + Return a tuple containing the running sum, the time span + over which the rate is being calculated, and the rate in + units per second. + + (sum, time_interval, rate) + ''' + # Flush the old values, ensure self.first_digest exists. 
def safeprint(text, file_handle=None, end='\n'):
    '''
    Emit `text` one character at a time, substituting '?' for any
    character the output cannot encode, so consoles without unicode
    support do not crash the program.

    If `file_handle` is given, characters are written to it instead of
    stdout, and `end` is not written.
    '''
    if file_handle:
        emit = file_handle.write
    else:
        emit = lambda piece: print(piece, end='', flush=False)

    for character in text:
        try:
            emit(character)
        except UnicodeError:
            emit('?')

    if not file_handle:
        print(end, end='', flush=True)
def callback_v1(fpobj, written_bytes, total_bytes):
    '''
    Example of a copy callback function.

    Prints "filename written/total (percent%)"
    '''
    # ASCII-fold the name so consoles without unicode support don't choke.
    filename = fpobj.absolute_path.encode('ascii', 'replace').decode()
    if written_bytes >= total_bytes:
        # Final update for this file: advance to the next line.
        ends = '\r\n'
    else:
        # Intermediate update: the trailing \r rewrites this line.
        ends = ''
    percent = (100 * written_bytes) / max(total_bytes, 1)
    percent = '%07.3f' % percent
    written = '{:,}'.format(written_bytes)
    total = '{:,}'.format(total_bytes)
    written = written.rjust(len(total), ' ')
    # Bug fix: the template previously read '(unknown)' where {filename}
    # belongs, so the computed filename was never displayed even though it
    # was passed to format().
    status = '{filename} {written}/{total} ({percent}%)\r'
    status = status.format(filename=filename, written=written, total=total, percent=percent)
    print(status, end=ends)
    sys.stdout.flush()
# NOTE(review): this chunk arrived as a git patch ('+'-prefixed diff lines)
# covering the tail of voussoirkit/spinal.py plus all of sqlhelpers.py and
# treeclass.py. It is reconstructed below as plain Python. The copy_dir
# signature sits above the visible chunk; it is rebuilt here from the
# parameters its own docstring documents — TODO confirm against the full file.

# ---------------------------------------------------------------------------
# voussoirkit/spinal.py (continuation)
# ---------------------------------------------------------------------------

def copy_dir(
        source,
        destination=None,
        *,
        bytes_per_second=None,
        callback_directory=None,
        callback_exclusion=None,
        callback_file=None,
        callback_permission_denied=None,
        destination_new_root=None,
        dry_run=False,
        exclude_directories=None,
        exclude_filenames=None,
        files_per_second=None,
        overwrite_old=True,
        precalcsize=False,
        validate_hash=False,
    ):
    '''
    Copy the contents of a directory, including subdirectories.

    source:
        The directory to copy.

    destination:
        The directory in which copied files are placed. Alternatively, use
        destination_new_root.

    bytes_per_second:
        Restrict file copying to this many bytes per second. Can be an integer
        or an existing Ratelimiter object.
        The BYTE, KIBIBYTE, etc constants from module 'bytestring' may help.

        Default = None

    callback_directory:
        This function will be called after each file copy with three parameters:
        name of file copied, number of bytes written to destination directory
        so far, total bytes needed (based on precalcsize).
        If `precalcsize` is False, this function will receive written bytes
        for both written and total, showing 100% always.

        Default = None

    callback_exclusion:
        Passed directly into `walk_generator`.

        Default = None

    callback_file:
        Will be passed into each individual `copy_file` operation as the
        `callback` for that file.

        Default = None

    callback_permission_denied:
        Will be passed into each individual `copy_file` operation as the
        `callback_permission_denied` for that file.

        Default = None

    destination_new_root:
        Determine the destination path by calling
        `new_root(source, destination_new_root)`.
        Thus, this path acts as a root and the rest of the path is matched.

        `destination` and `destination_new_root` are mutually exclusive.

    dry_run:
        Do everything except the actual file copying.

        Default = False

    exclude_filenames:
        Passed directly into `walk_generator`.

        Default = None

    exclude_directories:
        Passed directly into `walk_generator`.

        Default = None

    files_per_second:
        Maximum number of files to be processed per second. Helps to keep CPU
        usage low.

        Default = None

    overwrite_old:
        If True, overwrite the destination file if the source file
        has a more recent "last modified" timestamp.

        Default = True

    precalcsize:
        If True, calculate the size of source before beginning the
        operation. This number can be used in the callback_directory function.
        Else, callback_directory will receive written bytes as total bytes
        (showing 100% always).
        This can take a long time.

        Default = False

    validate_hash:
        Passed directly into each `copy_file`.

    Returns: [destination path, number of bytes written to destination]
    (Written bytes is 0 if all files already existed.)
    '''
    # Prepare parameters
    if not is_xor(destination, destination_new_root):
        message = 'One and only one of `destination` and '
        message += '`destination_new_root` can be passed.'
        raise ValueError(message)

    source = str_to_fp(source)

    if destination_new_root is not None:
        source.correct_case()
        destination = new_root(source, destination_new_root)
    destination = str_to_fp(destination)

    # Copying a directory into itself would recurse forever.
    if destination in source:
        raise RecursiveDirectory(source, destination)

    if not source.is_dir:
        raise SourceNotDirectory(source)

    if destination.is_file:
        raise DestinationIsFile(destination)

    if precalcsize is True:
        total_bytes = get_dir_size(source)
    else:
        total_bytes = 0

    callback_directory = callback_directory or do_nothing
    bytes_per_second = limiter_or_none(bytes_per_second)
    files_per_second = limiter_or_none(files_per_second)

    # Copy
    written_bytes = 0
    walker = walk_generator(
        source,
        callback_exclusion=callback_exclusion,
        exclude_directories=exclude_directories,
        exclude_filenames=exclude_filenames,
    )
    for source_abspath in walker:
        # Terminology:
        # abspath: C:\folder\subfolder\filename.txt
        # location: C:\folder\subfolder
        # base_name: filename.txt
        # folder: subfolder

        destination_abspath = source_abspath.absolute_path.replace(
            source.absolute_path,
            destination.absolute_path
        )
        destination_abspath = str_to_fp(destination_abspath)

        if destination_abspath.is_dir:
            raise DestinationIsDirectory(destination_abspath)

        destination_location = os.path.split(destination_abspath.absolute_path)[0]
        if not dry_run:
            os.makedirs(destination_location, exist_ok=True)

        copied = copy_file(
            source_abspath,
            destination_abspath,
            bytes_per_second=bytes_per_second,
            callback_progress=callback_file,
            callback_permission_denied=callback_permission_denied,
            dry_run=dry_run,
            overwrite_old=overwrite_old,
            validate_hash=validate_hash,
        )

        copiedname = copied[0]
        written_bytes += copied[1]

        if precalcsize is False:
            # Without a precalculated total, report written as the total too.
            callback_directory(copiedname, written_bytes, written_bytes)
        else:
            callback_directory(copiedname, written_bytes, total_bytes)

        if files_per_second is not None:
            files_per_second.limit(1)

    return [destination, written_bytes]

def copy_file(
        source,
        destination=None,
        *,
        destination_new_root=None,
        bytes_per_second=None,
        callback_progress=None,
        callback_permission_denied=None,
        callback_validate_hash=None,
        dry_run=False,
        overwrite_old=True,
        validate_hash=False,
    ):
    '''
    Copy a file from one place to another.

    source:
        The file to copy.

    destination:
        The filename of the new copy. Alternatively, use
        destination_new_root.

    destination_new_root:
        Determine the destination path by calling
        `new_root(source_dir, destination_new_root)`.
        Thus, this path acts as a root and the rest of the path is matched.

    bytes_per_second:
        Restrict file copying to this many bytes per second. Can be an integer
        or an existing Ratelimiter object.
        The provided BYTE, KIBIBYTE, etc constants may help.

        Default = None

    callback_permission_denied:
        If provided, this function will be called when a source file denies
        read access, with the file path and the exception object as parameters.
        THE OPERATION WILL RETURN NORMALLY.

        If not provided, the PermissionError is raised.

        Default = None

    callback_progress:
        If provided, this function will be called after writing
        each CHUNK_SIZE bytes to destination with three parameters:
        the Path object being copied, number of bytes written so far,
        total number of bytes needed.

        Default = None

    callback_validate_hash:
        Passed directly into `verify_hash`

        Default = None

    dry_run:
        Do everything except the actual file copying.

        Default = False

    overwrite_old:
        If True, overwrite the destination file if the source file
        has a more recent "last modified" timestamp.

        Default = True

    validate_hash:
        If True, verify the file hash of the resulting file, using the
        `HASH_CLASS` global.

        Default = False

    Returns: [destination filename, number of bytes written to destination]
    (Written bytes is 0 if the file already existed.)
    '''
    # Prepare parameters
    if not is_xor(destination, destination_new_root):
        message = 'One and only one of `destination` and '
        message += '`destination_new_root` can be passed'
        raise ValueError(message)

    source = str_to_fp(source)

    if not source.is_file:
        raise SourceNotFile(source)

    if destination_new_root is not None:
        source.correct_case()
        destination = new_root(source, destination_new_root)
    destination = str_to_fp(destination)

    callback_progress = callback_progress or do_nothing

    # Copying into a directory means copying to a same-named file inside it.
    if destination.is_dir:
        destination = destination.with_child(source.basename)

    bytes_per_second = limiter_or_none(bytes_per_second)

    # Determine overwrite
    if destination.exists:
        if overwrite_old is False:
            return [destination, 0]

        source_modtime = source.stat.st_mtime
        if source_modtime == destination.stat.st_mtime:
            return [destination, 0]

    # Copy
    if dry_run:
        # callback_progress was defaulted to do_nothing above, so the old
        # `if callback_progress is not None` guard was dead code.
        callback_progress(destination, 0, 0)
        return [destination, 0]

    source_bytes = source.size
    destination_location = os.path.split(destination.absolute_path)[0]
    os.makedirs(destination_location, exist_ok=True)

    def handlehelper(path, mode):
        # Open the file, deferring PermissionError to the caller's callback
        # (returning None) or re-raising when no callback was given.
        try:
            handle = open(path.absolute_path, mode)
            return handle
        except PermissionError as exception:
            if callback_permission_denied is not None:
                callback_permission_denied(path, exception)
                return None
            else:
                raise

    log.debug('Opening handles.')
    source_handle = handlehelper(source, 'rb')
    destination_handle = handlehelper(destination, 'wb')
    if None in (source_handle, destination_handle):
        return [destination, 0]

    if validate_hash:
        hasher = HASH_CLASS()

    written_bytes = 0
    while True:
        try:
            data_chunk = source_handle.read(CHUNK_SIZE)
        except PermissionError:
            # Previously this printed the path to stdout before re-raising;
            # route the breadcrumb through the module logger instead.
            log.warning('PermissionError while reading %s', source.absolute_path)
            raise
        data_bytes = len(data_chunk)
        if data_bytes == 0:
            break

        if validate_hash:
            hasher.update(data_chunk)

        destination_handle.write(data_chunk)
        written_bytes += data_bytes

        if bytes_per_second is not None:
            bytes_per_second.limit(data_bytes)

        callback_progress(destination, written_bytes, source_bytes)

    if written_bytes == 0:
        # For zero-length files, we want to get at least one call in there.
        callback_progress(destination, written_bytes, source_bytes)

    # Fin
    log.debug('Closing source handle.')
    source_handle.close()
    log.debug('Closing dest handle.')
    destination_handle.close()
    log.debug('Copying metadata')
    shutil.copystat(source.absolute_path, destination.absolute_path)

    if validate_hash:
        verify_hash(
            destination,
            callback=callback_validate_hash,
            known_size=source_bytes,
            known_hash=hasher.hexdigest(),
        )

    return [destination, written_bytes]

def do_nothing(*args):
    '''
    Used by other functions as the default callback.
    '''
    return

def get_dir_size(path):
    '''
    Calculate the total number of bytes across all files in this directory
    and its subdirectories.
    '''
    path = str_to_fp(path)

    if not path.is_dir:
        raise SourceNotDirectory(path)

    total_bytes = 0
    for filepath in walk_generator(path):
        total_bytes += filepath.size

    return total_bytes

def is_subfolder(parent, child):
    '''
    Determine whether parent contains child.
    '''
    # The trailing separator prevents 'C:\\folder2' matching 'C:\\folder'.
    parent = normalize(str_to_fp(parent).absolute_path) + os.sep
    child = normalize(str_to_fp(child).absolute_path) + os.sep
    return child.startswith(parent)

def is_xor(*args):
    '''
    Return True if and only if one arg is truthy.
    '''
    return [bool(a) for a in args].count(True) == 1

def limiter_or_none(value):
    '''
    Coerce `value` into a Ratelimiter (or None): strings are parsed as byte
    quantities, integers become a per-second allowance, existing Ratelimiter
    objects pass through, and None stays None.
    '''
    if isinstance(value, str):
        value = bytestring.parsebytes(value)
    if isinstance(value, ratelimiter.Ratelimiter):
        limiter = value
    elif value is not None:
        limiter = ratelimiter.Ratelimiter(allowance=value, period=1)
    else:
        limiter = None
    return limiter

def new_root(filepath, root):
    '''
    Prepend `root` to `filepath`, drive letter included. For example:
    "C:\\folder\\subfolder\\file.txt" and "C:\\backups" becomes
    "C:\\backups\\C\\folder\\subfolder\\file.txt"

    I use this so that my G: drive can have backups from my C: and D: drives
    while preserving directory structure in G:\\D and G:\\C.
    '''
    filepath = str_to_fp(filepath).absolute_path
    root = str_to_fp(root).absolute_path
    # Turn the drive-letter colon into a path component ("C:" -> "C\\").
    filepath = filepath.replace(':', os.sep)
    filepath = os.path.normpath(filepath)
    filepath = os.path.join(root, filepath)
    return str_to_fp(filepath)

def normalize(text):
    '''
    Apply os.path.normpath and os.path.normcase.
    '''
    return os.path.normpath(os.path.normcase(text))

def str_to_fp(path):
    '''
    If `path` is a string, create a Path object, otherwise just return it.
    '''
    if isinstance(path, str):
        path = pathclass.Path(path)
    return path

def verify_hash(path, known_size, known_hash, callback=None):
    '''
    Verify that the file at `path` matches the known size and hash, raising
    ValidationError otherwise.

    callback:
        A function that takes three parameters:
        path object, bytes ingested so far, bytes total
    '''
    path = str_to_fp(path)
    log.debug('Validating hash for "%s" against %s', path.absolute_path, known_hash)
    file_size = os.path.getsize(path.absolute_path)
    if file_size != known_size:
        raise ValidationError('File size %d != known size %d' % (file_size, known_size))
    handle = open(path.absolute_path, 'rb')
    hasher = HASH_CLASS()
    checked_bytes = 0
    with handle:
        while True:
            chunk = handle.read(CHUNK_SIZE)
            if not chunk:
                break
            hasher.update(chunk)
            checked_bytes += len(chunk)
            if callback is not None:
                callback(path, checked_bytes, file_size)

    file_hash = hasher.hexdigest()
    if file_hash != known_hash:
        raise ValidationError('File hash "%s" != known hash "%s"' % (file_hash, known_hash))
    log.debug('Hash validation passed.')


def walk_generator(
        path='.',
        *,
        callback_exclusion=None,
        callback_permission_denied=None,
        depth_first=True,
        exclude_directories=None,
        exclude_filenames=None,
        recurse=True,
        yield_directories=False,
        yield_files=True,
        yield_style='flat',
    ):
    '''
    Yield Path objects for files in the file tree, similar to os.walk.

    callback_exclusion:
        This function will be called when a file or directory is excluded with
        two parameters: the path, and 'file' or 'directory'.

        Default = None

    exclude_filenames:
        A set of filenames that will not be copied. Entries can be absolute
        paths to exclude that particular file, or plain names to exclude
        all matches. For example:
        {'C:\\folder\\file.txt', 'desktop.ini'}

        Default = None

    exclude_directories:
        A set of directories that will not be copied. Entries can be
        absolute paths to exclude that particular directory, or plain names
        to exclude all matches. For example:
        {'C:\\folder', 'thumbnails'}

        Default = None

    recurse:
        Yield from subdirectories. If False, only immediate files are returned.

    yield_directories:
        Should the generator produce directories? Has no effect in nested yield style.

    yield_files:
        Should the generator produce files? Has no effect in nested yield style.

    yield_style:
        If 'flat', yield individual files one by one in a constant stream.
        If 'nested', yield tuple(root, directories, files) like os.walk does,
        except I use Path objects with absolute paths for everything.
    '''
    if not yield_directories and not yield_files:
        raise ValueError('yield_directories and yield_files cannot both be False')

    if yield_style not in ['flat', 'nested']:
        raise ValueError('Invalid yield_style %s. Either "flat" or "nested".' % repr(yield_style))

    if exclude_directories is None:
        exclude_directories = set()

    if exclude_filenames is None:
        exclude_filenames = set()

    callback_exclusion = callback_exclusion or do_nothing
    callback_permission_denied = callback_permission_denied or do_nothing

    exclude_filenames = {normalize(f) for f in exclude_filenames}
    exclude_directories = {normalize(f) for f in exclude_directories}

    path = str_to_fp(path)
    path.correct_case()

    # Considering full paths
    if normalize(path.absolute_path) in exclude_directories:
        callback_exclusion(path.absolute_path, 'directory')
        return

    # Considering folder names
    if normalize(path.basename) in exclude_directories:
        callback_exclusion(path.absolute_path, 'directory')
        return

    directory_queue = collections.deque()
    directory_queue.append(path)

    # This is a recursion-free workplace.
    # Thank you for your cooperation.
    while len(directory_queue) > 0:
        current_location = directory_queue.popleft()
        log.debug('listdir: %s', current_location.absolute_path)
        try:
            contents = os.listdir(current_location.absolute_path)
        except PermissionError as exception:
            callback_permission_denied(current_location, exception)
            continue
        log.debug('received %d items', len(contents))

        if yield_style == 'flat' and yield_directories:
            yield current_location

        directories = []
        files = []
        for base_name in contents:
            absolute_name = os.path.join(current_location.absolute_path, base_name)

            if os.path.isdir(absolute_name):
                exclude = (
                    normalize(absolute_name) in exclude_directories or
                    normalize(base_name) in exclude_directories
                )
                if exclude:
                    callback_exclusion(absolute_name, 'directory')
                    continue

                directory = str_to_fp(absolute_name)
                directories.append(directory)

            elif yield_style == 'flat' and not yield_files:
                continue

            else:
                exclude = normalize(absolute_name) in exclude_filenames
                exclude |= normalize(base_name) in exclude_filenames
                if exclude:
                    callback_exclusion(absolute_name, 'file')
                    continue

                fp = str_to_fp(absolute_name)
                if yield_style == 'flat':
                    yield fp
                else:
                    files.append(fp)

        if yield_style == 'nested':
            yield (current_location, directories, files)

        if not recurse:
            break

        if depth_first:
            # Extendleft causes them to get reversed, so flip it first.
            directories.reverse()
            directory_queue.extendleft(directories)
        else:
            directory_queue.extend(directories)

# ---------------------------------------------------------------------------
# voussoirkit/sqlhelpers.py
# ---------------------------------------------------------------------------

def delete_filler(pairs):
    '''
    Manually aligning the bindings for DELETE statements is annoying.
    Given a dictionary of {column: value}, return the "WHERE ..." portion of
    the query and the bindings in the correct order.

    Example:
        pairs={'test': 'toast', 'ping': 'pong'}
        ->
        returns ('WHERE test = ? AND ping = ?', ['toast', 'pong'])

    In context:
        (qmarks, bindings) = delete_filler(pairs)
        query = 'DELETE FROM table %s' % qmarks
        cur.execute(query, bindings)
    '''
    qmarks = []
    bindings = []
    for (key, value) in pairs.items():
        qmarks.append('%s = ?' % key)
        bindings.append(value)
    qmarks = ' AND '.join(qmarks)
    qmarks = 'WHERE %s' % qmarks
    return (qmarks, bindings)

def insert_filler(column_names, values, require_all=True):
    '''
    Manually aligning the bindings for INSERT statements is annoying.
    Given the table's column names and a dictionary of {column: value},
    return the question marks and the list of bindings in the right order.

    require_all:
        If `values` does not contain one of the column names, should we raise
        an exception?
        Otherwise, that column will simply receive None.

    Example:
        column_names=['id', 'name', 'score'],
        values={'score': 20, 'id': '1111', 'name': 'James'}
        ->
        returns ('?, ?, ?', ['1111', 'James', 20])

    In context:
        (qmarks, bindings) = insert_filler(COLUMN_NAMES, data)
        query = 'INSERT INTO table VALUES(%s)' % qmarks
        cur.execute(query, bindings)
    '''
    values = values.copy()
    for column in column_names:
        if column in values:
            continue
        if require_all:
            raise ValueError('Missing column "%s"' % column)
        else:
            values[column] = None
    # '?' * n is a string of n question marks; joining it comma-separates them.
    qmarks = '?' * len(column_names)
    qmarks = ', '.join(qmarks)
    bindings = [values[column] for column in column_names]
    return (qmarks, bindings)

def update_filler(pairs, where_key):
    '''
    Manually aligning the bindings for UPDATE statements is annoying.
    Given a dictionary of {column: value} as well as the name of the column
    to be used as the WHERE, return the "SET ..." portion of the query and the
    bindings in the correct order.

    If the where_key needs to be reassigned also, let its value be a 2-tuple
    where [0] is the current value used for WHERE, and [1] is the new value
    used for SET.

    Example:
        pairs={'id': '1111', 'name': 'James', 'score': 20},
        where_key='id'
        ->
        returns ('SET name = ?, score = ? WHERE id == ?', ['James', 20, '1111'])

    Example:
        pairs={'filepath': ('/oldplace', '/newplace')},
        where_key='filepath'
        ->
        returns ('SET filepath = ? WHERE filepath == ?', ['/newplace', '/oldplace'])

    In context:
        (qmarks, bindings) = update_filler(data, where_key)
        query = 'UPDATE table %s' % qmarks
        cur.execute(query, bindings)
    '''
    pairs = pairs.copy()
    where_value = pairs.pop(where_key)
    if isinstance(where_value, tuple):
        (where_value, pairs[where_key]) = where_value
    if isinstance(where_value, dict):
        # BUGFIX: the original extracted ['old'] first and then indexed
        # ['new'] on the extracted value instead of on the dict. Read 'new'
        # from the dict before overwriting where_value.
        pairs[where_key] = where_value['new']
        where_value = where_value['old']

    if len(pairs) == 0:
        raise ValueError('No pairs left after where_key.')

    qmarks = []
    bindings = []
    for (key, value) in pairs.items():
        qmarks.append('%s = ?' % key)
        bindings.append(value)
    bindings.append(where_value)
    setters = ', '.join(qmarks)
    qmarks = 'SET {setters} WHERE {where_key} == ?'
    qmarks = qmarks.format(setters=setters, where_key=where_key)
    return (qmarks, bindings)

# ---------------------------------------------------------------------------
# voussoirkit/treeclass.py
# ---------------------------------------------------------------------------

class ExistingChild(Exception):
    # Raised by Tree.add_child when the identifier is already taken.
    pass

class InvalidIdentifier(Exception):
    # Raised when a Tree identifier is not a slash-free string.
    pass

class Tree:
    '''
    A simple named tree node. Children are keyed by identifier, and the
    absolute path of a node is the backslash-join of identifiers from the
    root down to it.
    '''
    def __init__(self, identifier, data=None):
        self.assert_identifier_ok(identifier)
        self.identifier = identifier
        self.data = data
        self.parent = None
        self.children = {}

    def __eq__(self, other):
        # Two nodes are equal when their absolute paths match.
        return isinstance(other, Tree) and self.abspath() == other.abspath()

    def __getitem__(self, key):
        return self.children[key]

    def __hash__(self):
        return hash(self.abspath())

    def __repr__(self):
        return 'Tree(%s)' % self.identifier

    @staticmethod
    def assert_identifier_ok(identifier):
        '''
        Raise InvalidIdentifier unless `identifier` is a slash-free string.
        '''
        if not isinstance(identifier, str):
            raise InvalidIdentifier(f'Identifier {identifier} must be a string.')

        if '/' in identifier or '\\' in identifier:
            raise InvalidIdentifier('Identifier cannot contain slashes')

    def abspath(self):
        '''
        Return the backslash-joined identifiers from the root to this node.
        '''
        node = self
        nodes = [node]
        while nodes[-1].parent is not None:
            nodes.append(nodes[-1].parent)
        nodes.reverse()
        nodes = [node.identifier for node in nodes]
        return '\\'.join(nodes)

    def add_child(self, other_node, overwrite_parent=False):
        '''
        Attach `other_node` as a child of this node and return it.
        Raises ExistingChild if the identifier is taken, and ValueError if
        other_node already has a parent (unless overwrite_parent=True).
        '''
        self.assert_child_available(other_node.identifier)
        if other_node.parent is not None and not overwrite_parent:
            raise ValueError('That node already has a parent. Try `overwrite_parent=True`')

        other_node.parent = self
        self.children[other_node.identifier] = other_node
        return other_node

    def assert_child_available(self, identifier):
        '''
        Raise ExistingChild if this node already has a child by `identifier`.
        '''
        if identifier in self.children:
            raise ExistingChild(f'Node {self.identifier} already has child {identifier}')

    def detach(self):
        '''
        Remove this node from its parent. No-op for root nodes.
        '''
        if self.parent is None:
            return

        del self.parent.children[self.identifier]
        self.parent = None

    def list_children(self, sort=None):
        '''
        Return this node's children as a sorted list. By default, sort
        case-insensitively by identifier (with case as a tiebreaker).
        '''
        children = list(self.children.values())
        if sort is None:
            children.sort(key=lambda node: (node.identifier.lower(), node.identifier))
        else:
            children.sort(key=sort)
        return children

    def walk(self, sort=None):
        '''
        Yield this node and all descendants, depth-first.
        '''
        yield self
        for child in self.list_children(sort=sort):
            yield from child.walk(sort=sort)

    def walk_parents(self):
        '''
        Yield each ancestor of this node, nearest first.
        '''
        parent = self.parent
        while parent is not None:
            yield parent
            parent = parent.parent