Initial migratory commit.

This commit is contained in:
Ethan Dalool 2018-12-17 22:10:00 -08:00
commit b4b51e3604
22 changed files with 3105 additions and 0 deletions

4
README.md Normal file
View file

@ -0,0 +1,4 @@
voussoirkit
===========
This is a collection of tools that I use often and import into my other projects.

11
phase2.py Normal file
View file

@ -0,0 +1,11 @@
import shutil
import os
def delete(folder):
    '''
    Remove the directory tree at `folder` as best-effort cleanup.

    Filesystem failures (the folder does not exist, permission problems, and
    other OSErrors) are ignored. Anything else, such as KeyboardInterrupt,
    is allowed to propagate instead of being silently swallowed.
    '''
    try:
        shutil.rmtree(folder)
    except OSError:
        pass
# Remove the build output folders, relative to the current working directory.
delete('dist')
delete('voussoirkit.egg-info')

12
setup.py Normal file
View file

@ -0,0 +1,12 @@
import setuptools
# Package metadata for voussoirkit.
setuptools.setup(
    name='voussoirkit',
    # Include every package that setuptools discovers from this directory.
    packages=setuptools.find_packages(),
    version='0.0.26',
    author='voussoir',
    author_email='ethan@voussoir.net',
    description='voussoir\'s toolkit',
    url='https://github.com/voussoir/voussoirkit',
    # NOTE(review): voussoirkit/downloady.py also imports `requests`, which is
    # not listed here -- confirm whether it should be.
    install_requires=['pyperclip']
)

3
voussoirkit.bat Normal file
View file

@ -0,0 +1,3 @@
rem Build a source distribution of voussoirkit and upload it.
rem The registration step below is kept for reference but is disabled.
rem py setup.py register -r https://upload.pypi.org/legacy/
py setup.py sdist upload -r https://upload.pypi.org/legacy/
rem NOTE(review): presumably this runs phase2.py, which deletes the dist and
rem voussoirkit.egg-info folders -- confirm how "phase2" is resolved.
phase2

0
voussoirkit/__init__.py Normal file
View file

81
voussoirkit/basenumber.py Normal file
View file

@ -0,0 +1,81 @@
import string
# Default digit symbols in ascending order of value: 0-9, then A-Z, then a-z.
ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
def from_base(number, base, alphabet=None):
    '''
    Convert `number`, written in the given `base`, into a Python number.

    number:
        The value to convert. It is passed through str() and may contain a
        single '.' separating the whole part from the fractional part.

    base:
        An int >= 2.

    alphabet:
        The digit symbols in ascending order of value. Only the first `base`
        symbols are used. If None, ALPHABET is used.

    If the symbols in use do not include both uppercase and lowercase
    letters, the conversion is case-insensitive.

    Returns an int, or a float when there are digits after the '.'.

    Raises TypeError if base is not an int. Raises ValueError if base < 2,
    if there is more than one '.', or if number contains symbols that are
    not in the alphabet.
    '''
    if not isinstance(base, int):
        raise TypeError('base must be an int.')
    if base < 2:
        raise ValueError('base must be >= 2.')

    # The int() shortcut only understands the ordinary digits, so a custom
    # alphabet must go through the general path even in base 10.
    if base == 10 and alphabet is None:
        return int(number)

    if alphabet is None:
        alphabet = ALPHABET
    number = str(number)
    alphabet = alphabet[:base]
    if number.count('.') > 1:
        raise ValueError('Too many decimal points')

    has_upper = any(c in string.ascii_uppercase for c in alphabet)
    has_lower = any(c in string.ascii_lowercase for c in alphabet)
    if not (has_upper and has_lower):
        # Letter case carries no meaning for this alphabet.
        alphabet = alphabet.upper()
        number = number.upper()

    differences = set(number.replace('.', '', 1)).difference(alphabet)
    if differences:
        raise ValueError('Unknown characters for base', base, differences)

    alpha_dict = {character: index for (index, character) in enumerate(alphabet)}
    try:
        decimal_pos = number.index('.')
    except ValueError:
        decimal_pos = len(number)

    result = 0
    for (index, character) in enumerate(number):
        if index == decimal_pos:
            continue
        # The digit just left of the point has power 0 and the digit just
        # right of it has power -1.
        power = decimal_pos - index
        if index < decimal_pos:
            power -= 1
        result += alpha_dict[character] * (base ** power)
    return result
def to_base(number, base, decimal_places=10, alphabet=None):
    '''
    Convert a Python number into a string written in the given `base`.

    number:
        An int or float. Negative numbers are written with a leading '-'.

    base:
        An int >= 2.

    decimal_places:
        If number has a fractional part, exactly this many digits are
        written after the '.'.

    alphabet:
        The digit symbols in ascending order of value. If None, ALPHABET
        is used.

    A whole part of zero is written as the alphabet's zero symbol, so 0
    becomes '0' rather than an empty string.

    Raises TypeError if base is not an int. Raises ValueError if base < 2
    or if the alphabet has fewer than `base` symbols.
    '''
    if not isinstance(base, int):
        raise TypeError('base must be an int.')
    if base < 2:
        raise ValueError('base must be >= 2.')

    # str() only produces the ordinary digits, so a custom alphabet must go
    # through the general path even in base 10.
    if base == 10 and alphabet is None:
        return str(number)

    if alphabet is None:
        alphabet = ALPHABET
    if base > len(alphabet):
        raise ValueError('Not enough symbols in alphabet for base %d' % base)

    # Convert the magnitude and put the sign back at the end. The digit
    # loops below only work for non-negative values.
    sign = '-' if number < 0 else ''
    number = abs(number)
    whole_portion = int(number)
    float_portion = number - whole_portion

    digits = []
    while whole_portion > 0:
        (whole_portion, remainder) = divmod(whole_portion, base)
        digits.append(alphabet[remainder])
    result = ''.join(reversed(digits)) or alphabet[0]

    if float_portion != 0:
        result += '.'
        for _ in range(decimal_places):
            float_portion *= base
            whole = int(float_portion)
            float_portion -= whole
            result += alphabet[whole]

    return sign + result

141
voussoirkit/bytestring.py Normal file
View file

@ -0,0 +1,141 @@
import re
import sys
from voussoirkit import clipext
__VERSION__ = '0.0.1'

# Unit sizes in bytes. Each unit is 1024 times the one before it.
# NOTE(review): MIBIBYTE and EXIBYTE correspond to the units usually spelled
# "mebibyte" and "exbibyte"; renaming them would change the module's public
# names, so they are left alone here.
BYTE = 1
KIBIBYTE = 1024 * BYTE
MIBIBYTE = 1024 * KIBIBYTE
GIBIBYTE = 1024 * MIBIBYTE
TEBIBYTE = 1024 * GIBIBYTE
PEBIBYTE = 1024 * TEBIBYTE
EXIBYTE = 1024 * PEBIBYTE
ZEBIBYTE = 1024 * EXIBYTE
YOBIBYTE = 1024 * ZEBIBYTE

# The abbreviation that is displayed for each unit.
BYTE_STRING = 'b'
KIBIBYTE_STRING = 'KiB'
MIBIBYTE_STRING = 'MiB'
GIBIBYTE_STRING = 'GiB'
TEBIBYTE_STRING = 'TiB'
PEBIBYTE_STRING = 'PiB'
EXIBYTE_STRING = 'EiB'
ZEBIBYTE_STRING = 'ZiB'
YOBIBYTE_STRING = 'YiB'

# Maps each unit size to its abbreviation.
UNIT_STRINGS = {
    BYTE: BYTE_STRING,
    KIBIBYTE: KIBIBYTE_STRING,
    MIBIBYTE: MIBIBYTE_STRING,
    GIBIBYTE: GIBIBYTE_STRING,
    TEBIBYTE: TEBIBYTE_STRING,
    PEBIBYTE: PEBIBYTE_STRING,
    EXIBYTE: EXIBYTE_STRING,
    ZEBIBYTE: ZEBIBYTE_STRING,
    YOBIBYTE: YOBIBYTE_STRING,
}
# Maps each abbreviation back to its unit size.
REVERSED_UNIT_STRINGS = {value: key for (key, value) in UNIT_STRINGS.items()}
# The unit sizes from largest to smallest.
UNIT_SIZES = sorted(UNIT_STRINGS.keys(), reverse=True)
def bytestring(size, decimal_places=3, force_unit=None):
    '''
    Render a number of bytes as a string made of a number and a unit.

    decimal_places:
        How many digits to show after the decimal point.

    force_unit:
        If None, an appropriate unit is chosen automatically.
        Otherwise, one of the size constants, or a unit string accepted by
        normalize_unit_string, to use as the divisor.
    '''
    if isinstance(force_unit, str):
        force_unit = REVERSED_UNIT_STRINGS[normalize_unit_string(force_unit)]

    if force_unit is None:
        divisor = get_appropriate_divisor(size)
    else:
        divisor = force_unit

    unit = UNIT_STRINGS[divisor]
    template = '{number:.0{decimal_places}f} {unit}'
    return template.format(
        number=size / divisor,
        decimal_places=decimal_places,
        unit=unit,
    )
def get_appropriate_divisor(size):
    '''
    Return the largest unit size that does not exceed the magnitude of
    `size`, or 1 if there is none.

    For example:
    1000 => 1 to display 1,000 b
    1024 => 1024 to display 1 KiB
    123456789 => 1048576 to display 117.738 MiB
    '''
    magnitude = abs(size)
    # UNIT_SIZES is ordered from largest to smallest, so the first unit that
    # fits is the one to use.
    return next((unit for unit in UNIT_SIZES if magnitude >= unit), 1)
def normalize_unit_string(string):
    '''
    Return the canonical unit abbreviation for `string`.

    For each unit, the full abbreviation, its first letter, and the
    abbreviation without the letter "i" are all accepted in any casing, so
    "k", "kb" and "kib" all produce "KiB".

    Raises ValueError if the string does not match any unit.
    '''
    lowered = string.lower()
    for canonical in UNIT_STRINGS.values():
        folded = canonical.lower()
        aliases = {folded, folded[0], folded.replace('i', '')}
        if lowered in aliases:
            return canonical
    raise ValueError('Unrecognized unit string "%s"' % lowered)
def parsebytes(string):
    '''
    Given a string like "100 kib", return the appropriate integer value.
    Accepts "k", "kb", "kib" in any casing. Spaces and commas are ignored.

    The number must come first and there must be exactly one of them.
    Whole numbers are handled as ints so that very large values are not
    rounded by a trip through float. Fractional results are truncated by
    int().

    Raises ValueError if no number or more than one number is found, if the
    number is not at the start, if the number is malformed, or if the unit
    is not recognized.
    '''
    string = string.lower().strip()
    string = string.replace(' ', '').replace(',', '')

    matches = re.findall(r'[\d.\-]+', string)
    if len(matches) == 0:
        raise ValueError('No numbers found')
    if len(matches) > 1:
        raise ValueError('Too many numbers found')

    byte_value = matches[0]
    if not string.startswith(byte_value):
        raise ValueError('Number is not at start of string')

    # Whatever remains after removing the number is the unit.
    string = string.replace(byte_value, '')

    try:
        byte_value = int(byte_value)
    except ValueError:
        byte_value = float(byte_value)

    # If the string has no text besides the number, just return that int.
    if string == '':
        return int(byte_value)

    unit_string = normalize_unit_string(string)
    multiplier = REVERSED_UNIT_STRINGS[unit_string]
    return int(byte_value * multiplier)
def main(args=None):
    '''
    Command line entry point: print the bytestring for a single number.

    args:
        The list of command line arguments. If None, sys.argv[1:] is used.
        The one argument is passed through clipext.resolve before being
        converted with int().

    Returns 0 on success, or 1 after printing the usage message when the
    number of arguments is not exactly one.
    '''
    if args is None:
        args = sys.argv[1:]

    if len(args) != 1:
        print('Usage: bytestring.py <number>')
        return 1

    # Read from `args` rather than sys.argv so that callers who pass their
    # own argument list get the number they asked for.
    number = clipext.resolve(args[0])
    n = int(number)
    print(bytestring(n))
    return 0
# When run as a script, exit with the status code that main returns.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

43
voussoirkit/cacheclass.py Normal file
View file

@ -0,0 +1,43 @@
import collections
class Cache:
    '''
    A mapping that holds a limited number of items. When a new key is added
    while the cache already holds `maxlen` items, the item at the front of
    the ordering is discarded.

    Reading an item with [] or get(), or assigning to it, moves it to the
    back of the ordering.
    '''
    def __init__(self, maxlen):
        self.maxlen = maxlen
        self.cache = collections.OrderedDict()

    def __contains__(self, key):
        return key in self.cache

    def __getitem__(self, key):
        value = self.cache[key]
        self.cache.move_to_end(key)
        return value

    def __len__(self):
        return len(self.cache)

    def __setitem__(self, key, value):
        if key in self.cache:
            # Delete first so that the key is re-inserted at the back.
            del self.cache[key]
        elif len(self.cache) >= self.maxlen:
            self.cache.popitem(last=False)
        self.cache[key] = value

    def clear(self):
        '''
        Remove every item.
        '''
        self.cache.clear()

    def get(self, key, fallback=None):
        '''
        Return the value for `key`, or `fallback` if the key is absent.
        '''
        if key not in self.cache:
            return fallback
        return self[key]

    def pop(self, key):
        '''
        Remove `key` and return its value. Raises KeyError if absent.
        '''
        return self.cache.pop(key)

    def remove(self, key):
        '''
        Remove `key` if it is present. Absent keys are ignored.
        '''
        self.cache.pop(key, None)

44
voussoirkit/clipext.py Normal file
View file

@ -0,0 +1,44 @@
import pyperclip
# Arguments that mean "use the clipboard contents" when passed to resolve.
CLIPBOARD_STRINGS = ['!c', '!clip', '!clipboard']
# Arguments that mean "read from interactive input" when passed to resolve.
INPUT_STRINGS = ['!i', '!in', '!input', '!stdin']
# The character that marks the end of interactive input (ctrl-z).
EOF = '\x1a'
def _input_lines():
    '''
    Yield lines from input() until a line contains the EOF character or
    input() raises EOFError. Any text after the EOF character on that final
    line is discarded.
    '''
    finished = False
    while not finished:
        try:
            line = input()
        except EOFError:
            # If you enter nothing but ctrl-z
            line = EOF

        (before_eof, marker, _) = line.partition(EOF)
        finished = (marker != '')
        yield before_eof
def multi_line_input(split_lines=False):
    '''
    Read lines of interactive input.

    If split_lines is True, return the generator of lines. Otherwise return
    all of the lines joined by newlines into a single string.
    '''
    lines = _input_lines()
    return lines if split_lines else '\n'.join(lines)
def resolve(arg, split_lines=False):
    '''
    Return the text that `arg` refers to.

    If arg is one of INPUT_STRINGS (in any casing), the text comes from
    multi_line_input. If it is one of CLIPBOARD_STRINGS, the text comes from
    the clipboard. Otherwise arg itself is the text.

    split_lines:
        If True, return the lines of the text instead of a single string.
    '''
    keyword = arg.lower()
    if keyword in INPUT_STRINGS:
        return multi_line_input(split_lines=split_lines)

    text = pyperclip.paste() if keyword in CLIPBOARD_STRINGS else arg
    return text.splitlines() if split_lines else text

468
voussoirkit/downloady.py Normal file
View file

@ -0,0 +1,468 @@
import argparse
import os
import pyperclip
import requests
import sys
import time
import urllib
import warnings
# pip install voussoirkit
from voussoirkit import bytestring
from voussoirkit import ratelimiter
from voussoirkit import clipext
# Ignore all warnings.
warnings.simplefilter('ignore')

# Headers that `request` adds to every request unless the caller has already
# supplied a header with the same key.
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36'
}

# Characters that sanitize_filename removes from filenames.
FILENAME_BADCHARS = '*?"<>|\r\n'

# NOTE(review): not referenced anywhere in the visible part of this module.
last_request = 0
# How many bytes to read from the response at a time.
CHUNKSIZE = 4 * bytestring.KIBIBYTE
# Default request timeout, passed to requests.
TIMEOUT = 60
# Downloads are written to localname + TEMP_EXTENSION and renamed at the end.
TEMP_EXTENSION = '.downloadytemp'

# NOTE(review): not referenced anywhere in the visible part of this module.
PRINT_LIMITER = ratelimiter.Ratelimiter(allowance=5, mode='reject')
class NotEnoughBytes(Exception):
    '''
    Raised by download_plan when the downloaded file is smaller than the
    size that the server reported.
    '''
def download_file(
        url,
        localname=None,
        auth=None,
        bytespersecond=None,
        callback_progress=None,
        do_head=True,
        headers=None,
        overwrite=False,
        raise_for_undersized=True,
        timeout=None,
        verbose=False,
        verify_ssl=True,
    ):
    '''
    Download `url` to a local file.

    localname:
        Where to save the file. If None or empty, the name is taken from the
        URL. If it is an existing directory, the name from the URL is placed
        inside that directory.

    verbose:
        If True, print the URL and the destination path before starting.

    The remaining arguments are passed along to prepare_plan.

    Returns whatever download_plan returns, or None if prepare_plan decided
    that there is nothing to do.
    '''
    headers = headers or {}
    url = sanitize_url(url)

    if localname in [None, '']:
        localname = basename_from_url(url)
    if os.path.isdir(localname):
        localname = os.path.join(localname, basename_from_url(url))

    localname = sanitize_filename(localname)
    if localname != os.devnull:
        localname = os.path.abspath(localname)

    if verbose:
        safeprint(' URL:', url)
        safeprint('File:', localname)

    plan_options = {
        'auth': auth,
        'bytespersecond': bytespersecond,
        'callback_progress': callback_progress,
        'do_head': do_head,
        'headers': headers,
        'overwrite': overwrite,
        'raise_for_undersized': raise_for_undersized,
        'timeout': timeout,
        'verify_ssl': verify_ssl,
    }
    plan = prepare_plan(url, localname, **plan_options)
    if plan is not None:
        return download_plan(plan)
    return None
def download_plan(plan):
    '''
    Carry out a download plan produced by prepare_plan.

    The response is written into plan['download_into'] starting at
    plan['seek_to']. If plan['header_range_min'] is not None, a range header
    is added to plan['headers'] before the request is made. Once the
    download is finished, the file is renamed to plan['real_localname'] if
    the two names differ.

    Returns plan['real_localname'].

    Raises NotEnoughBytes if plan['raise_for_undersized'] is set, the plan
    is not a partial download, and the file on disk ends up smaller than
    plan['remote_total_bytes'].
    '''
    localname = plan['download_into']
    directory = os.path.split(localname)[0]
    if directory != '':
        os.makedirs(directory, exist_ok=True)
    touch(localname)

    if plan['header_range_min'] is not None:
        plan['headers']['range'] = 'bytes={min}-{max}'.format(
            min=plan['header_range_min'],
            max=plan['header_range_max'],
        )

    if plan['plan_type'] in ('resume', 'partial'):
        bytes_downloaded = plan['seek_to']
    else:
        bytes_downloaded = 0

    # The with statement guarantees that the file is closed even when the
    # request or one of the writes raises.
    with open(localname, 'r+b') as file_handle:
        file_handle.seek(plan['seek_to'])

        download_stream = request(
            'get',
            plan['url'],
            stream=True,
            auth=plan['auth'],
            headers=plan['headers'],
            timeout=plan['timeout'],
            verify_ssl=plan['verify_ssl'],
        )

        if plan['remote_total_bytes'] is None:
            # Since we didn't do a head, let's fill this in now.
            plan['remote_total_bytes'] = int(download_stream.headers.get('Content-Length', 0))

        # callback_progress is a class or factory. It is called with the
        # total size and the result receives the step() calls.
        callback_progress = plan['callback_progress']
        if callback_progress is not None:
            callback_progress = callback_progress(plan['remote_total_bytes'])

        for chunk in download_stream.iter_content(chunk_size=CHUNKSIZE):
            bytes_downloaded += len(chunk)
            file_handle.write(chunk)
            if callback_progress is not None:
                callback_progress.step(bytes_downloaded)

            # There is no reason to make the final chunk wait on the limiter.
            if plan['limiter'] is not None and bytes_downloaded < plan['remote_total_bytes']:
                plan['limiter'].limit(len(chunk))

    # Don't try to rename /dev/null
    if os.devnull not in [localname, plan['real_localname']]:
        localsize = os.path.getsize(localname)
        undersized = plan['plan_type'] != 'partial' and localsize < plan['remote_total_bytes']
        if plan['raise_for_undersized'] and undersized:
            message = 'File does not contain expected number of bytes. Received {size} / {total}'
            message = message.format(size=localsize, total=plan['remote_total_bytes'])
            raise NotEnoughBytes(message)

        if localname != plan['real_localname']:
            os.rename(localname, plan['real_localname'])

    return plan['real_localname']
def prepare_plan(
        url,
        localname,
        auth=None,
        bytespersecond=None,
        callback_progress=None,
        do_head=True,
        headers=None,
        overwrite=False,
        raise_for_undersized=True,
        timeout=TIMEOUT,
        verify_ssl=True,
    ):
    '''
    Look at the local files, and optionally probe the server, to decide how
    the download should proceed.

    bytespersecond:
        None for no limit, a ratelimiter.Ratelimiter to use as-is, or a
        number to use as the allowance of a new Ratelimiter.

    do_head:
        If True, make a preliminary request to learn the remote size and
        whether the server honors range requests.

    headers:
        Request headers. If they contain 'range', only that part of the file
        is downloaded, directly into the real file. The dict is copied, so
        the caller's dict is not modified.

    Returns None if the file already exists, overwrite is False and no range
    was provided. Otherwise returns a dict for download_plan whose
    'plan_type' is 'fulldownload', 'resume' or 'partial'.

    Raises Exception if a range was provided but range support could not be
    confirmed, or if a non-empty temp file exists and none of the plans
    apply.
    '''
    # Chapter 1: File existence
    # Copy the headers so that neither the probe below nor download_plan
    # modifies the dict that the caller handed in.
    headers = dict(headers or {})
    user_provided_range = 'range' in headers
    real_localname = localname
    temp_localname = localname + TEMP_EXTENSION
    real_exists = os.path.exists(real_localname)

    if real_exists and overwrite is False and not user_provided_range:
        print('File exists and overwrite is off. Nothing to do.')
        return None
    temp_exists = os.path.exists(temp_localname)
    real_localsize = int(real_exists and os.path.getsize(real_localname))
    temp_localsize = int(temp_exists and os.path.getsize(temp_localname))

    # Chapter 2: Ratelimiting
    if bytespersecond is None:
        limiter = None
    elif isinstance(bytespersecond, ratelimiter.Ratelimiter):
        limiter = bytespersecond
    else:
        limiter = ratelimiter.Ratelimiter(allowance=bytespersecond)

    # Chapter 3: Extracting range
    if user_provided_range:
        user_range_min = int(headers['range'].split('bytes=')[1].split('-')[0])
        user_range_max = headers['range'].split('-')[1]
        if user_range_max != '':
            user_range_max = int(user_range_max)
    else:
        user_range_min = None
        user_range_max = None

    # Chapter 4: Server range support
    # Always include a range on the first request to figure out whether the
    # server supports it. Use 0- to get correct remote_total_bytes
    # The probe uses its own dict so that the real headers keep whatever
    # range the user asked for.
    temp_headers = dict(headers)
    temp_headers['range'] = 'bytes=0-'
    if do_head:
        # I'm using a GET instead of an actual HEAD here because some servers respond
        # differently, even though they're not supposed to.
        # When no timeout was given, the probe keeps using the module default.
        probe_timeout = TIMEOUT if timeout is None else timeout
        head = request(
            'get',
            url,
            stream=True,
            headers=temp_headers,
            auth=auth,
            timeout=probe_timeout,
            verify_ssl=verify_ssl,
        )
        remote_total_bytes = int(head.headers.get('content-length', 0))
        server_respects_range = (head.status_code == 206 and 'content-range' in head.headers)
        head.connection.close()
    else:
        remote_total_bytes = None
        server_respects_range = False

    if user_provided_range and not server_respects_range:
        if not do_head:
            raise Exception('Cannot determine range support without the head request')
        else:
            raise Exception('Server did not respect your range header')

    # Chapter 5: Plan definitions
    plan_base = {
        'url': url,
        'auth': auth,
        'callback_progress': callback_progress,
        'limiter': limiter,
        'headers': headers,
        'real_localname': real_localname,
        'raise_for_undersized': raise_for_undersized,
        'remote_total_bytes': remote_total_bytes,
        'timeout': timeout,
        'verify_ssl': verify_ssl,
    }
    plan_fulldownload = dict(
        plan_base,
        download_into=temp_localname,
        header_range_min=None,
        header_range_max=None,
        plan_type='fulldownload',
        seek_to=0,
    )
    plan_resume = dict(
        plan_base,
        download_into=temp_localname,
        header_range_min=temp_localsize,
        header_range_max='',
        plan_type='resume',
        seek_to=temp_localsize,
    )
    plan_partial = dict(
        plan_base,
        download_into=real_localname,
        header_range_min=user_range_min,
        header_range_max=user_range_max,
        plan_type='partial',
        seek_to=user_range_min,
    )

    # Chapter 6: Redeem your meal vouchers here
    if real_exists:
        if overwrite:
            os.remove(real_localname)

        if user_provided_range:
            return plan_partial

        return plan_fulldownload

    elif temp_exists and temp_localsize > 0:
        if overwrite:
            return plan_fulldownload

        if user_provided_range:
            return plan_partial

        if server_respects_range:
            print('Resume from byte %d' % plan_resume['seek_to'])
            return plan_resume

    else:
        if user_provided_range:
            return plan_partial

        return plan_fulldownload

    raise Exception('No plan was chosen?')
class Progress1:
    '''
    Progress callback that prints a status bar between the downloaded amount
    and the total amount, rewriting the same console line on each update.
    '''
    def __init__(self, total_bytes):
        # step() skips printing whenever limiter.limit(1) returns False.
        # NOTE(review): presumably this allows about 8 updates per second,
        # with the balance of 1 letting the first update through -- confirm
        # against ratelimiter.Ratelimiter.
        self.limiter = ratelimiter.Ratelimiter(allowance=8, mode='reject')
        self.limiter.balance = 1
        # At least 1, so that step() never divides by zero.
        self.total_bytes = max(1, total_bytes)
        # Both numbers are shown with the unit that suits the total.
        self.divisor = bytestring.get_appropriate_divisor(total_bytes)
        self.total_format = bytestring.bytestring(total_bytes, force_unit=self.divisor)
        # Right-align the downloaded amount to the width of the total.
        self.downloaded_format = '{:>%d}' % len(self.total_format)
        # NOTE(review): solid_char is an empty string here, so the bar is
        # drawn entirely with blank_char. Possibly a non-ASCII glyph was lost
        # from this copy of the file -- confirm against the original.
        self.blank_char = ' '
        self.solid_char = ''

    def step(self, bytes_downloaded):
        '''
        Print the progress for `bytes_downloaded`, the running total of
        bytes received so far.
        '''
        #print(self.limiter.balance)
        percent = bytes_downloaded / self.total_bytes
        percent = min(1, percent)
        # Skip this update if the limiter rejects it, but always print the
        # final update.
        if self.limiter.limit(1) is False and percent < 1:
            return

        downloaded_string = bytestring.bytestring(bytes_downloaded, force_unit=self.divisor)
        downloaded_string = self.downloaded_format.format(downloaded_string)
        block_count = 50
        solid_blocks = self.solid_char * int(block_count * percent)
        statusbar = solid_blocks.ljust(block_count, self.blank_char)
        statusbar = self.solid_char + statusbar + self.solid_char

        # Only move to the next console line once the download is complete.
        end = '\n' if percent == 1 else ''
        message = '\r{bytes_downloaded} {statusbar} {total_bytes}'
        message = message.format(
            bytes_downloaded=downloaded_string,
            total_bytes=self.total_format,
            statusbar=statusbar,
        )
        print(message, end=end, flush=True)
class Progress2:
    '''
    Progress callback that prints the downloaded byte count, the total byte
    count, and the percentage, rewriting the same console line on each
    update.
    '''
    def __init__(self, total_bytes):
        # At least 1, so that step() never divides by zero.
        self.total_bytes = max(1, total_bytes)
        # step() skips printing whenever limiter.limit(1) returns False.
        # NOTE(review): presumably this allows about 8 updates per second,
        # with the balance of 1 letting the first update through -- confirm
        # against ratelimiter.Ratelimiter.
        self.limiter = ratelimiter.Ratelimiter(allowance=8, mode='reject')
        self.limiter.balance = 1
        self.total_bytes_string = '{:,}'.format(self.total_bytes)
        # Pad the downloaded count to the width of the total.
        self.bytes_downloaded_string = '{:%d,}' % len(self.total_bytes_string)

    def step(self, bytes_downloaded):
        '''
        Print the progress for `bytes_downloaded`, the running total of
        bytes received so far.
        '''
        percent = (bytes_downloaded * 100) / self.total_bytes
        percent = min(100, percent)
        # Skip this update if the limiter rejects it, but always print the
        # final update.
        if self.limiter.limit(1) is False and percent < 100:
            return

        percent_string = '%08.4f' % percent
        bytes_downloaded_string = self.bytes_downloaded_string.format(bytes_downloaded)

        # Only move to the next console line once the download is complete.
        end = '\n' if percent == 100 else ''
        message = '\r{bytes_downloaded} / {total_bytes} / {percent}%'
        message = message.format(
            bytes_downloaded=bytes_downloaded_string,
            total_bytes=self.total_bytes_string,
            percent=percent_string,
        )
        print(message, end=end, flush=True)
def basename_from_url(url):
    '''
    Determine the local filename appropriate for a URL.

    The query string is discarded, percent-escapes are decoded, and the last
    path segment is returned.
    '''
    # Imported here so that the function does not depend on some other
    # module having loaded the urllib.parse submodule already.
    import urllib.parse

    # Drop the query before decoding. Otherwise an escaped question mark in
    # the filename would be mistaken for the start of the query, and a
    # trailing slash in front of the query would leave an empty name.
    localname = url.split('?')[0]
    localname = urllib.parse.unquote(localname)
    localname = localname.rstrip('/')
    localname = localname.split('/')[-1]
    return localname
def get_permission(prompt='y/n\n>', affirmative=('y', 'yes')):
    '''
    Ask the user a question and return True if they agree.

    prompt:
        The text shown to the user.

    affirmative:
        The answers that count as agreement. They are compared against the
        lowercased answer with surrounding whitespace removed.
    '''
    permission = input(prompt)
    return permission.strip().lower() in affirmative
def request(method, url, stream=False, headers=None, timeout=TIMEOUT, verify_ssl=True, **kwargs):
    '''
    Make an HTTP request with this module's default headers, using a new
    session configured with max_retries=30 and max_redirects=40.

    method:
        'get', 'head' or 'post'. Anything else raises KeyError.

    headers:
        Extra request headers. Entries from HEADERS are added for any keys
        that are not already present. The caller's dict is not modified.

    verify_ssl:
        Passed to requests as `verify`.

    Additional keyword arguments are passed to the requests method.

    Returns the response. Raises whatever raise_for_status raises when the
    response has an error status.
    '''
    # Work on a copy so that the caller's dict does not gain our defaults.
    headers = dict(headers) if headers is not None else {}
    for (key, value) in HEADERS.items():
        headers.setdefault(key, value)

    session = requests.Session()
    a = requests.adapters.HTTPAdapter(max_retries=30)
    b = requests.adapters.HTTPAdapter(max_retries=30)
    session.mount('http://', a)
    session.mount('https://', b)
    session.max_redirects = 40

    method = {
        'get': session.get,
        'head': session.head,
        'post': session.post,
    }[method]
    req = method(url, stream=stream, headers=headers, timeout=timeout, verify=verify_ssl, **kwargs)
    req.raise_for_status()
    return req
def safeprint(*texts, **kwargs):
    '''
    Print the arguments after replacing every non-ASCII character with a
    question mark. Keyword arguments are passed to print.
    '''
    printable = []
    for text in texts:
        encoded = str(text).encode('ascii', 'replace')
        printable.append(encoded.decode())
    print(*printable, **kwargs)
def sanitize_filename(text, exclusions=''):
    '''
    Remove the characters of FILENAME_BADCHARS from `text`, and remove
    colons from everything after the drive portion of the path.

    exclusions:
        Bad characters that should be left alone.
    '''
    unwanted = FILENAME_BADCHARS
    for keep in exclusions:
        unwanted = unwanted.replace(keep, '')

    text = text.translate({ord(character): None for character in unwanted})

    # The colon of a drive letter has to survive, so only the rest of the
    # path is stripped of colons.
    (drive, path) = os.path.splitdrive(text)
    return drive + path.replace(':', '')
def sanitize_url(url):
    '''
    Decode a percent-escaped colon in front of '//' so that, for example,
    'http%3A//' becomes 'http://'.
    '''
    return url.replace('%3A//', '://')
def touch(filename):
    '''
    Create the file if it does not exist. Existing contents are untouched.
    '''
    with open(filename, 'ab'):
        pass
def download_argparse(args):
    '''
    Run download_file using the parsed command line arguments.

    args.callback selects the progress display: None or '1' for Progress1,
    '2' for Progress2, and any other value is passed along unchanged.

    args.retry is how many attempts to make when the download fails with
    NotEnoughBytes or a connection error. A falsy value means one attempt,
    and a negative value means retry without limit. The exception from the
    final attempt is raised.
    '''
    url = clipext.resolve(args.url)

    callback = {
        None: Progress1,
        '1': Progress1,
        '2': Progress2,
    }.get(args.callback, args.callback)

    bytespersecond = args.bytespersecond
    if bytespersecond is not None:
        bytespersecond = bytestring.parsebytes(bytespersecond)

    headers = {}
    if args.range is not None:
        headers['range'] = 'bytes=%s' % args.range

    retry = args.retry
    if not retry:
        retry = 1

    while retry != 0:
        # Negative numbers permit infinite retries.
        try:
            download_file(
                url=url,
                localname=args.localname,
                bytespersecond=bytespersecond,
                callback_progress=callback,
                do_head=args.no_head is False,
                # prepare_plan rewrites the 'range' entry of the dict it is
                # given, so each attempt gets its own copy and a retry still
                # asks for the range that the user specified.
                headers=dict(headers),
                overwrite=args.overwrite,
                timeout=args.timeout,
                verbose=True,
                verify_ssl=args.no_ssl is False,
            )
        except (NotEnoughBytes, requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            retry -= 1
            if retry == 0:
                raise
        else:
            break
# Command line interface: downloady.py url [localname] [options]
if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('url')
    parser.add_argument('localname', nargs='?', default=None)
    # '1' or '2' selects Progress1 or Progress2 in download_argparse.
    parser.add_argument('-c', '--callback', dest='callback', default=Progress1)
    # A string for bytestring.parsebytes.
    parser.add_argument('-bps', '--bytespersecond', dest='bytespersecond', default=None)
    parser.add_argument('-ow', '--overwrite', dest='overwrite', action='store_true')
    # Becomes the request header 'range: bytes=<value>'.
    parser.add_argument('-r', '--range', dest='range', default=None)
    parser.add_argument('--timeout', dest='timeout', type=int, default=TIMEOUT)
    # --retry with no number gives -1, which download_argparse treats as
    # retrying without limit.
    parser.add_argument('--retry', dest='retry', const=-1, nargs='?', type=int, default=1)
    parser.add_argument('--no-head', dest='no_head', action='store_true')
    parser.add_argument('--no-ssl', dest='no_ssl', action='store_true')
    parser.set_defaults(func=download_argparse)

    args = parser.parse_args()
    args.func(args)

View file

@ -0,0 +1,50 @@
import datetime
import time
# Day 1 of the count used by september_day: September 1, 1993 at 00:00 UTC.
EPOCH = datetime.datetime(
    year=1993,
    month=9,
    day=1,
    tzinfo=datetime.timezone.utc,
)
def normalize_date(date):
    '''
    Return `date` as a datetime object.

    date:
        A datetime, which is returned unchanged, or an int or float unix
        timestamp, which becomes a timezone-aware datetime in UTC.

    Raises TypeError for any other type.
    '''
    if isinstance(date, datetime.datetime):
        return date

    if isinstance(date, (int, float)):
        # utcfromtimestamp is deprecated. Passing tz produces the same
        # moment as an aware datetime in a single step.
        return datetime.datetime.fromtimestamp(date, tz=datetime.timezone.utc)

    raise TypeError('Unrecognized date type.')
def now():
    '''
    Return the current time as a timezone-aware datetime in UTC.
    '''
    return datetime.datetime.now(tz=datetime.timezone.utc)
def september_day(date):
    '''
    Return the ES day of the month for this date: the number of whole days
    since EPOCH, plus one, so that EPOCH itself falls on day 1.
    '''
    elapsed = normalize_date(date) - EPOCH
    return elapsed.days + 1
def september_string(date, strftime):
    '''
    Return the ES formatted string for this date.

    date:
        A datetime or unix timestamp, as accepted by normalize_date.

    strftime:
        A strftime format string. %a and %A are filled in from the real
        date, %d and %-d are replaced with the september_day, and all other
        directives are formatted with the month and year of EPOCH.
    '''
    # Imported here because it is only needed by this function.
    import calendar

    date = normalize_date(date)
    day = str(september_day(date))

    # The weekday names must come from the real date, so fill them in before
    # the month and year are replaced.
    strftime = strftime.replace('%a', date.strftime('%a'))
    strftime = strftime.replace('%A', date.strftime('%A'))

    strftime = strftime.replace('%d', day)
    strftime = strftime.replace('%-d', day)

    # The real day of the month can be larger than the EPOCH month allows
    # (the 31st, for instance), which would make replace() raise ValueError.
    # The day directives have already been filled in above, so clamping it
    # does not change them.
    last_day = calendar.monthrange(EPOCH.year, EPOCH.month)[1]
    date = date.replace(
        year=EPOCH.year,
        month=EPOCH.month,
        day=min(date.day, last_day),
    )
    return date.strftime(strftime)
# When run as a script, print the current time in the ES calendar.
if __name__ == '__main__':
    print(september_string(now(), '%Y %B %d %H:%M:%S'))

View file

@ -0,0 +1,548 @@
import time
# NOTE(review): in the visible code this is only referenced by a
# commented-out line inside tokenize.
ESCAPE_SEQUENCES = {
'\\': '\\',
'"': '"',
}
BINARY_OPERATORS = {'AND', 'OR', 'XOR'}
UNARY_OPERATORS = {'NOT'}
# Operators listed from the one that binds most tightly to the one that
# binds most loosely. order_operations compares positions in this list.
PRECEDENCE = ['NOT', 'AND', 'XOR', 'OR']
OPERATORS = BINARY_OPERATORS | UNARY_OPERATORS
# Sentinel values used for breaking up the tokens, so we don't have to use
# strings '(' and ')' which can get confused with user input.
PAREN_OPEN = object()
PAREN_CLOSE = object()
# Used by ExpressionTree.evaluate when no match_function is given. It tests
# whether the token is a substring of the text.
DEFAULT_MATCH_FUNCTION = str.__contains__
# Explanation attached to exceptions raised while evaluating with the
# default match function.
MESSAGE_WRITE_YOUR_OWN_MATCHER = '''
The default match function is {function}.
Consider passing your own `match_function`, which accepts two
positional arguments:
1. The object being tested.
2. The Expression token, a string.
'''.strip()
def func_and(values):
    '''
    Return True if every value is truthy. Stops at the first falsy value.
    '''
    for value in values:
        if not value:
            return False
    return True
def func_or(values):
    '''
    Return True if any value is truthy. Stops at the first truthy value.
    '''
    for value in values:
        if value:
            return True
    return False
def func_xor(values):
    '''
    Return True if an odd number of the values are truthy.

    Values are judged by truthiness, the same way that func_and and func_or
    judge them, so match functions that return truthy objects instead of
    True are counted correctly.
    '''
    truthy_count = sum(1 for value in values if value)
    return truthy_count % 2 == 1
def func_not(value):
    '''
    Return the negation of the single item in the iterable `value`.

    Raises ValueError if there is not exactly one item.
    '''
    operands = list(value)
    if len(operands) != 1:
        raise ValueError('NOT only takes 1 value')
    (operand,) = operands
    return not operand
# Maps each operator token to the function that combines the results of its
# operands. Each function takes a single iterable of values.
OPERATOR_FUNCTIONS = {
    'AND': func_and,
    'OR': func_or,
    'XOR': func_xor,
    'NOT': func_not,
}
class NoTokens(Exception):
    '''
    Raised by ExpressionTree.parse when there are no tokens to parse.
    '''
class ExpressionTree:
    '''
    A node in a tree that represents a boolean expression.

    A node whose token is one of OPERATORS combines the results of its
    children. Any other node is a leaf, and its token is given to the match
    function when the tree is evaluated.
    '''
    def __init__(self, token, parent=None):
        self.children = []
        self.parent = parent
        self.token = token

    def __str__(self):
        if self.token is None:
            return '""'

        if self.token not in OPERATORS:
            # Escape the characters that tokenize treats specially.
            t = self.token
            t = t.replace('"', '\\"')
            t = t.replace('(', '\\(')
            t = t.replace(')', '\\)')
            if ' ' in t:
                t = '"%s"' % t
            return t

        if len(self.children) == 1:
            child = self.children[0]
            childstring = str(child)
            if child.token in OPERATORS:
                childstring = '(%s)' % childstring
                return '%s%s' % (self.token, childstring)
            return '%s %s' % (self.token, childstring)

        children = []
        for child in self.children:
            childstring = str(child)
            if child.token in OPERATORS:
                childstring = '(%s)' % childstring
            children.append(childstring)

        s = ' %s ' % self.token
        s = s.join(children)
        return s

    @classmethod
    def parse(cls, tokens, spaces=0):
        '''
        Build a tree from an expression string or from a list of tokens, and
        return its root node.

        tokens:
            A string, which is passed through tokenize first, or a list of
            tokens in which each nested list is a parenthesized group.

        spaces:
            The recursion depth. It is only used by debugging output.

        Raises NoTokens if there are no tokens.
        '''
        if isinstance(tokens, str):
            tokens = tokenize(tokens)

        if tokens == []:
            raise NoTokens()

        if isinstance(tokens[0], list):
            current = cls.parse(tokens[0], spaces=spaces+1)
        else:
            current = cls(token=tokens[0])

        for token in tokens[1:]:
            ##print(' '*spaces, 'cur', current, current.token)
            if isinstance(token, list):
                new = cls.parse(token, spaces=spaces+1)
            else:
                new = cls(token=token)
            ##print(' '*spaces, 'new', new)

            if 0 == 1:
                pass

            elif current.token not in OPERATORS:
                if new.token in BINARY_OPERATORS:
                    if len(new.children) == 0:
                        new.children.append(current)
                        current.parent = new
                        current = new
                    # NOTE(review): the nesting of this else could not be
                    # confirmed from the copy under review. It may belong to
                    # the `if new.token in BINARY_OPERATORS` above instead.
                    else:
                        raise Exception('Expected binary operator, got %s.' % new.token)

            elif current.token in BINARY_OPERATORS:
                if new.token in BINARY_OPERATORS:
                    if new.token == current.token:
                        # Merge a group of the same operator into this node.
                        for child in new.children:
                            child.parent = current
                        current.children.extend(new.children)
                    else:
                        if len(new.children) == 0:
                            new.children.append(current)
                            current.parent = new
                            current = new
                        else:
                            current.children.append(new)
                            new.parent = current

                elif new.token in UNARY_OPERATORS:
                    if len(new.children) == 0:
                        # A bare unary operator still needs its operand, so
                        # it becomes the current node.
                        current.children.append(new)
                        new.parent = current
                        current = new
                    else:
                        current.children.append(new)
                        new.parent = current

                elif new.token not in OPERATORS:
                    if len(current.children) > 0:
                        current.children.append(new)
                        new.parent = current
                    else:
                        raise Exception('Expected current children > 0.')

            elif current.token in UNARY_OPERATORS:
                if len(current.children) == 0:
                    current.children.append(new)
                    new.parent = current
                    # The unary operator is complete, so go back up.
                    if current.parent is not None:
                        current = current.parent
                elif new.token in BINARY_OPERATORS:
                    if len(new.children) == 0:
                        new.children.append(current)
                        current.parent = new
                        current = new
                    else:
                        current.children.append(new)
                        new.parent = current
                        if current.parent is not None:
                            current = current.parent
                else:
                    raise Exception('Expected new to be my operand or parent binary.')

            ##print(' '*spaces, 'fin:', current.rootmost(), '\n')

        current = current.rootmost()
        ##print('---', current)
        return current

    def _evaluate(self, text, match_function=None):
        if self.token not in OPERATORS:
            if match_function is None:
                match_function = DEFAULT_MATCH_FUNCTION
            value = match_function(text, self.token)
            #print(self.token, value)
            return value

        operator_function = OPERATOR_FUNCTIONS[self.token]
        children = (child.evaluate(text, match_function=match_function) for child in self.children)
        return operator_function(children)

    def diagram(self):
        '''
        Return a multi-line string that shows the structure of the tree.
        '''
        if self.token is None:
            return '""'
        t = self.token
        if ' ' in t:
            t = '"%s"' % t

        output = t
        indent = 1
        for child in self.children:
            child = child.diagram()
            for line in child.splitlines():
                output += (' ' * indent)
                output += line + '\n'
                # The first line sits beside the token. All later lines are
                # indented past it.
                indent = len(t) + 1
        output = output.strip()

        return output

    def evaluate(self, text, match_function=None):
        '''
        Return the result of testing `text` against this expression.

        match_function:
            Called as match_function(text, token) for each leaf. If None,
            DEFAULT_MATCH_FUNCTION is used.

        If evaluating with the default match function raises an exception,
        an Exception explaining how to provide a custom match function is
        raised from it. With a custom match function, the exception
        propagates unchanged.
        '''
        if match_function is None:
            match_function = DEFAULT_MATCH_FUNCTION

        try:
            return self._evaluate(text, match_function)
        except Exception as e:
            if match_function is DEFAULT_MATCH_FUNCTION:
                message = MESSAGE_WRITE_YOUR_OWN_MATCHER.format(function=DEFAULT_MATCH_FUNCTION)
                override = Exception(message)
                raise override from e
            raise

    @property
    def is_leaf(self):
        return self.token not in OPERATORS

    def map(self, function):
        '''
        Apply this function to all of the operands.
        '''
        for node in self.walk_leaves():
            node.token = function(node.token)

    def prune(self):
        '''
        Remove any nodes where `token` is None. An operator that is left
        with no children has its token set to None and is removed from its
        parent.
        '''
        self.children = [child for child in self.children if child.token is not None]

        # A child that prunes itself removes itself from self.children, so
        # iterate over a copy to make sure that no sibling is skipped.
        for child in list(self.children):
            child.prune()

        if self.token in OPERATORS and len(self.children) == 0:
            self.token = None
            if self.parent is not None:
                self.parent.children.remove(self)

    def rootmost(self):
        '''
        Return the topmost ancestor of this node, or the node itself if it
        has no parent.
        '''
        current = self
        while current.parent is not None:
            current = current.parent
        return current

    def walk(self):
        '''
        Yield this node and then all of its descendants, depth first.
        '''
        yield self
        for child in self.children:
            yield from child.walk()

    def walk_leaves(self):
        '''
        Yield the nodes from walk that are not operators.
        '''
        for node in self.walk():
            if node.is_leaf:
                yield node
def implied_tokens(tokens):
    '''
    Return a new list of tokens in which missing operators have been filled
    in and malformed operators have been removed.

    tokens:
        A list of strings, in which each nested list or tuple is a
        parenthesized group.

    1. If two operands are directly next to each other, or an operand is followed
    by a unary operator, it is implied that there is an AND between them.
    '1 2' -> '1 AND 2'
    '1 NOT 2' -> '1 AND NOT 2'

    2. If an expression begins or ends with an invalid operator, remove it.
    'AND 2' -> '2'
    '2 AND' -> '2'

    3. If a parenthetical term contains only 1 item, the parentheses can be removed.
    '(a)' -> 'a'
    '(NOT a)' -> 'NOT a'
    '(a OR)' -> '(a)' (by rule 2) -> 'a'

    4. If two operators are next to each other, except for binary-unary,
    keep only the first.
    '1 OR AND 2' -> '1 OR 2'
    '1 NOT AND 2' -> '1 AND NOT AND 2' (by rule 1) -> '1 AND NOT 2'
    'NOT NOT 1' -> 'NOT 1'
    '1 AND NOT NOT 2' -> '1 AND NOT 2'
    '''
    final_tokens = []
    # These describe the token that was most recently kept.
    has_operand = False
    has_binary_operator = False
    has_unary_operator = False

    if len(tokens) == 1 and not isinstance(tokens[0], str):
        # [['A' 'AND' 'B']] -> ['A' 'AND' 'B']
        tokens = tokens[0]

    for token in tokens:
        skip_this = False
        # Keep simplifying a group until it is no longer a list, or until
        # another pass makes no difference.
        while isinstance(token, (list, tuple)):
            if len(token) == 0:
                # Delete empty parentheses.
                skip_this = True
                break
            if len(token) == 1:
                # Take singular terms out of their parentheses.
                token = token[0]
            else:
                previous = token
                token = implied_tokens(token)
                if previous == token:
                    break

        if skip_this:
            continue

        #print('tk:', token, 'hu:', has_unary_operator, 'hb:', has_binary_operator, 'ho:', has_operand)
        # The isinstance check comes first because groups are lists, which
        # cannot be looked up in the OPERATORS set.
        if isinstance(token, str) and token in OPERATORS:
            this_binary = token in BINARY_OPERATORS
            this_unary = not this_binary

            # 'NOT AND' and 'AND AND' are malformed...
            if this_binary and (has_binary_operator or has_unary_operator):
                continue
            # ...'NOT NOT' is malformed...
            if this_unary and has_unary_operator:
                continue
            # ...but AND NOT is okay.

            # 'AND test' is malformed
            if this_binary and not has_operand:
                continue

            if this_unary and has_operand:
                final_tokens.append('AND')

            has_unary_operator = this_unary
            has_binary_operator = this_binary
            has_operand = False

        else:
            if has_operand:
                final_tokens.append('AND')
            has_unary_operator = False
            has_binary_operator = False
            has_operand = True

        final_tokens.append(token)

    # The expression must not end with an operator.
    # NOTE(review): only one trailing operator is removed, so 'a AND NOT'
    # comes out as ['a', 'AND'].
    if has_binary_operator or has_unary_operator:
        final_tokens.pop(-1)

    return final_tokens
def order_operations(tokens):
    '''
    Wrap runs of tokens in sublists according to PRECEDENCE, so that
    operators which bind more tightly end up grouped with their operands.

    Nested lists are processed recursively first. The list is modified in
    place and also returned. Lists with fewer than 5 tokens are returned
    without any regrouping of their own.
    '''
    for (index, token) in enumerate(tokens):
        if isinstance(token, list):
            tokens[index] = order_operations(token)

    if len(tokens) < 5:
        return tokens

    index = 0
    # The bounds of the group that is currently being collected.
    slice_start = None
    slice_end = None
    # The PRECEDENCE positions of the operators seen so far.
    precedence_stack = []
    while index < len(tokens):
        #time.sleep(0.1)
        token = tokens[index]
        try:
            precedence = PRECEDENCE.index(token)
        except ValueError:
            precedence = None

        # Operands and sublists are not operators, so move past them.
        if precedence is None:
            index += 1
            continue
        precedence_stack.append(precedence)

        if token in UNARY_OPERATORS:
            # A unary operator is grouped with the one token after it.
            slice_start = index
            slice_end = index + 2

        elif len(precedence_stack) > 1:
            if precedence_stack[-1] < precedence_stack[-2]:
                # This operator binds more tightly than the previous one, so
                # a group starts at the operand in front of it.
                slice_start = index - 1
                slice_end = None
            elif precedence_stack[-2] < precedence_stack[-1]:
                # This operator binds more loosely than the previous one, so
                # the open group, if there is one, ends in front of it.
                slice_end = index

        #print(tokens, index, token, precedence_stack, slice_start, slice_end, sep=' || ')

        if slice_start is None or slice_end is None:
            index += 1
            continue

        # Replace the tokens of the group with a single sublist.
        tokens[slice_start:slice_end] = [tokens[slice_start:slice_end]]
        slice_start = None
        slice_end = None
        # Discard up to two runs of equal precedence from the top of the
        # stack, stepping the index back once for each discarded entry.
        for x in range(2):
            if not precedence_stack:
                break

            delete = precedence_stack[-1]
            while precedence_stack and precedence_stack[-1] == delete:
                index -= 1
                precedence_stack.pop(-1)

        index += 1

    # A group that was still open at the end runs to the end of the list.
    if slice_start is not None:
        slice_end = len(tokens)
        tokens[slice_start:slice_end] = [tokens[slice_start:slice_end]]

    return tokens
def sublist_tokens(tokens, _from_index=0, depth=0):
    '''
    Given a list of tokens, replace parentheses with actual sublists.
    ['1', 'AND', '(', '3', 'OR', '4', ')'] ->
    ['1', 'AND', ['3', 'OR', '4']]

    Unclosed parentheses are automatically closed at the end.

    _from_index and depth are used by the recursive calls. When _from_index
    is 0 the list is returned by itself. Otherwise the return value is a
    tuple of the list and the index at which the caller should continue.
    '''
    collected = []
    cursor = _from_index
    while cursor < len(tokens):
        current = tokens[cursor]
        cursor += 1
        if current is PAREN_OPEN:
            # Gather everything up to the matching close into a sublist and
            # continue from wherever that stopped.
            (group, cursor) = sublist_tokens(tokens, _from_index=cursor, depth=depth + 1)
            collected.append(group)
        elif current is PAREN_CLOSE:
            break
        else:
            collected.append(current)

    if _from_index != 0:
        return (collected, cursor)
    return collected
def tokenize(expression):
    '''
    Break the string into tokens, then pass them through `sublist_tokens`,
    `implied_tokens` and `order_operations` and return the result.

    Spaces are the delimiter unless they are inside quotation marks.

    Quotation marks and parentheses can be escaped by preceding them with a
    backslash '\\'. The backslash itself is dropped and the escaped character
    is kept as ordinary text.

    Opening and closing parentheses are put into their own token unless
    escaped / quoted.

    Extraneous closing parentheses are ignored completely, and do not affect
    how later, properly balanced parentheses are handled.

    Before the grouping passes run, the flat token stream for
    '1 AND(4 OR "5 6") OR \\(test\\)' is
    ['1', 'AND', PAREN_OPEN, '4', 'OR', '5 6', PAREN_CLOSE, 'OR', '(test)']
    '''
    current_word = []
    in_escape = False
    in_quotes = False
    paren_depth = 0
    tokens = []
    for character in expression:
        if in_escape:
            # The character after a backslash is always taken literally.
            in_escape = False

        elif character in {'(', ')'} and not in_quotes:
            if character == '(':
                sentinel = PAREN_OPEN
                paren_depth += 1
            else:
                sentinel = PAREN_CLOSE
                paren_depth -= 1

            if paren_depth >= 0:
                tokens.append(''.join(current_word))
                tokens.append(sentinel)
                current_word.clear()
            else:
                # Extraneous closer. Put the depth back to zero, otherwise the
                # closer of the next balanced group would be discarded too.
                paren_depth = 0
            continue

        elif character == '\\':
            in_escape = True
            continue

        elif character == '"':
            in_quotes = not in_quotes
            continue

        elif character.isspace() and not in_quotes:
            tokens.append(''.join(current_word))
            current_word.clear()
            continue

        current_word.append(character)

    tokens.append(''.join(current_word))
    # Flushing on every delimiter leaves empty strings behind; drop them.
    tokens = [w for w in tokens if w != '']
    tokens = sublist_tokens(tokens)
    tokens = implied_tokens(tokens)
    tokens = order_operations(tokens)
    return tokens
if __name__ == '__main__':
    # Demo: parse each expression and evaluate it against every sample
    # tag string below.
    tests = [
        #'test you AND(1 OR "harrison ford") AND (where are you) AND pg',
        #'(you OR "AND ME")',
        #'(3 XOR 2 OR 4',
        #'1 NOT OR AND (2 OR (3 OR 4) OR (5 OR 6)))',
        #'3 OR (5 OR)',
        #'1 AND(4 OR "5 6")OR \\(test) 2',
        #'1 2 AND (3 OR 4)',
        #'AND 2',
        #'1 AND 2 AND ("3 7" OR 6)AND (4 OR 5)',
        #'NOT 1 AND NOT (2 OR 3)',
        #'1 AND 2 AND 3 AND 4',
        #'NOT 1 AND 2 OR 3 OR (5 AND 6)',
        #'5 OR 6 AND 7 OR 8',
        #'1 OR 2 AND 3 AND 4 OR 5 AND 6 OR 7 OR 8 AND 9',
        #'2 XOR 3 AND 4',
        #'1 OR (2 OR 3 AND 4)',
        #'NOT XOR 4 7'
        '[sci-fi] OR [pg-13]',
        '([sci-fi] OR [war]) AND [r]',
        '[r] XOR [sci-fi]',
        '"mark hamill" "harrison ford"',
    ]
    teststrings = {
        'Star Wars': '[harrison ford] [george lucas] [sci-fi] [pg] [carrie fisher] [mark hamill] [space]',
        'Blade Runner': '[harrison ford] [ridley scott] [neo-noir] [dystopian] [sci-fi] [r]',
        'Indiana Jones': '[harrison ford] [steven spielberg] [adventure] [pg-13]',
        'Apocalypse Now': '[harrison ford] [francis coppola] [r] [war] [drama]'
    }
    for expression in tests:
        print('start:', expression)
        parsed_tokens = tokenize(expression)
        print('implied:', parsed_tokens)
        tree = ExpressionTree.parse(parsed_tokens)
        print('tree:', tree)
        for (name, teststring) in teststrings.items():
            print('Matches', name, ':', tree.evaluate(teststring))
        print()

138
voussoirkit/fusker.py Normal file
View file

@ -0,0 +1,138 @@
import collections
import itertools
import string
import sys
from voussoirkit import basenumber
class Landmark:
    '''
    Describes one bracketed construct of the fusk syntax: the character that
    opens it, the character that closes it, and the function which is given
    the items collected between the two.
    '''
    def __init__(self, opener, closer, parser):
        (self.opener, self.closer, self.parser) = (opener, closer, parser)
def barsplit(chars):
    '''
    Split `chars` into groups wherever an item equals '|', and return a list
    with the `fusk_join` result of every non-empty group.
    '''
    groups = [[]]
    for item in chars:
        if item == '|':
            groups.append([])
        else:
            groups[-1].append(item)

    # Consecutive or leading / trailing bars leave empty groups, which
    # contribute nothing.
    return [fusk_join(group) for group in groups if group]
def fusk_join(items):
    '''
    Given a sequence mixing plain strings with iterables of alternatives,
    return a list containing every combination, with the strings kept in
    place and each iterable contributing one of its values.

    ['a', ['1', '2'], 'b'] -> ['a1b', 'a2b']

    The pieces are concatenated directly instead of going through
    str.format, so literal '{' and '}' characters in the plain strings are
    safe.
    '''
    # literals[i] is the text that comes before fusks[i], and the final
    # element is the text that comes after the last fusk.
    literals = ['']
    fusks = []
    for item in items:
        if isinstance(item, str):
            literals[-1] += item
        else:
            fusks.append(item)
            literals.append('')

    result = []
    for group in itertools.product(*fusks):
        pieces = [literals[0]]
        for (value, literal) in zip(group, literals[1:]):
            # format(value) is what '{}'.format(value) would have produced.
            pieces.append(format(value))
            pieces.append(literal)
        result.append(''.join(pieces))
    return result
def fusk_spinner(items):
    '''
    Generator which flattens `items` by one level: strings are yielded
    as they are, anything else is iterated and its members yielded.
    '''
    for entry in items:
        if not isinstance(entry, str):
            yield from entry
            continue
        yield entry
def parse_spinner(characters):
    '''
    Parser for the spinner landmark: split the collected items on bars and
    return a generator over every resulting string.
    '''
    return fusk_spinner(barsplit(characters))
def fusk_range(lo, hi, padto=0, base=10, lower=False):
    '''
    Yield the integers from `lo` through `hi`, both ends included, as strings
    written in `base`, left-padded with zeros to `padto` characters, and
    lowercased if `lower` is set.
    '''
    for number in range(lo, hi+1):
        text = basenumber.to_base(number, base).rjust(padto, '0')
        yield text.lower() if lower else text
def parse_range(characters):
    '''
    Parser for the range landmark. The collected characters must read
    "lo-hi"; the return value is a `fusk_range` generator over that range.

    The base is inferred from the characters used: digits only is base 10,
    digits with only lowercase or only uppercase letters is base 36, and
    anything else is base 62.

    If `lo` has a leading zero, both ends must have the same length and the
    output is zero-padded to that length.

    Raises ValueError if either end is empty, if the padding is inconsistent,
    or if there is not exactly one hyphen.
    '''
    r = ''.join(characters)
    (lo, hi) = [part.strip() for part in r.split('-')]
    if (not lo) or (not hi):
        raise ValueError('Invalid range', r)

    used = set(lo + hi)
    lower = False
    if used.issubset(string.digits):
        base = 10
    elif used.issubset(string.digits + string.ascii_lowercase):
        lower = True
        base = 36
    elif used.issubset(string.digits + string.ascii_uppercase):
        base = 36
    else:
        base = 62

    padto = 0
    if len(lo) > 1 and lo.startswith('0'):
        padto = len(lo)
        if len(hi) != padto:
            raise ValueError('Inconsistent padding', lo, hi)

    return fusk_range(
        basenumber.from_base(lo, base),
        basenumber.from_base(hi, base),
        padto=padto,
        base=base,
        lower=lower,
    )
# Every construct that `fusker` recognizes, keyed by its opening character.
landmarks = {
    landmark.opener: landmark
    for landmark in (
        Landmark('{', '}', parse_spinner),
        Landmark('[', ']', parse_range),
    )
}
def fusker(fstring, landmark=None, depth=0):
    '''
    Expand a fusk pattern.

    fstring:
        The pattern, either as a string or as the deque of characters that
        is shared between the recursive calls.
    landmark:
        Used by the recursion. The Landmark whose closer ends this call.
    depth:
        Accepted for compatibility; not used.

    The top-level call returns the generator made by `parse_spinner`.
    A recursive call returns a one-element list holding the landmark's
    parser result, or the raw collected items if the closer never arrives.

    A backslash escapes the next character, which is then stored along with
    its backslash and is not interpreted as an opener, closer or escape.
    '''
    escaped = False
    buff = []

    if isinstance(fstring, str):
        fstring = collections.deque(fstring)

    while fstring:
        character = fstring.popleft()
        if escaped:
            buff.append('\\' + character)
            escaped = False
        elif character == '\\':
            escaped = True
        elif landmark and character == landmark.closer:
            buff = [landmark.parser(buff)]
            break
        elif character in landmarks:
            # The recursive call consumes the shared deque up to and
            # including the matching closer.
            subtotal = fusker(fstring, landmark=landmarks[character])
            buff.extend(subtotal)
        else:
            buff.append(character)

    if not landmark:
        buff = parse_spinner(buff)
    return buff
if __name__ == '__main__':
    # Print every expansion of the pattern given as the first argument.
    for result in fusker(sys.argv[1]):
        print(result)

185
voussoirkit/passwordy.py Normal file
View file

@ -0,0 +1,185 @@
import string
import random
import sys
# Number of characters in a password when no length is given.
DEFAULT_LENGTH = 32
# Number of words in a sentence when no length is given.
DEFAULT_SENTENCE = 5
# Usage text shown by the command line's help mode. The two %03d
# placeholders are filled in with the defaults above.
HELP_MESSAGE = '''
===============================================================================
Generates a randomized password.
> passwordy [length] [options]
length: How many characters. Default %03d.
options:
h : consist entirely of hexadecimal characters.
b : consist entirely of binary characters.
dd : consist entirely of decimal characters.
default : consist entirely of upper+lower letters.
p : allow punctuation in conjunction with above.
d : allow digits in conjunction with above.
l : convert to lowercase.
u : convert to uppercase.
nd : no duplicates. Each character can only appear once.
Examples:
> passwordy 32 h l
98f17b6016cf08cc00f2aeecc8d8afeb
> passwordy 32 h u
2AA706866BF7A5C18328BF866136A261
> passwordy 32 u
JHEPTKCEFZRFXILMASHNPSTFFNWQHTTN
> passwordy 32 p
Q+:iSKX!Nt)ewUvlE*!+^D}hp+|<wpJ}
> passwordy 32 l p
m*'otz/"!qo?-^wwdu@fasf:|ldkosi`
===============================================================================
Generates a randomized sentence of words.
> passwordy sent [length] [join]
length : How many words. Default %03d.
join : The character that will join words together.
Default space.
Examples:
> passwordy sent
arrowroot sheared rustproof undo propionic acid
> passwordy sent 8
cipher competition solid angle rigmarole lachrymal social class critter consequently
> passwordy sent 8 _
Kahn_secondary_emission_unskilled_superior_court_straight_ticket_voltameter_hopper_crass
===============================================================================
'''.strip() % (DEFAULT_LENGTH, DEFAULT_SENTENCE)
def listget(li, index, fallback=None):
    '''
    Return li[index], or `fallback` if that index is out of range.
    '''
    try:
        value = li[index]
    except IndexError:
        value = fallback
    return value
def make_password(length=None, passtype='standard'):
    '''
    Returns a string of length `length` consisting of a random selection
    of characters.

    length:
        How many characters. Defaults to DEFAULT_LENGTH.
    passtype:
        A string naming the base alphabet and any modifiers. The base is the
        first of 'standard' (upper and lowercase letters), 'digit_only',
        'hex', 'binary' which appears in the string. The modifiers are
        '+digits', '+punctuation', '+lowercase' or '+uppercase', and
        '+noduplicates'.

    The characters are drawn from the operating system's secure random
    source, because the result is meant to be used as a password.

    Raises ValueError (a subclass of the Exception raised previously) if
    '+noduplicates' is requested with a length longer than the alphabet.
    '''
    if length is None:
        length = DEFAULT_LENGTH

    alphabet = ''
    if 'standard' in passtype:
        alphabet = string.ascii_letters
    elif 'digit_only' in passtype:
        alphabet = string.digits
    elif 'hex' in passtype:
        alphabet = '0123456789abcdef'
    elif 'binary' in passtype:
        alphabet = '01'

    if '+digits' in passtype:
        alphabet += string.digits
    if '+punctuation' in passtype:
        alphabet += string.punctuation

    if '+lowercase' in passtype:
        alphabet = alphabet.lower()
    elif '+uppercase' in passtype:
        alphabet = alphabet.upper()

    # Changing case or adding digits can introduce repeats, which would
    # otherwise weight those characters more heavily.
    alphabet = sorted(set(alphabet))
    rng = random.SystemRandom()

    if '+noduplicates' in passtype:
        if len(alphabet) < length:
            message = 'Alphabet "%s" is not long enough to support no-dupe password of length %d'
            message = message % (''.join(alphabet), length)
            raise ValueError(message)
        password = ''.join(rng.sample(alphabet, length))
    else:
        password = ''.join(rng.choice(alphabet) for x in range(length))
    return password
def make_sentence(length=None, joiner=' '):
    '''
    Returns a string containing `length` entries chosen at random from
    dictionary.common, joined by `joiner`.

    length:
        How many entries. Defaults to DEFAULT_SENTENCE.
    joiner:
        Placed between the entries, and also substituted for any spaces
        inside a multi-word entry.

    The entries are drawn from the operating system's secure random source.
    '''
    import dictionary.common as common
    if length is None:
        # The sentence default, not the much larger password default.
        length = DEFAULT_SENTENCE
    rng = random.SystemRandom()
    words = [rng.choice(common.words) for x in range(length)]
    words = [w.replace(' ', joiner) for w in words]
    result = joiner.join(words)
    return result
if __name__ == '__main__':
    # Command line entry point. See HELP_MESSAGE for the usage.
    args = sys.argv[1:]

    mode = listget(args, 0, 'password')
    if 'help' in mode:
        print(HELP_MESSAGE)
        sys.exit()

    if 'sent' not in mode:
        length = listget(args, 0, str(DEFAULT_LENGTH))
        options = [a.lower() for a in args[1:]]

        if '-' in length:
            # "lo-hi" picks a random length within that range.
            length = length.replace(' ', '')
            length = [int(x) for x in length.split('-', 1)]
            length = random.randint(*length)

        elif not length.isdigit():
            # The length was omitted, so the first argument is really an
            # option. It joins whatever other options were given.
            options.insert(0, length.lower())
            length = DEFAULT_LENGTH

        length = int(length)

        passtype = 'standard'
        if 'dd' in options:
            passtype = 'digit_only'
        if 'b' in options:
            passtype = 'binary'
        if 'h' in options:
            passtype = 'hex'

        if 'l' in options:
            passtype += '+lowercase'
        elif 'u' in options:
            passtype += '+uppercase'

        if 'p' in options:
            passtype += '+punctuation'
        if 'd' in options:
            passtype += '+digits'
        if 'nd' in options:
            passtype += '+noduplicates'

        print(make_password(length, passtype=passtype))

    else:
        length = listget(args, 1, str(DEFAULT_SENTENCE))
        joiner = listget(args, 2, ' ')

        if not length.isdigit():
            # The length was omitted, so this argument is the joiner.
            joiner = length
            length = DEFAULT_SENTENCE

        length = int(length)

        print(make_sentence(length, joiner))

267
voussoirkit/pathclass.py Normal file
View file

@ -0,0 +1,267 @@
import glob
import os
import re
class PathclassException(Exception):
    '''
    Base class for the exceptions defined by this module.
    '''

class NotDirectory(PathclassException):
    '''
    Raised by Path.assert_is_directory when the path is not a directory.
    '''

class NotFile(PathclassException):
    '''
    Raised by Path.assert_is_file when the path is not a file.
    '''
class Path:
    '''
    I started to use pathlib.Path, but it was too much of a pain.

    Holds a single normalized absolute path string, `absolute_path`, and
    offers properties and helpers built on os.path. Equality, hashing and
    containment all go by the os.path.normcase form of the path.
    '''
    def __init__(self, path):
        '''
        path:
            A string, which is stripped, normalized and made absolute, or
            another Path, whose absolute_path is reused as it is.
        '''
        if isinstance(path, Path):
            self.absolute_path = path.absolute_path
        else:
            path = path.strip()
            if re.search('[A-Za-z]:$', path):
                # Bare Windows drive letter.
                path += os.sep
            path = normalize_sep(path)
            path = os.path.normpath(path)
            path = os.path.abspath(path)
            self.absolute_path = path

    def __contains__(self, other):
        '''
        `other in self` is True when other is this very path or lies
        somewhere beneath it. `other` may be a Path or a string.

        The match has to end on a path separator, so that a sibling which
        merely shares a name prefix ("foo" and "foobar") does not count.
        '''
        if isinstance(other, Path):
            other = other.normcase
        base = self.normcase
        if other == base:
            return True
        # A root such as "/" or "C:\\" already ends with the separator.
        return other.startswith(base.rstrip(os.sep) + os.sep)

    def __eq__(self, other):
        if not hasattr(other, 'absolute_path'):
            return False
        return self.normcase == other.normcase

    def __hash__(self):
        return hash(self.normcase)

    def __repr__(self):
        return '{c}({path})'.format(c=self.__class__.__name__, path=repr(self.absolute_path))

    def assert_is_file(self):
        '''
        Raise NotFile unless this path is an existing file.
        '''
        if not self.is_file:
            raise NotFile(self)

    def assert_is_directory(self):
        '''
        Raise NotDirectory unless this path is an existing directory.
        '''
        if not self.is_dir:
            raise NotDirectory(self)

    @property
    def basename(self):
        return os.path.basename(self.absolute_path)

    def correct_case(self):
        '''
        Replace absolute_path with the result of `get_path_casing`, and
        return the new string.
        '''
        self.absolute_path = get_path_casing(self.absolute_path)
        return self.absolute_path

    @property
    def depth(self):
        '''
        The number of pieces produced by splitting absolute_path on os.sep.
        '''
        return len(self.absolute_path.split(os.sep))

    @property
    def exists(self):
        return os.path.exists(self.absolute_path)

    @property
    def extension(self):
        '''
        The file extension without its leading dot, or '' if there is none.
        '''
        return os.path.splitext(self.absolute_path)[1].lstrip('.')

    @property
    def is_dir(self):
        return os.path.isdir(self.absolute_path)

    @property
    def is_file(self):
        return os.path.isfile(self.absolute_path)

    @property
    def is_link(self):
        return os.path.islink(self.absolute_path)

    def join(self, subpath):
        '''
        Return a new Path for `subpath` joined onto this one.
        Raises TypeError if subpath is not a string.
        '''
        if not isinstance(subpath, str):
            raise TypeError('subpath must be a string')
        return Path(os.path.join(self.absolute_path, subpath))

    def listdir(self):
        '''
        Return a list of Paths, one for each entry in this directory.
        '''
        children = os.listdir(self.absolute_path)
        children = [self.with_child(child) for child in children]
        return children

    @property
    def normcase(self):
        return os.path.normcase(self.absolute_path)

    @property
    def parent(self):
        parent = os.path.dirname(self.absolute_path)
        parent = self.__class__(parent)
        return parent

    @property
    def relative_path(self):
        '''
        This path relative to the current working directory.
        '''
        return self.relative_to(os.getcwd())

    def relative_to(self, other):
        '''
        Return a string expressing this path relative to `other`.

        '.' if the two are the same, './...' if this path is inside other,
        '../...' if the two merely share an ancestor, and the plain absolute
        path if they share nothing at all.

        As a side effect, the casing of this path is corrected.
        '''
        other = Path(other)
        other.correct_case()
        self.correct_case()

        if self == other:
            return '.'

        if self in other:
            # Only the leading portion is cut away, so a directory name which
            # repeats deeper down the path is left alone.
            remainder = self.absolute_path[len(other.absolute_path):]
            return os.path.join('.', remainder.lstrip(os.sep))

        common = common_path([other.absolute_path, self.absolute_path], fallback=None)
        if common is None:
            return self.absolute_path

        # One '..' for every directory between other and the common ancestor.
        # Counting the actual pieces stays correct when the ancestor is the
        # root, where comparing `depth` values would come up one short.
        common_length = len(common.absolute_path)
        climb = other.absolute_path[common_length:].strip(os.sep)
        backsteps = os.sep.join('..' for piece in climb.split(os.sep))
        descend = self.absolute_path[common_length:].lstrip(os.sep)
        if descend == '':
            return backsteps
        return os.path.join(backsteps, descend)

    def replace_extension(self, extension):
        '''
        Return a new Path with the extension swapped for `extension`, which
        may be given with or without the dot. An empty extension removes it.
        '''
        extension = extension.rsplit('.', 1)[-1]
        base = os.path.splitext(self.absolute_path)[0]

        if extension == '':
            return Path(base)

        return Path(base + '.' + extension)

    @property
    def size(self):
        '''
        The size in bytes if this is a file, otherwise None.
        '''
        if self.is_file:
            return os.path.getsize(self.absolute_path)
        else:
            return None

    @property
    def stat(self):
        return os.stat(self.absolute_path)

    def with_child(self, basename):
        '''
        Return a new Path for the child of this directory named by the final
        component of `basename`.
        '''
        return self.join(os.path.basename(basename))
def common_path(paths, fallback):
    '''
    Given a list of file paths, determine the deepest path which all
    have in common.

    paths:
        A collection of strings or Paths. A lone string or Path is rejected
        with TypeError, and an empty collection with ValueError.
    fallback:
        Returned if the search walks all the way up to a root without
        finding a directory that contains every path.
    '''
    if isinstance(paths, (str, Path)):
        raise TypeError('`paths` must be a collection')
    paths = [Path(f) for f in paths]

    if len(paths) == 0:
        raise ValueError('Empty list')

    # Start from any one of the paths and climb until the candidate
    # contains all of the others.
    model = paths.pop()

    while True:
        if all(f in model for f in paths):
            return model
        parent = model.parent
        if parent == model:
            # We just processed the root, and now we're stuck at the root.
            # Which means there was no common path.
            return fallback
        model = parent
def get_path_casing(path):
    '''
    Given a path whose casing may not match what is on disk, return the
    string with the casing that the filesystem reports.

    The input is returned as its absolute path, untouched, if no part of it
    exists or if the glob lookup finds nothing.

    Thank you:
    Ethan Furman http://stackoverflow.com/a/7133137/5430534
    xvorsx http://stackoverflow.com/a/14742779/5430534
    '''
    if not isinstance(path, Path):
        path = Path(path)

    # Glob can only find things that exist. Climb to the nearest existing
    # ancestor, look that up, and reattach the nonexistent tail afterwards.
    input_path = path
    existing = path
    while not existing.exists:
        parent = existing.parent
        if existing == parent:
            # We're stuck at a fake root.
            return input_path.absolute_path
        existing = parent

    (drive, subpath) = os.path.splitdrive(existing.absolute_path)
    pieces = subpath.lstrip(os.sep).split(os.sep)
    pattern = os.sep.join(glob_patternize(piece) for piece in pieces)
    pattern = drive.upper() + os.sep + pattern

    matches = glob.glob(pattern)
    if not matches:
        return input_path.absolute_path
    cased = matches[0]

    # Whatever lies beyond the matched portion is the part that doesn't exist.
    imaginary_portion = input_path.absolute_path[len(cased):].lstrip(os.sep)

    cased = os.path.join(cased, imaginary_portion).rstrip(os.sep)
    if os.sep not in cased:
        # Stripping a bare drive or root took away its separator.
        cased += os.sep
    return cased
def glob_patternize(piece):
    '''
    Create a pattern like "[u]ser" from "user", forcing glob to look up the
    correct path name, while guaranteeing that the only result will be the
    correct path.

    The piece is first passed through glob.escape, and then the first
    character which is not special gets wrapped in brackets.

    Special cases are:
        `!`
            because in glob syntax, [!x] tells glob to look for paths that
            don't contain "x", and [!] is invalid syntax.
        `[`, `]`
            because this starts a glob capture group
        `*`, `?`
            because glob.escape has already wrapped these in brackets, and
            wrapping them a second time would break the pattern.

    If the path consists entirely of these special characters, then the
    casing doesn't need to be corrected anyway.
    '''
    piece = glob.escape(piece)
    for character in piece:
        if character not in '![]*?':
            replacement = '[%s]' % character
            # The first occurrence is the one the loop has just reached.
            piece = piece.replace(character, replacement, 1)
            break
    return piece
def normalize_sep(path):
    '''
    Return `path` with every slash and backslash that is not this
    platform's separator replaced by os.sep.
    '''
    foreign = [char for char in ('\\', '/') if char != os.sep]
    for char in foreign:
        path = path.replace(char, os.sep)
    return path
def system_root():
    '''
    Return the absolute form of os.sep.
    '''
    root = os.sep
    return os.path.abspath(root)

57
voussoirkit/quickid.py Normal file
View file

@ -0,0 +1,57 @@
'''
This module is designed to provide a GOOD ENOUGH means of identifying duplicate
files very quickly, so that more in-depth checks can be done on likely matches.
'''
import hashlib
import os
import sys
# The `whence` value passed to handle.seek to measure from the end of the file.
SEEK_END = 2
# Default number of bytes hashed from each end of a file: 2 MiB.
CHUNK_SIZE = 2 * 2**20
# Layout of the identifier string returned by `quickid`.
FORMAT = '{size}_{chunk_size}_{hash}'
def equal(handle1, handle2, *args, **kwargs):
    '''
    Return True if the two open handles have the same size and the same
    quickid. The extra arguments are passed along to `quickid`.

    Both handles are rewound to the start in the process.
    '''
    first_size = handle1.seek(0, SEEK_END)
    second_size = handle2.seek(0, SEEK_END)
    handle1.seek(0)
    handle2.seek(0)
    if first_size == second_size:
        return quickid(handle1, *args, **kwargs) == quickid(handle2, *args, **kwargs)
    # Different sizes settle the question without hashing anything.
    return False
def equal_file(filename1, filename2, *args, **kwargs):
    '''
    Open both files in binary mode and compare them with `equal`.
    '''
    first = os.path.abspath(filename1)
    second = os.path.abspath(filename2)
    with open(first, 'rb') as handle1:
        with open(second, 'rb') as handle2:
            return equal(handle1, handle2, *args, **kwargs)
def quickid(handle, hashclass=None, chunk_size=None):
    '''
    Return an identifier string for the contents of the open binary handle,
    in the layout given by FORMAT.

    hashclass:
        The hash constructor to use. Defaults to hashlib.md5.
    chunk_size:
        How many bytes to hash from each end of the file. Defaults to
        CHUNK_SIZE. A file no larger than two chunks is hashed in full.
    '''
    hashclass = hashlib.md5 if hashclass is None else hashclass
    chunk_size = CHUNK_SIZE if chunk_size is None else chunk_size

    hasher = hashclass()
    size = handle.seek(0, SEEK_END)
    handle.seek(0)
    if size > 2 * chunk_size:
        # Only the first and the last chunk are looked at.
        hasher.update(handle.read(chunk_size))
        handle.seek(-1 * chunk_size, SEEK_END)
        hasher.update(handle.read())
    else:
        hasher.update(handle.read())
    return FORMAT.format(size=size, chunk_size=chunk_size, hash=hasher.hexdigest())
def quickid_file(filename, *args, **kwargs):
    '''
    Open the file in binary mode and return its `quickid`.
    '''
    absolute = os.path.abspath(filename)
    with open(absolute, 'rb') as handle:
        identifier = quickid(handle, *args, **kwargs)
    return identifier
def main(argv):
    '''
    Print the quickid of the file named by the first argument.
    '''
    identifier = quickid_file(argv[0])
    print(identifier)

if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,66 @@
import time
class Ratelimiter:
    '''
    Keeps a balance which refills as time passes and is spent by each call
    to `limit`.
    '''
    def __init__(self, allowance, period=1, operation_cost=1, mode='sleep'):
        '''
        allowance:
            Our spending balance per `period` seconds. The balance never
            grows beyond this amount.

        period:
            The number of seconds over which we can perform `allowance`
            operations.

        operation_cost:
            The default amount to remove from our balance after each
            operation. Pass a `cost` parameter to `self.limit` to use a
            nondefault value.

        mode:
            'sleep':
                If we do not have the balance for an operation, sleep until
                we do. Return True every time.

            'reject':
                If we do not have the balance for an operation, return False.
                The cost is not subtracted, so hopefully we have enough
                next time.

        Raises ValueError for any other mode.
        '''
        if mode != 'sleep' and mode != 'reject':
            raise ValueError('Invalid mode %s' % repr(mode))

        self.allowance = allowance
        self.period = period
        self.operation_cost = operation_cost
        self.mode = mode

        self.last_operation = time.time()
        self.balance = 0

    @property
    def gain_rate(self):
        '''
        How much balance is regained per second.
        '''
        return self.allowance / self.period

    def limit(self, cost=None):
        '''
        Attempt to spend `cost`, or the default operation_cost, and return
        whether the operation may go ahead.

        See the constructor's docstring for info about cost and mode behavior.
        '''
        cost = self.operation_cost if cost is None else cost

        # Credit the time that has passed since the previous call.
        elapsed = time.time() - self.last_operation
        self.balance = min(self.balance + elapsed * self.gain_rate, self.allowance)

        if self.balance >= cost:
            self.balance -= cost
            allowed = True
        elif self.mode == 'reject':
            allowed = False
        else:
            # Wait exactly as long as it takes to earn the missing amount,
            # all of which is then spent at once.
            shortfall = cost - self.balance
            time.sleep(shortfall / self.gain_rate)
            self.balance = 0
            allowed = True

        self.last_operation = time.time()
        return allowed

64
voussoirkit/ratemeter.py Normal file
View file

@ -0,0 +1,64 @@
import collections
import math
import time
class RateMeter:
    def __init__(self, span):
        '''
        This class is used to calculate a rolling average of
        units per second over `span` seconds.

        Set `span` to None to calculate unit/s over the lifetime of the object
        after the first digest, rather than over a span.
        This saves the effort of tracking timestamps. Don't just use a large number!
        '''
        self.sum = 0
        self.span = span

        # [timestamp, value] pairs, oldest first. Unused when span is None.
        self.tracking = collections.deque()
        self.first_digest = None

    def digest(self, value):
        '''
        Add `value` to the running sum, and drop any tracked entries which
        have become older than `span` seconds.
        '''
        now = time.time()
        self.sum += value

        if self.span is None:
            if self.first_digest is None:
                self.first_digest = now
            return

        earlier = now - self.span
        while len(self.tracking) > 0 and self.tracking[0][0] < earlier:
            (timestamp, pop_value) = self.tracking.popleft()
            self.sum -= pop_value

        # Entries are [timestamp, value], so it is the timestamp of the newest
        # entry that has to be compared. Digests that share a timestamp are
        # merged into a single entry.
        if len(self.tracking) == 0 or self.tracking[-1][0] != now:
            self.tracking.append([now, value])
        else:
            self.tracking[-1][1] += value

    def report(self):
        '''
        Return a tuple containing the running sum, the time span
        over which the rate is being calculated, and the rate in
        units per second.

        (sum, time_interval, rate)

        If no time has elapsed, the interval is 0 and the sum stands in
        for the rate.
        '''
        # Flush the old values, ensure self.first_digest exists.
        self.digest(0)

        if self.span is None:
            now = math.ceil(time.time())
            time_interval = now - self.first_digest
        else:
            # No risk of IndexError because the digest(0) ensures we have
            # at least one entry.
            time_interval = self.tracking[-1][0] - self.tracking[0][0]

        if time_interval == 0:
            return (self.sum, 0, self.sum)
        rate = self.sum / time_interval
        time_interval = round(time_interval, 3)
        rate = round(rate, 3)
        return (self.sum, time_interval, rate)

18
voussoirkit/safeprint.py Normal file
View file

@ -0,0 +1,18 @@
'''
This function is slow and ugly, but I need a way to safely print unicode strings
on systems that don't support it without crippling those who do.
'''
def safeprint(text, file_handle=None, end='\n'):
    '''
    Write `text` one character at a time, substituting '?' for any character
    that raises UnicodeError.

    file_handle:
        If given, the characters are written to it and `end` is not used.
        Otherwise they are printed, followed by `end`, and flushed.
    '''
    def emit(piece):
        if file_handle:
            file_handle.write(piece)
        else:
            print(piece, end='', flush=False)

    for character in text:
        try:
            emit(character)
        except UnicodeError:
            emit('?')

    if not file_handle:
        print(end, end='', flush=True)

713
voussoirkit/spinal.py Normal file
View file

@ -0,0 +1,713 @@
import collections
import hashlib
import logging
import os
import shutil
import sys
# pip install voussoirkit
from voussoirkit import bytestring
from voussoirkit import pathclass
from voussoirkit import ratelimiter
# NOTE(review): this configures the root logger as soon as the module is
# imported, which affects the importing program as well.
logging.basicConfig(level=logging.CRITICAL)
log = logging.getLogger(__name__)
CHUNK_SIZE = 2 * bytestring.MIBIBYTE
# Number of bytes to read and write at a time
# Hash constructor used when a copy is validated.
HASH_CLASS = hashlib.md5
class DestinationIsDirectory(Exception):
    '''
    Raised by copy_dir when a file would be copied onto an existing directory.
    '''

class DestinationIsFile(Exception):
    '''
    Raised by copy_dir when the destination directory is an existing file.
    '''

class RecursiveDirectory(Exception):
    '''
    Raised by copy_dir when the destination lies within the source.
    '''

class SourceNotDirectory(Exception):
    '''
    Raised when a directory operation is given something else as its source.
    '''

class SourceNotFile(Exception):
    '''
    Raised by copy_file when the source is not a file.
    '''

class SpinalError(Exception):
    '''
    Raised by copy when the source is neither a file nor a directory.
    '''

class ValidationError(Exception):
    '''
    Presumably raised when a copied file fails hash validation -- the raising
    code is outside the part of the module reviewed here.
    '''
def callback_exclusion_v1(name, path_type):
    '''
    Example of an exclusion callback function.
    Prints "Excluding path_type name".
    '''
    fields = ('Excluding', path_type, name)
    print(*fields)
def callback_v1(fpobj, written_bytes, total_bytes):
    '''
    Example of a copy callback function.

    Prints "filename written/total (percent%)" followed by a carriage return,
    so that successive calls overwrite one another on the same line. Once
    written reaches total, the line is finished off with a newline.
    '''
    # Non-ASCII characters in the name are replaced with question marks.
    filename = fpobj.absolute_path.encode('ascii', 'replace').decode()

    ends = '\r\n' if written_bytes >= total_bytes else ''

    # max() protects against division by zero on empty files.
    percent = '%07.3f' % ((100 * written_bytes) / max(total_bytes, 1))
    total = '{:,}'.format(total_bytes)
    # Padding keeps the line the same width as the numbers grow.
    written = '{:,}'.format(written_bytes).rjust(len(total), ' ')

    status = '{filename} {written}/{total} ({percent}%)\r'.format(
        filename=filename,
        written=written,
        total=total,
        percent=percent,
    )
    print(status, end=ends)
    sys.stdout.flush()
def copy(source, file_args=None, file_kwargs=None, dir_args=None, dir_kwargs=None):
    '''
    Dispatch to copy_file or copy_dir depending on what `source` is.

    file_args, file_kwargs:
        Extra arguments for copy_file, used if the source is a file.
    dir_args, dir_kwargs:
        Extra arguments for copy_dir, used if the source is a directory.

    Raises SpinalError if the source is neither.
    '''
    source = str_to_fp(source)

    if source.is_file:
        return copy_file(source, *(file_args or tuple()), **(file_kwargs or dict()))

    if source.is_dir:
        return copy_dir(source, *(dir_args or tuple()), **(dir_kwargs or dict()))

    raise SpinalError('Neither file nor dir: %s' % source)
def copy_dir(
        source,
        destination=None,
        *,
        bytes_per_second=None,
        callback_directory=None,
        callback_exclusion=None,
        callback_file=None,
        callback_permission_denied=None,
        destination_new_root=None,
        dry_run=False,
        exclude_directories=None,
        exclude_filenames=None,
        files_per_second=None,
        overwrite_old=True,
        precalcsize=False,
        validate_hash=False,
    ):
    '''
    Copy all of the contents from source to destination,
    including subdirectories.

    source:
        The directory which will be copied.

    destination:
        The directory in which copied files are placed. Alternatively, use
        destination_new_root.

    bytes_per_second:
        Restrict file copying to this many bytes per second. Can be an integer
        or an existing Ratelimiter object.
        The BYTE, KIBIBYTE, etc constants from module 'bytestring' may help.

        Default = None

    callback_directory:
        This function will be called after each file copy with three parameters:
        name of file copied, number of bytes written to destination directory
        so far, total bytes needed (based on precalcsize).
        If `precalcsize` is False, this function will receive written bytes
        for both written and total, showing 100% always.

        Default = None

    callback_exclusion:
        Passed directly into `walk_generator`.

        Default = None

    callback_file:
        Will be passed into each individual `copy_file` operation as the
        `callback_progress` for that file.

        Default = None

    callback_permission_denied:
        Will be passed into each individual `copy_file` operation as the
        `callback_permission_denied` for that file.

        Default = None

    destination_new_root:
        Determine the destination path by calling
        `new_root(source, destination_new_root)`.
        Thus, this path acts as a root and the rest of the path is matched.

        `destination` and `destination_new_root` are mutually exclusive.

    dry_run:
        Do everything except the actual file copying.

        Default = False

    exclude_filenames:
        Passed directly into `walk_generator`.

        Default = None

    exclude_directories:
        Passed directly into `walk_generator`.

        Default = None

    files_per_second:
        Maximum number of files to be processed per second. Helps to keep CPU
        usage low.

        Default = None

    overwrite_old:
        Passed directly into each `copy_file`, which see.

        Default = True

    precalcsize:
        If True, calculate the size of source before beginning the
        operation. This number can be used in the callback_directory function.
        Else, callback_directory will receive written bytes as total bytes
        (showing 100% always).
        This can take a long time.

        Default = False

    validate_hash:
        Passed directly into each `copy_file`.

    Raises ValueError if not exactly one of `destination` and
    `destination_new_root` is given, RecursiveDirectory if the destination is
    inside the source, SourceNotDirectory if the source is not a directory,
    DestinationIsFile if the destination is a file, and DestinationIsDirectory
    if a file would land on an existing directory.

    Returns: [destination path, number of bytes written to destination]
    (Written bytes is 0 if all files already existed.)
    '''
    # Prepare parameters
    if not is_xor(destination, destination_new_root):
        message = 'One and only one of `destination` and '
        message += '`destination_new_root` can be passed.'
        raise ValueError(message)

    source = str_to_fp(source)

    if destination_new_root is not None:
        source.correct_case()
        destination = new_root(source, destination_new_root)
    destination = str_to_fp(destination)

    if destination in source:
        raise RecursiveDirectory(source, destination)

    if not source.is_dir:
        raise SourceNotDirectory(source)

    if destination.is_file:
        raise DestinationIsFile(destination)

    if precalcsize is True:
        total_bytes = get_dir_size(source)
    else:
        total_bytes = 0

    callback_directory = callback_directory or do_nothing
    bytes_per_second = limiter_or_none(bytes_per_second)
    files_per_second = limiter_or_none(files_per_second)

    # Copy
    written_bytes = 0
    walker = walk_generator(
        source,
        callback_exclusion=callback_exclusion,
        exclude_directories=exclude_directories,
        exclude_filenames=exclude_filenames,
    )
    for source_abspath in walker:
        # Terminology:
        # abspath: C:\folder\subfolder\filename.txt
        # location: C:\folder\subfolder
        # base_name: filename.txt
        # folder: subfolder

        # Swap only the leading source directory for the destination.
        # Without the count, a deeper folder that happens to repeat the
        # source's path text would be rewritten as well.
        destination_abspath = source_abspath.absolute_path.replace(
            source.absolute_path,
            destination.absolute_path,
            1
        )
        destination_abspath = str_to_fp(destination_abspath)

        if destination_abspath.is_dir:
            raise DestinationIsDirectory(destination_abspath)

        destination_location = os.path.split(destination_abspath.absolute_path)[0]
        if not dry_run:
            os.makedirs(destination_location, exist_ok=True)

        copied = copy_file(
            source_abspath,
            destination_abspath,
            bytes_per_second=bytes_per_second,
            callback_progress=callback_file,
            callback_permission_denied=callback_permission_denied,
            dry_run=dry_run,
            overwrite_old=overwrite_old,
            validate_hash=validate_hash,
        )

        copiedname = copied[0]
        written_bytes += copied[1]

        if precalcsize is False:
            callback_directory(copiedname, written_bytes, written_bytes)
        else:
            callback_directory(copiedname, written_bytes, total_bytes)

        if files_per_second is not None:
            files_per_second.limit(1)

    return [destination, written_bytes]
def copy_file(
        source,
        destination=None,
        *,
        destination_new_root=None,
        bytes_per_second=None,
        callback_progress=None,
        callback_permission_denied=None,
        callback_validate_hash=None,
        dry_run=False,
        overwrite_old=True,
        validate_hash=False,
    ):
    '''
    Copy a file from one place to another.

    source:
        The file to copy.

    destination:
        The filename of the new copy. If this is an existing directory, the
        file is placed inside it under the source's name. Alternatively, use
        destination_new_root.

    destination_new_root:
        Determine the destination path by calling
        `new_root(source, destination_new_root)`.
        Thus, this path acts as a root and the rest of the path is matched.

    bytes_per_second:
        Restrict file copying to this many bytes per second. Can be an integer
        or an existing Ratelimiter object.
        The provided BYTE, KIBIBYTE, etc constants may help.

        Default = None

    callback_permission_denied:
        If provided, this function will be called when the source or the
        destination denies access while being opened, with the file path and
        the exception object as parameters.
        THE OPERATION WILL RETURN NORMALLY.

        If not provided, the PermissionError is raised.

        Default = None

    callback_progress:
        If provided, this function will be called after writing
        each CHUNK_SIZE bytes to destination with three parameters:
        the Path object being copied, number of bytes written so far,
        total number of bytes needed.

        Default = None

    callback_validate_hash:
        Passed directly into `verify_hash`

        Default = None

    dry_run:
        Do everything except the actual file copying.

        Default = False

    overwrite_old:
        If False, a destination that already exists is never touched.
        If True, an existing destination is overwritten unless its
        "last modified" timestamp is equal to the source's.

        Default = True

    validate_hash:
        If True, verify the file hash of the resulting file, using the
        `HASH_CLASS` global.

        Default = False

    Raises ValueError if not exactly one of `destination` and
    `destination_new_root` is given, and SourceNotFile if the source is not
    a file.

    Returns: [destination Path, number of bytes written to destination]
    (Written bytes is 0 if the file already existed.)
    '''
    # Prepare parameters
    if not is_xor(destination, destination_new_root):
        message = 'One and only one of `destination` and '
        message += '`destination_new_root` can be passed'
        raise ValueError(message)

    source = str_to_fp(source)

    if not source.is_file:
        raise SourceNotFile(source)

    if destination_new_root is not None:
        source.correct_case()
        destination = new_root(source, destination_new_root)
    destination = str_to_fp(destination)

    callback_progress = callback_progress or do_nothing

    if destination.is_dir:
        destination = destination.with_child(source.basename)

    bytes_per_second = limiter_or_none(bytes_per_second)

    # Determine overwrite
    if destination.exists:
        if overwrite_old is False:
            return [destination, 0]

        source_modtime = source.stat.st_mtime
        if source_modtime == destination.stat.st_mtime:
            return [destination, 0]

    # Copy
    if dry_run:
        callback_progress(destination, 0, 0)
        return [destination, 0]

    source_bytes = source.size
    destination_location = os.path.split(destination.absolute_path)[0]
    os.makedirs(destination_location, exist_ok=True)

    def handlehelper(path, mode):
        try:
            handle = open(path.absolute_path, mode)
            return handle
        except PermissionError as exception:
            if callback_permission_denied is not None:
                callback_permission_denied(path, exception)
                return None
            else:
                raise

    log.debug('Opening handles.')
    source_handle = handlehelper(source, 'rb')
    if source_handle is None:
        # Bail out before touching the destination. Opening it for writing
        # would leave an empty file behind for a source we cannot read.
        return [destination, 0]

    try:
        destination_handle = handlehelper(destination, 'wb')
    except BaseException:
        source_handle.close()
        raise
    if destination_handle is None:
        source_handle.close()
        return [destination, 0]

    if validate_hash:
        hasher = HASH_CLASS()

    written_bytes = 0
    try:
        while True:
            try:
                data_chunk = source_handle.read(CHUNK_SIZE)
            except PermissionError:
                print(source)
                raise
            data_bytes = len(data_chunk)
            if data_bytes == 0:
                break

            if validate_hash:
                hasher.update(data_chunk)

            destination_handle.write(data_chunk)
            written_bytes += data_bytes

            if bytes_per_second is not None:
                bytes_per_second.limit(data_bytes)

            callback_progress(destination, written_bytes, source_bytes)

        if written_bytes == 0:
            # For zero-length files, we want to get at least one call in there.
            callback_progress(destination, written_bytes, source_bytes)
    finally:
        # The handles are released even if a read, a write or a callback
        # raises partway through.
        log.debug('Closing source handle.')
        source_handle.close()
        log.debug('Closing dest handle.')
        destination_handle.close()

    # Fin
    log.debug('Copying metadata')
    shutil.copystat(source.absolute_path, destination.absolute_path)

    if validate_hash:
        verify_hash(
            destination,
            callback=callback_validate_hash,
            known_size=source_bytes,
            known_hash=hasher.hexdigest(),
        )

    return [destination, written_bytes]
def do_nothing(*args):
    '''
    No-op placeholder used by other functions as the default callback.

    Accepts any number of positional arguments, ignores them, and returns
    None.
    '''
    return None
def get_dir_size(path):
    '''
    Return the combined size, in bytes, of every file that walk_generator
    finds under the directory `path`, subdirectories included.

    Raises SourceNotDirectory if `path` is not a directory.
    '''
    directory = str_to_fp(path)
    if not directory.is_dir:
        raise SourceNotDirectory(directory)

    return sum(filepath.size for filepath in walk_generator(directory))
def is_subfolder(parent, child):
    '''
    Determine whether parent contains child.

    Both arguments may be strings or Path objects. Their absolute paths are
    normalized and then compared with a trailing separator attached, so that
    "C:\\folder" is not mistaken for the parent of "C:\\folder2".
    A path is considered to contain itself.

    Returns True or False.
    '''
    # normpath strips trailing separators from everything except a filesystem
    # root ("/" or "C:\\"). Remove whatever is left before attaching our own,
    # otherwise a root parent ends in a doubled separator that no child path
    # can ever start with.
    parent = normalize(str_to_fp(parent).absolute_path).rstrip(os.sep) + os.sep
    child = normalize(str_to_fp(child).absolute_path).rstrip(os.sep) + os.sep
    return child.startswith(parent)
def is_xor(*args):
    '''
    Return True if and only if exactly one of the arguments is truthy.
    '''
    truthy_count = sum(1 for arg in args if arg)
    return truthy_count == 1
def limiter_or_none(value):
    '''
    Coerce `value` into a Ratelimiter, or None if no limit was given.

    Strings are first converted with bytestring.parsebytes. An existing
    Ratelimiter is returned as-is, None stays None, and any other value is
    used as the allowance of a new Ratelimiter with a period of 1.
    '''
    if isinstance(value, str):
        value = bytestring.parsebytes(value)

    if isinstance(value, ratelimiter.Ratelimiter):
        return value

    if value is None:
        return None

    return ratelimiter.Ratelimiter(allowance=value, period=1)
def new_root(filepath, root):
    '''
    Prepend `root` to `filepath`, drive letter included. For example:
    "C:\\folder\\subfolder\\file.txt" and "C:\\backups" becomes
    "C:\\backups\\C\\folder\\subfolder\\file.txt"

    I use this so that my G: drive can have backups from my C: and D: drives
    while preserving directory structure in G:\\D and G:\\C.

    Paths that have no drive letter and therefore begin with a separator are
    placed under `root` in the same way, with the leading separators dropped.

    Both arguments may be strings or Path objects. Returns the result of
    str_to_fp on the combined path.
    '''
    filepath = str_to_fp(filepath).absolute_path
    root = str_to_fp(root).absolute_path
    # Turn the drive colon into a separator so "C:\\x" becomes "C\\x".
    filepath = filepath.replace(':', os.sep)
    filepath = os.path.normpath(filepath)
    # os.path.join throws away `root` when the second argument is itself
    # absolute, which is the case for any path that did not start with a
    # drive letter. Strip the leading separators so the result always lands
    # underneath `root` instead of pointing back at the original location.
    filepath = filepath.lstrip(os.sep)
    filepath = os.path.join(root, filepath)
    return str_to_fp(filepath)
def normalize(text):
    '''
    Return `text` passed through os.path.normcase and then os.path.normpath.
    '''
    case_normalized = os.path.normcase(text)
    return os.path.normpath(case_normalized)
def str_to_fp(path):
    '''
    Return a pathclass.Path built from `path` when it is a string; any other
    value is handed back unchanged.
    '''
    return pathclass.Path(path) if isinstance(path, str) else path
def verify_hash(path, known_size, known_hash, callback=None):
    '''
    Check the file at `path` against an expected size and hash, raising
    ValidationError if either one does not match.

    path:
        A string or Path object for the file to check.

    known_size:
        The expected size of the file, compared against os.path.getsize.
        This is checked first, before any of the file is read.

    known_hash:
        The expected hexdigest of the file's contents under HASH_CLASS.

    callback:
        A function that takes three parameters:
        path object, bytes ingested so far, bytes total
    '''
    path = str_to_fp(path)
    absolute_path = path.absolute_path
    log.debug('Validating hash for "%s" against %s', absolute_path, known_hash)

    file_size = os.path.getsize(absolute_path)
    if file_size != known_size:
        raise ValidationError('File size %d != known size %d' % (file_size, known_size))

    checked_bytes = 0
    with open(absolute_path, 'rb') as handle:
        hasher = HASH_CLASS()
        # Read fixed-size chunks until the empty read that marks end of file.
        for chunk in iter(lambda: handle.read(CHUNK_SIZE), b''):
            hasher.update(chunk)
            checked_bytes += len(chunk)
            if callback is not None:
                callback(path, checked_bytes, file_size)

    file_hash = hasher.hexdigest()
    if file_hash != known_hash:
        raise ValidationError('File hash "%s" != known hash "%s"' % (file_hash, known_hash))
    log.debug('Hash validation passed.')
def walk_generator(
        path='.',
        *,
        callback_exclusion=None,
        callback_permission_denied=None,
        depth_first=True,
        exclude_directories=None,
        exclude_filenames=None,
        recurse=True,
        yield_directories=False,
        yield_files=True,
        yield_style='flat',
    ):
    '''
    Yield Path objects for files in the file tree, similar to os.walk.

    path:
        The directory to start from, as a string or Path object.
        If this directory itself matches exclude_directories, by absolute
        path or by basename, callback_exclusion is called for it and nothing
        is yielded.

    callback_exclusion:
        This function will be called when a file or directory is excluded with
        two parameters: the absolute path, and 'file' or 'directory'.
        Default = None

    callback_permission_denied:
        This function will be called when listing a directory raises
        PermissionError, with two parameters: the Path object of that
        directory, and the exception. The directory is then skipped.
        Default = None

    depth_first:
        If True, the subdirectories of the directory that was just listed are
        visited before any directories that were queued earlier.
        If False, directories are visited in the order they were discovered.
        Default = True

    exclude_filenames:
        A set of filenames that will not be yielded. Entries can be absolute
        paths to exclude that particular file, or plain names to exclude
        all matches. Entries are compared after passing through `normalize`.
        For example:
        {'C:\\folder\\file.txt', 'desktop.ini'}
        Default = None

    exclude_directories:
        A set of directories that will not be yielded or entered. Entries can
        be absolute paths to exclude that particular directory, or plain names
        to exclude all matches. Entries are compared after passing through
        `normalize`. For example:
        {'C:\\folder', 'thumbnails'}
        Default = None

    recurse:
        Yield from subdirectories. If False, only immediate files are returned.

    yield_directories:
        Should the generator produce directories? Has no effect in nested yield style.
        In flat style, each directory is yielded before the files inside it.

    yield_files:
        Should the generator produce files? Has no effect in nested yield style.

    yield_style:
        If 'flat', yield individual files one by one in a constant stream.
        If 'nested', yield tuple(root, directories, files) like os.walk does,
        except I use Path objects with absolute paths for everything.

    Raises ValueError if yield_directories and yield_files are both False, or
    if yield_style is not recognized. Because this is a generator, those
    checks run when iteration begins, not when the function is called.
    '''
    if not yield_directories and not yield_files:
        raise ValueError('yield_directories and yield_files cannot both be False')

    if yield_style not in ['flat', 'nested']:
        raise ValueError('Invalid yield_style %s. Either "flat" or "nested".' % repr(yield_style))

    if exclude_directories is None:
        exclude_directories = set()

    if exclude_filenames is None:
        exclude_filenames = set()

    # Substituting do_nothing lets the rest of the function call the
    # callbacks unconditionally.
    callback_exclusion = callback_exclusion or do_nothing
    callback_permission_denied = callback_permission_denied or do_nothing

    # Normalize the exclusion entries once so that every membership test
    # below compares like with like.
    exclude_filenames = {normalize(f) for f in exclude_filenames}
    exclude_directories = {normalize(f) for f in exclude_directories}

    path = str_to_fp(path)
    path.correct_case()

    # Considering full paths
    if normalize(path.absolute_path) in exclude_directories:
        callback_exclusion(path.absolute_path, 'directory')
        return

    # Considering folder names
    if normalize(path.basename) in exclude_directories:
        callback_exclusion(path.absolute_path, 'directory')
        return

    directory_queue = collections.deque()
    directory_queue.append(path)

    # This is a recursion-free workplace.
    # Thank you for your cooperation.
    while len(directory_queue) > 0:
        current_location = directory_queue.popleft()
        log.debug('listdir: %s', current_location.absolute_path)
        try:
            contents = os.listdir(current_location.absolute_path)
        except PermissionError as exception:
            # Report the unreadable directory and move on to the next one.
            callback_permission_denied(current_location, exception)
            continue
        log.debug('received %d items', len(contents))

        if yield_style == 'flat' and yield_directories:
            yield current_location

        # Subdirectories are always collected because they feed the queue.
        # Files are only collected for the nested style; flat style yields
        # them immediately.
        directories = []
        files = []
        for base_name in contents:
            absolute_name = os.path.join(current_location.absolute_path, base_name)

            if os.path.isdir(absolute_name):
                exclude = (
                    normalize(absolute_name) in exclude_directories or
                    normalize(base_name) in exclude_directories
                )
                if exclude:
                    callback_exclusion(absolute_name, 'directory')
                    continue

                directory = str_to_fp(absolute_name)
                directories.append(directory)

            elif yield_style == 'flat' and not yield_files:
                # Files are not wanted, so skip the exclusion checks too.
                continue

            else:
                exclude = normalize(absolute_name) in exclude_filenames
                exclude |= normalize(base_name) in exclude_filenames
                if exclude:
                    callback_exclusion(absolute_name, 'file')
                    continue

                fp = str_to_fp(absolute_name)
                if yield_style == 'flat':
                    yield fp
                else:
                    files.append(fp)

        if yield_style == 'nested':
            yield (current_location, directories, files)

        if not recurse:
            # Only the starting directory was requested.
            break

        if depth_first:
            # Extendleft causes them to get reversed, so flip it first.
            directories.reverse()
            directory_queue.extendleft(directories)
        else:
            directory_queue.extend(directories)

109
voussoirkit/sqlhelpers.py Normal file
View file

@ -0,0 +1,109 @@
def delete_filler(pairs):
    '''
    Manually aligning the bindings for DELETE statements is annoying.
    Given a dictionary of {column: value}, return the "WHERE ..." portion of
    the query and the bindings in the correct order.

    The column names are written into the query text as-is; only the values
    are bound as parameters.

    Raises ValueError if `pairs` is empty, because a bare "WHERE" is not a
    valid clause.

    Example:
        pairs={'test': 'toast', 'ping': 'pong'}
        ->
        returns ('WHERE test = ? AND ping = ?', ['toast', 'pong'])

    In context:
        (qmarks, bindings) = delete_filler(pairs)
        query = 'DELETE FROM table %s' % qmarks
        cur.execute(query, bindings)
    '''
    if not pairs:
        raise ValueError('No pairs to build the WHERE clause from.')

    # Keys and values are read from the same dict, so their orders agree.
    conditions = ' AND '.join('%s = ?' % key for key in pairs)
    bindings = list(pairs.values())
    qmarks = 'WHERE %s' % conditions
    return (qmarks, bindings)
def insert_filler(column_names, values, require_all=True):
    '''
    Lining up the bindings for an INSERT statement by hand is tedious.
    Given the table's column names and a dictionary of {column: value},
    return the question marks and the list of bindings in column order.

    require_all:
        If True, raise ValueError when `values` lacks one of the column
        names. If False, a missing column is bound to None instead.

    `values` itself is not modified.

    Example:
        column_names=['id', 'name', 'score'],
        values={'score': 20, 'id': '1111', 'name': 'James'}
        ->
        returns ('?, ?, ?', ['1111', 'James', 20])

    In context:
        (qmarks, bindings) = insert_filler(COLUMN_NAMES, data)
        query = 'INSERT INTO table VALUES(%s)' % qmarks
        cur.execute(query, bindings)
    '''
    bindings = []
    for column in column_names:
        if column in values:
            bindings.append(values[column])
        elif require_all:
            raise ValueError('Missing column "%s"' % column)
        else:
            bindings.append(None)

    qmarks = ', '.join(['?'] * len(column_names))
    return (qmarks, bindings)
def update_filler(pairs, where_key):
    '''
    Manually aligning the bindings for UPDATE statements is annoying.
    Given a dictionary of {column: value} as well as the name of the column
    to be used as the WHERE, return the "SET ..." portion of the query and the
    bindings in the correct order.

    If the where_key needs to be reassigned also, let its value be a 2-tuple
    where [0] is the current value used for WHERE, and [1] is the new value
    used for SET. A dictionary with the keys 'old' and 'new' is accepted as
    well and means the same thing.

    The column names are written into the query text as-is; only the values
    are bound as parameters. `pairs` itself is not modified.

    Raises KeyError if where_key is not in `pairs`, and ValueError if there
    is nothing left to SET once the where_key has been taken out.

    Example:
        pairs={'id': '1111', 'name': 'James', 'score': 20},
        where_key='id'
        ->
        returns ('SET name = ?, score = ? WHERE id == ?', ['James', 20, '1111'])

    Example:
        pairs={'filepath': ('/oldplace', '/newplace')},
        where_key='filepath'
        ->
        returns ('SET filepath = ? WHERE filepath == ?', ['/newplace', '/oldplace'])

    Example:
        pairs={'filepath': {'old': '/oldplace', 'new': '/newplace'}},
        where_key='filepath'
        ->
        returns ('SET filepath = ? WHERE filepath == ?', ['/newplace', '/oldplace'])

    In context:
        (qmarks, bindings) = update_filler(data, where_key)
        query = 'UPDATE table %s' % qmarks
        cur.execute(query, bindings)
    '''
    pairs = pairs.copy()
    where_value = pairs.pop(where_key)
    if isinstance(where_value, tuple):
        (old_value, new_value) = where_value
        where_value = old_value
        pairs[where_key] = new_value
    elif isinstance(where_value, dict):
        # Read 'new' before rebinding where_value. Doing it the other way
        # around would index into the old value instead of the dict.
        pairs[where_key] = where_value['new']
        where_value = where_value['old']

    if len(pairs) == 0:
        raise ValueError('No pairs left after where_key.')

    # Keys and values are read from the same dict, so their orders agree.
    setters = ', '.join('%s = ?' % key for key in pairs)
    bindings = list(pairs.values())
    # The WHERE placeholder is last in the query, so its binding goes last.
    bindings.append(where_value)
    qmarks = 'SET {setters} WHERE {where_key} == ?'
    qmarks = qmarks.format(setters=setters, where_key=where_key)
    return (qmarks, bindings)

83
voussoirkit/treeclass.py Normal file
View file

@ -0,0 +1,83 @@
import os
class ExistingChild(Exception):
    '''
    Raised when a node already has a child with the requested identifier.
    '''
class InvalidIdentifier(Exception):
    '''
    Raised when a node identifier is not a string or contains a slash.
    '''
class Tree:
    '''
    A node in a tree of string-identified nodes.

    Every node keeps a reference to its parent and a dict of its children
    keyed by their identifiers, so identifiers are unique among siblings.
    Two nodes compare equal when their abspaths are equal.
    '''
    def __init__(self, identifier, data=None):
        '''
        identifier:
            A string with no forward slashes or backslashes in it.
            Raises InvalidIdentifier otherwise.

        data:
            Any value to carry on the node. It is stored on `self.data` and
            not otherwise used by this class.
        '''
        self.assert_identifier_ok(identifier)
        self.identifier = identifier
        self.data = data
        self.parent = None
        self.children = {}

    def __eq__(self, other):
        return isinstance(other, Tree) and self.abspath() == other.abspath()

    def __getitem__(self, key):
        '''
        Return the direct child whose identifier is `key`.
        Raises KeyError if there is no such child.
        '''
        return self.children[key]

    def __hash__(self):
        # Hash on abspath to agree with __eq__. Note that this means the hash
        # of a node changes when it is attached somewhere else.
        return hash(self.abspath())

    def __repr__(self):
        return 'Tree(%s)' % self.identifier

    @staticmethod
    def assert_identifier_ok(identifier):
        '''
        Raise InvalidIdentifier unless `identifier` is a string that contains
        neither kind of slash.
        '''
        if not isinstance(identifier, str):
            raise InvalidIdentifier(f'Identifier {identifier} must be a string.')

        if '/' in identifier or '\\' in identifier:
            raise InvalidIdentifier('Identifier cannot contain slashes')

    def abspath(self):
        '''
        Return the identifiers from the root down to this node, joined by
        backslashes.
        '''
        nodes = [self]
        nodes.extend(self.walk_parents())
        # walk_parents goes upward, but the path reads from the root down.
        nodes.reverse()
        return '\\'.join(node.identifier for node in nodes)

    def add_child(self, other_node, overwrite_parent=False):
        '''
        Attach `other_node` as a direct child of this node and return it.

        overwrite_parent:
            If `other_node` already has a parent, it is only moved here when
            this is True, in which case it is also removed from the previous
            parent's children. Otherwise ValueError is raised.

        Raises ExistingChild if this node already has a child with the same
        identifier.
        '''
        self.assert_child_available(other_node.identifier)
        old_parent = other_node.parent
        if old_parent is not None:
            if not overwrite_parent:
                raise ValueError('That node already has a parent. Try `overwrite_parent=True`')
            # Unlink from the previous parent so the node does not remain
            # listed in two trees at once. The identity check avoids removing
            # some other node that happens to be filed under the same key.
            if old_parent.children.get(other_node.identifier) is other_node:
                del old_parent.children[other_node.identifier]

        other_node.parent = self
        self.children[other_node.identifier] = other_node
        return other_node

    def assert_child_available(self, identifier):
        '''
        Raise ExistingChild if a child with this identifier already exists.
        '''
        if identifier in self.children:
            raise ExistingChild(f'Node {self.identifier} already has child {identifier}')

    def detach(self):
        '''
        Remove this node from its parent, making it a root. Does nothing if
        the node has no parent.
        '''
        if self.parent is None:
            return

        del self.parent.children[self.identifier]
        self.parent = None

    def list_children(self, sort=None):
        '''
        Return the direct children as a sorted list.

        sort:
            A key function that takes a node. If None, the children are
            ordered by identifier, case-insensitively, with the exact
            identifier as the tiebreaker.
        '''
        children = list(self.children.values())
        if sort is None:
            children.sort(key=lambda node: (node.identifier.lower(), node.identifier))
        else:
            children.sort(key=sort)
        return children

    def walk(self, sort=None):
        '''
        Yield this node, then every descendant, with each node coming before
        its own children. `sort` is passed to list_children at every level.
        '''
        yield self
        for child in self.list_children(sort=sort):
            yield from child.walk(sort=sort)

    def walk_parents(self):
        '''
        Yield this node's ancestors, starting with its parent and ending with
        the root. The node itself is not included.
        '''
        parent = self.parent
        while parent is not None:
            yield parent
            parent = parent.parent