This commit is contained in:
unknown 2016-02-20 23:13:50 -08:00
parent 001a8d970f
commit 4101e70d25
18 changed files with 1049 additions and 516 deletions

View file

@ -2,11 +2,16 @@ from PIL import Image
import os
import sys
CLOSE_ENOUGH_THRESHOLD = 10
close_enough_threshold = 90
filename = sys.argv[1]
try:
close_enough_threshold = int(sys.argv[2])
except:
pass
def close_enough(a, b):
for (a_channel, b_channel) in zip(a, b):
if abs(a_channel - b_channel) > CLOSE_ENOUGH_THRESHOLD:
if abs(a_channel - b_channel) > close_enough_threshold:
return False
return True
@ -16,9 +21,9 @@ def deletterbox(filename):
for x in range(4):
image = trim_top(image)
image = image.rotate(90)
(base, ext) = os.path.splitext(filename)
#(base, ext) = os.path.splitext(filename)
#filename = base + 'X' + ext
image.save(filename)
image.save(filename, quality=100)
def trim_top(image):
letterbox_color = image.getpixel((0, 0))
@ -26,17 +31,19 @@ def trim_top(image):
solid = True
for x in range(image.size[0]):
pixel = image.getpixel((x, y))
#print(pixel)
if not close_enough(letterbox_color, pixel):
solid = False
#print(y,pixel)
break
if not solid:
break
bounds = (0, y, image.size[0], image.size[1])
#print(bounds)
print(bounds)
image = image.crop(bounds)
return image
filenames = sys.argv[1:]
for filename in filenames:
deletterbox(filename)
deletterbox(filename)

View file

@ -3,4 +3,8 @@ Open Dir DL
Requires `pip install beautifulsoup4`
See inside opendirdl.py for usage instructions.
See inside opendirdl.py for usage instructions.
2016 02 08
- Fixed bug where server:port urls did not create db files.
- Moved db commits to only happen at the end of a digest.

View file

@ -16,7 +16,6 @@ DIGEST:
-dv "x.db" | --databasename "x.db" : Use a custom database filename. By default, databases
are named after the web domain.
DOWNLOAD:
Download the files whose URLs are enabled in the database.
@ -74,6 +73,7 @@ import os
import ratelimiter
## import re
import requests
import shutil
import sqlite3
## import sys
## tkinter
@ -81,36 +81,48 @@ import urllib.parse
FILENAME_BADCHARS = '/\\:*?"<>|'
TERMINAL_WIDTH = shutil.get_terminal_size().columns
# When doing a basic scan, we will not send HEAD requests to URLs that end in these strings,
# because they're probably files.
# This isn't meant to be a comprehensive filetype library, but it covers enough of the
# typical opendir to speed things up.
SKIPPABLE_FILETYPES = [
'.aac',
'.avi',
'.bin',
'.bmp',
'.bz2',
'.epub',
'.exe',
'.db',
'.flac',
'.gif',
'.gz'
'.gz',
'.ico',
'.iso',
'.jpeg',
'.jpg',
'.m3u',
'.m4a',
'.m4v',
'.mka',
'.mkv',
'.mov',
'.mp3',
'.mp4',
'.nfo',
'.ogg',
'.ott',
'.pdf',
'.png',
'.rar',
'.srt',
'.tar',
'.ttf',
'.txt',
'.webm',
'.wma',
'.zip',
]
SKIPPABLE_FILETYPES = set(x.lower() for x in SKIPPABLE_FILETYPES)
@ -227,9 +239,10 @@ class Walker:
if walkurl[-1] != '/':
walkurl += '/'
self.walkurl = walkurl
if databasename is None:
if databasename is None or databasename == "":
self.domain = url_to_filepath(walkurl)[0]
databasename = self.domain + '.db'
databasename = databasename.replace(':', '')
self.databasename = databasename
self.sql = sqlite3.connect(self.databasename)
@ -292,7 +305,7 @@ class Walker:
if not url.startswith(self.walkurl):
# Don't follow external links or parent directory.
print('Skipping "%s" due to external url.' % url)
safeprint('Skipping "%s" due to external url.' % url)
return
urll = url.lower()
@ -300,7 +313,7 @@ class Walker:
skippable = any(urll.endswith(ext) for ext in SKIPPABLE_FILETYPES)
if skippable:
safeprint('Skipping "%s" due to extension.' % url)
self.smart_insert(url=url)
self.smart_insert(url=url, commit=False)
return
self.cur.execute('SELECT * FROM urls WHERE url == ?', [url])
skippable = self.cur.fetchone() is not None
@ -335,13 +348,22 @@ class Walker:
print('Queued %d urls' % added)
else:
# This is not an index page, so save it.
self.smart_insert(head=head)
self.smart_insert(head=head, commit=False)
def walk(self, url=None):
    '''
    Process `url` and every URL that gets queued while doing so.

    The database is committed exactly once when the walk ends, whether the
    queue was exhausted or an exception (including KeyboardInterrupt) cut
    the walk short. Any such exception continues to propagate to the caller.
    '''
    self.queue.append(url)
    try:
        while self.queue:
            # Popping from right helps keep the queue short because it handles the files
            # early.
            url = self.queue.pop(-1)
            self.process_url(url)
            line = '{:,} Remaining'.format(len(self.queue))
            print(line)
    finally:
        # Save whatever was gathered so far, even when the walk was interrupted.
        self.sql.commit()
## ##
## WALKER ##########################################################################################
@ -384,7 +406,7 @@ def do_request(message, method, url):
safeprint(message, end='')
sys.stdout.flush()
response = method(url)
safeprint(response)
safeprint(response.status_code)
response.raise_for_status()
return response
@ -619,16 +641,24 @@ def list_basenames(args):
cur = sql.cursor()
cur.execute('SELECT basename FROM urls WHERE do_download == 1 ORDER BY LENGTH(basename) DESC LIMIT 1')
longest = len(cur.fetchone()[0])
cur.execute('SELECT * FROM urls WHERE do_download == 1 ORDER BY basename')
form = '{bn:<%ds} : {url}' % longest
fetch = cur.fetchone()
if fetch is None:
return
longest = len(fetch[0])
cur.execute('SELECT * FROM urls WHERE do_download == 1 ORDER BY LOWER(basename)')
form = '{bn:<%ds} : {url} : {byt}' % longest
if outputfile:
outputfile = open(outputfile, 'w', encoding='utf-8')
while True:
fetch = cur.fetchone()
if fetch is None:
break
line = form.format(bn=fetch[SQL_BASENAME], url=fetch[SQL_URL])
byt = fetch[SQL_CONTENT_LENGTH]
if byt is None:
byt = ''
else:
byt = '{:,}'.format(byt)
line = form.format(bn=fetch[SQL_BASENAME], url=fetch[SQL_URL], byt=byt)
if outputfile:
outputfile.write(line + '\n')
else:

View file

@ -6,11 +6,14 @@ import sqlite3
import string
import warnings
# UIDs consist of hex characters, so keyspace is 16 ** UID_CHARACTERS.
UID_CHARACTERS = 16
ID_LENGTH = 22
VALID_TAG_CHARS = string.ascii_lowercase + string.digits + '_-'
MAX_TAG_NAME_LENGTH = 32
SQL_LASTID_COLUMNCOUNT = 2
SQL_LASTID_TAB = 0
SQL_LASTID_ID = 1
SQL_PHOTO_COLUMNCOUNT = 8
SQL_PHOTO_ID = 0
SQL_PHOTO_FILEPATH = 1
@ -56,6 +59,10 @@ CREATE TABLE IF NOT EXISTS tag_synonyms(
name TEXT,
mastername TEXT
);
CREATE TABLE IF NOT EXISTS id_numbers(
tab TEXT,
last_id TEXT
);
CREATE INDEX IF NOT EXISTS index_photo_id on photos(id);
CREATE INDEX IF NOT EXISTS index_photo_path on photos(filepath);
CREATE INDEX IF NOT EXISTS index_photo_created on photos(created);
@ -69,6 +76,15 @@ CREATE INDEX IF NOT EXISTS index_tagrel_tagid on photo_tag_rel(tagid);
CREATE INDEX IF NOT EXISTS index_tagsyn_name on tag_synonyms(name);
'''
def assert_lower(*args):
    '''
    Verify that the given values never decrease from left to right.

    None entries are ignored wherever they appear, including the first
    position, and calling with no arguments is accepted.

    Raises ValueError if any value is lower than the closest non-None
    value before it.
    '''
    previous = None
    for element in args:
        if element is None:
            continue
        if previous is not None and element < previous:
            raise ValueError('Min and Max out of order')
        previous = element
def basex(number, base, alphabet='0123456789abcdefghijklmnopqrstuvwxyz'):
'''
Converts an integer to a different base string.
@ -93,6 +109,13 @@ def basex(number, base, alphabet='0123456789abcdefghijklmnopqrstuvwxyz'):
based = alphabet[i] + based
return sign + based
def fetch_generator(cursor):
    '''
    Yield the rows of `cursor` one at a time, stopping as soon as
    fetchone() returns None.
    '''
    # iter(callable, sentinel) keeps calling fetchone until it yields None.
    for row in iter(cursor.fetchone, None):
        yield row
def getnow(timestamp=True):
'''
Return the current UTC timestamp or datetime object.
@ -102,12 +125,18 @@ def getnow(timestamp=True):
return now.timestamp()
return now
def is_xor(x, y):
def is_xor(*args):
    '''
    Return True if and only if one arg is truthy.
    '''
    truthy_count = sum(1 for arg in args if arg)
    return truthy_count == 1
def min_max_query_builder(name, sign, value):
    '''
    Build a `name sign value` comparison clause, or return None when
    `value` is None.

    `value` is forced through int() and `name` through normalize_tagname
    before they are joined into the clause.
    '''
    if value is not None:
        number = str(int(value))
        column = normalize_tagname(name)
        return ' '.join([column, sign, number])
    return None
def normalize_tagname(tagname):
'''
@ -121,31 +150,31 @@ def normalize_tagname(tagname):
tagname = (c for c in tagname if c in VALID_TAG_CHARS)
tagname = ''.join(tagname)
if len(tagname) == 0:
raise ValueError('Normalized tagname of length 0.')
raise TagTooShort(tagname)
if len(tagname) > MAX_TAG_NAME_LENGTH:
raise TagTooLong(tagname)
return tagname
def not_implemented(function):
    '''
    Decorator for keeping track of which functions still need to be filled out.

    A warning naming the function is issued at the moment the decorator is
    applied; the function itself is handed back unchanged.
    '''
    message = '%s is not implemented' % function.__name__
    warnings.warn(message)
    return function
def uid(length=None):
'''
Generate a u-random hex string..
'''
if length is None:
length = UID_CHARACTERS
identifier = ''.join('{:02x}'.format(x) for x in os.urandom(math.ceil(length / 2)))
if len(identifier) > length:
identifier = identifier[:length]
return identifier
def raise_nosuchtag(tagid=None, tagname=None, comment=''):
    '''
    Raise NoSuchTag with a message identifying the missing tag.

    The ID is reported if one was given, otherwise the name. When neither
    was given, the message consists of `comment` alone, so the caller still
    gets NoSuchTag rather than an UnboundLocalError.
    '''
    if tagid is not None:
        message = 'ID: %s. %s' % (tagid, comment)
    elif tagname is not None:
        message = 'Name: %s. %s' % (tagname, comment)
    else:
        message = comment
    raise NoSuchTag(message)
class NoSuchPhoto(Exception):
pass
class NoSuchSynonym(Exception):
pass
class NoSuchTag(Exception):
pass
@ -155,6 +184,12 @@ class PhotoExists(Exception):
class TagExists(Exception):
pass
class TagTooLong(Exception):
pass
class TagTooShort(Exception):
pass
class XORException(Exception):
pass
@ -187,66 +222,131 @@ class PhotoDB:
Note that the entries in this table do not contain ID numbers.
The rationale here is that "coco" is a synonym for "chocolate" regardless
of the "chocolate" tag's ID, or the fact that you decided to rename your
"chocolate" tag to "candy" after applying it to a few photos.
of the "chocolate" tag's ID, and that if a tag is renamed, its synonyms
do not necessarily follow.
The `rename_tag` method includes a parameter `apply_to_synonyms` if you do
want them to follow.
'''
def __init__(self, databasename='phototagger.db'):
def __init__(self, databasename='phototagger.db', id_length=None):
    '''
    Open (or create) the database file and make sure its tables exist.

    databasename :
        Passed to sqlite3.connect.

    id_length :
        The width that generated IDs are zero-padded to.
        Defaults to ID_LENGTH when None.
    '''
    # Store the attribute in both cases; previously a caller-supplied
    # id_length left self.id_length undefined.
    if id_length is None:
        id_length = ID_LENGTH
    self.id_length = id_length

    self.databasename = databasename
    self.sql = sqlite3.connect(databasename)
    self.cur = self.sql.cursor()
    statements = DB_INIT.split(';')
    for statement in statements:
        self.cur.execute(statement)
    # Cache of the most recent ID number issued per table, see generate_id.
    self._last_ids = {}
def __repr__(self):
return 'PhotoDB(databasename={dbname})'.format(dbname=repr(self.databasename))
def add_photo_tag(self, photoid, tag=None, commit=True):
def apply_photo_tag(self, photoid, tagid=None, tagname=None, commit=True):
'''
Apply a tag to a photo. `tag` may be the name of the tag or a Tag
object from the same PhotoDB.
`tag` may NOT be the tag's ID, since an ID would also have been a valid name.
`tag` may NOT be the tag's ID, since we can't tell if a given string is
an ID or a name.
Returns True if the tag was applied, False if the photo already had this tag.
Raises NoSuchTag and NoSuchPhoto as appropriate.
'''
if isinstance(tag, Tag) and tag.photodb is self:
tagid = tag.id
else:
tag = self.get_tag_by_name(tag)
if tag is None:
raise NoSuchTag(tag)
tagid = tag.id
tag = self.get_tag(tagid=tagid, tagname=tagname, resolve_synonyms=True)
if tag is None:
raise_nosuchtag(tagid=tagid, tagname=tagname)
self.cur.execute('SELECT * FROM photos WHERE id == ?', [photoid])
if self.cur.fetchone() is None:
raise NoSuchPhoto(photoid)
self.cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?', [photoid, tagid])
self.cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?', [photoid, tag.id])
if self.cur.fetchone() is not None:
warning = 'Photo {photoid} already has tag {tagid}'.format(photoid=photoid, tagid=tagid)
warnings.warn(warning)
return
return False
self.cur.execute('INSERT INTO photo_tag_rel VALUES(?, ?)', [photoid, tagid])
self.cur.execute('INSERT INTO photo_tag_rel VALUES(?, ?)', [photoid, tag.id])
if commit:
self.sql.commit()
return True
@not_implemented
def convert_tag_to_synonym(self, tagname, mastertag):
def convert_tag_to_synonym(self, oldtagname, mastertagname):
    '''
    Convert an independent tag into a synonym for a different independent tag.
    All photos which possess the current tag will have it replaced
    with the master tag. All synonyms of the old tag will point to the new tag.

    Good for when two tags need to be merged under a single name.

    Raises NoSuchTag if either name is not an independent tag, and
    ValueError if both names refer to the same tag.
    '''
    oldtagname = normalize_tagname(oldtagname)
    mastertagname = normalize_tagname(mastertagname)

    oldtag = self.get_tag_by_name(oldtagname, resolve_synonyms=False)
    if oldtag is None:
        raise NoSuchTag(oldtagname)

    mastertag = self.get_tag_by_name(mastertagname, resolve_synonyms=False)
    if mastertag is None:
        raise NoSuchTag(mastertagname)

    if oldtagname == mastertagname:
        # Refuse before modifying anything. Otherwise the statements below
        # would delete the tag and its relationships, and only then fail
        # when the synonym is registered.
        raise ValueError('Cannot assign synonym to itself.')

    # Migrate the old tag's synonyms to the new one
    # UPDATE is safe for this operation because there is no chance of duplicates.
    self.cur.execute('UPDATE tag_synonyms SET mastername = ? WHERE mastername == ?', [mastertagname, oldtagname])

    # Iterate over all photos with the old tag, and relate them to the new tag
    # if they aren't already.
    # A separate cursor is used because self.cur runs other queries inside the loop.
    temp_cur = self.sql.cursor()
    temp_cur.execute('SELECT * FROM photo_tag_rel WHERE tagid == ?', [oldtag.id])
    for relationship in fetch_generator(temp_cur):
        photoid = relationship[SQL_PHOTOTAG_PHOTOID]
        # The check must be per photo: looking only at the tag ID would skip
        # every photo as soon as any single photo carried the master tag.
        self.cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?', [photoid, mastertag.id])
        if self.cur.fetchone() is not None:
            continue
        self.cur.execute('INSERT INTO photo_tag_rel VALUES(?, ?)', [photoid, mastertag.id])

    # Then delete the relationships with the old tag
    self.cur.execute('DELETE FROM photo_tag_rel WHERE tagid == ?', [oldtag.id])
    self.cur.execute('DELETE FROM tags WHERE id == ?', [oldtag.id])

    # Enjoy your new life as a monk.
    self.new_tag_synonym(oldtag.name, mastertag.name, commit=False)
    self.sql.commit()
def generate_id(self, table):
    '''
    Create a new ID number that is unique to the given table.
    Note that this method does not commit the database. We'll wait for that
    to happen in whoever is calling us, so we know the ID is actually used.

    table :
        'photos' or 'tags', case-insensitive.

    Returns the new ID as a string, zero-padded to self.id_length characters.

    Raises ValueError for any other table name.
    '''
    table = table.lower()
    if table not in ['photos', 'tags']:
        raise ValueError('Invalid table requested: %s.' % table)

    if table in self._last_ids:
        # Use cache value
        new_id = self._last_ids[table] + 1
    else:
        self.cur.execute('SELECT * FROM id_numbers WHERE tab == ?', [table])
        fetch = self.cur.fetchone()
        if fetch is None:
            # Register new value
            new_id = 1
        else:
            # Use database value
            new_id = int(fetch[SQL_LASTID_ID]) + 1

    new_id_s = str(new_id).rjust(self.id_length, '0')
    self.cur.execute('UPDATE id_numbers SET last_id = ? WHERE tab == ?', [new_id_s, table])
    if self.cur.rowcount == 0:
        # No row for this table yet: either this is its first ID, or the row
        # that the cache remembers was rolled back by the caller.
        self.cur.execute('INSERT INTO id_numbers VALUES(?, ?)', [table, new_id_s])

    self._last_ids[table] = new_id
    return new_id_s
def get_photo_by_id(self, photoid):
'''
Return this Photo object, or None if it does not exist.
'''
self.cur.execute('SELECT * FROM photos WHERE id == ?', [photoid])
photo = cur.fetchone()
photo = self.cur.fetchone()
if photo is None:
return None
photo = self.tuple_to_photo(photo)
@ -264,50 +364,112 @@ class PhotoDB:
photo = self.tuple_to_photo(photo)
return photo
def get_photos_by_recent(self):
def get_photos_by_recent(self, count=None):
'''
Yield photo objects in order of creation time.
'''
if count is not None and count <= 0:
return
# We're going to use a second cursor because the first one may
# get used for something else, deactivating this query.
cur2 = self.sql.cursor()
cur2.execute('SELECT * FROM photos ORDER BY created DESC')
temp_cur = self.sql.cursor()
temp_cur.execute('SELECT * FROM photos ORDER BY created DESC')
while True:
f = cur2.fetchone()
f = temp_cur.fetchone()
if f is None:
return
photo = self.tuple_to_photo(f)
yield photo
if count is None:
continue
count -= 1
if count <= 0:
return
@not_implemented
def get_photos_by_tag(
def get_photos_by_search(
self,
musts=None,
mays=None,
forbids=None,
forbid_unspecified=False,
extension=None,
maximums={},
minimums={},
tag_musts=None,
tag_mays=None,
tag_forbids=None,
tag_forbid_unspecified=False,
):
'''
Given one or multiple tags, yield photos possessing those tags.
Parameters:
musts :
A list of strings or Tag objects.
extension :
A string or list of strings of acceptable file extensions.
maximums :
A dictionary, where the key is an attribute of the photo,
(area, bytes, created, height, id, or width)
and the value is the maximum desired value for that field.
minimums :
A dictionary like `maximums` where the value is the minimum
desired value for that field.
tag_musts :
A list of tag names or Tag objects.
Photos MUST have ALL tags in this list.
mays :
A list of strings or Tag objects.
tag_mays :
A list of tag names or Tag objects.
If `forbid_unspecified` is True, then Photos MUST have AT LEAST ONE tag in this list.
If `forbid_unspecified` is False, then Photos MAY or MAY NOT have ANY tag in this list.
forbids :
A list of strings or Tag objects.
tag_forbids :
A list of tag names or Tag objects.
Photos MUST NOT have ANY tag in the list.
forbid_unspecified :
tag_forbid_unspecified :
True or False.
If False, Photos need only comply with the `musts`.
If True, Photos need to comply with both `musts` and `mays`.
If False, Photos need only comply with the `tag_musts`.
If True, Photos need to comply with both `tag_musts` and `tag_mays`.
'''
if all(arg is None for arg in (musts, mays, forbids)):
raise TypeError('All arguments cannot be None')
conditions = []
minmaxers = {'<=':maximums, '>=': minimums}
for (comparator, minmaxer) in minmaxers.items():
for (field, value) in minmaxer.items():
if field not in Photo.int_properties:
raise ValueError('Unknown Photo property: %s' % field)
query = min_max_query_builder(field, comparator, value)
conditions.append(query)
if extension is not None:
if isinstance(extension, str):
extension = [extension]
# Don't inject me bro
extension = [normalize_tagname(e) for e in extension]
extension = ['extension == "%s"' % e for e in extension]
extension = ' OR '.join(extension)
extension = '(%s)' % extension
conditions.append(extension)
conditions = [query for query in conditions if query is not None]
if len(conditions) == 0:
raise ValueError('No search query provided')
conditions = ' AND '.join(conditions)
print(conditions)
query = 'SELECT * FROM photos WHERE %s' % conditions
def get_tag(self, tagid=None, tagname=None, resolve_synonyms=True):
    '''
    Look a tag up by ID or by name, whichever one was provided.

    The lookup is delegated to get_tag_by_id or get_tag_by_name;
    `resolve_synonyms` is only forwarded to the by-name lookup.

    Raises XORException unless exactly one of `tagid`, `tagname` is truthy.
    '''
    if is_xor(tagid, tagname):
        if tagid is not None:
            return self.get_tag_by_id(tagid)
        if tagname is not None:
            return self.get_tag_by_name(tagname, resolve_synonyms=resolve_synonyms)
        return None
    raise XORException('One and only one of `tagid`, `tagname` can be passed.')
def get_tag_by_id(self, tagid):
self.cur.execute('SELECT * FROM tags WHERE id == ?', [tagid])
@ -317,21 +479,19 @@ class PhotoDB:
tag = self.tuple_to_tag(tag)
return tag
def get_tag_by_name(self, tagname):
'''
Return the Tag object that the given tagname resolves to.
If the given tagname is a synonym, the master tag will be returned.
'''
def get_tag_by_name(self, tagname, resolve_synonyms=True):
if isinstance(tagname, Tag):
return tagname
self.cur.execute('SELECT * FROM tag_synonyms WHERE name == ?', [tagname])
fetch = self.cur.fetchone()
if fetch is not None:
mastertagid = fetch[SQL_SYN_MASTER]
tag = self.get_tag_by_id(mastertagid)
return tag
tagname = normalize_tagname(tagname)
if resolve_synonyms is True:
self.cur.execute('SELECT * FROM tag_synonyms WHERE name == ?', [tagname])
fetch = self.cur.fetchone()
if fetch is not None:
mastertagname = fetch[SQL_SYN_MASTER]
tag = self.get_tag_by_name(mastertagname)
return tag
self.cur.execute('SELECT * FROM tags WHERE name == ?', [tagname])
fetch = self.cur.fetchone()
@ -345,15 +505,13 @@ class PhotoDB:
'''
Return the tags assigned to the given photo.
'''
self.cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ?', [photoid])
tags = self.cur.fetchall()
temp_cur = self.sql.cursor()
temp_cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ?', [photoid])
tags = fetch_generator(temp_cur)
tagobjects = []
for tag in tags:
tagid = tag[SQL_PHOTOTAG_TAGID]
tagobj = self.get_tag_by_id(tagid)
if tagobj is None:
warnings.warn('Photo {photid} contains unkown tagid {tagid}'.format(photoid=photoid, tagid=tagid))
continue
tagobjects.append(tagobj)
return tagobjects
@ -364,6 +522,8 @@ class PhotoDB:
If `allow_duplicates` is False, we will first check the database for any files
with the same path and raise PhotoExists if found.
Returns the Photo object.
'''
filename = os.path.abspath(filename)
if not allow_duplicates:
@ -376,11 +536,13 @@ class PhotoDB:
extension = os.path.splitext(filename)[1]
extension = extension.replace('.', '')
extension = normalize_tagname(extension)
(width, height) = image.size
area = width * height
bytes = os.path.getsize(filename)
created = int(getnow())
photoid = self.new_uid('photos')
photoid = self.generate_id('photos')
data = [None] * SQL_PHOTO_COLUMNCOUNT
data[SQL_PHOTO_ID] = photoid
data[SQL_PHOTO_FILEPATH] = filename
@ -391,30 +553,32 @@ class PhotoDB:
data[SQL_PHOTO_BYTES] = bytes
data[SQL_PHOTO_CREATED] = created
photo = self.tuple_to_photo(data)
self.cur.execute('INSERT INTO photos VALUES(?, ?, ?, ?, ?, ?, ?, ?)', data)
for tag in tags:
try:
self.add_photo_tag(photoid, tag, commit=False)
self.apply_photo_tag(photoid, tagname=tag, commit=False)
except NoSuchTag:
self.sql.rollback()
raise
self.sql.commit()
image.close()
return photo
def new_tag(self, tagname):
'''
Register a new tag.
Register a new tag in the database and return the Tag object.
'''
tagname = normalize_tagname(tagname)
if self.get_tag_by_name(tagname) is not None:
raise TagExists(tagname)
tagid = self.new_uid('tags')
tagid = self.generate_id('tags')
self.cur.execute('INSERT INTO tags VALUES(?, ?)', [tagid, tagname])
self.sql.commit()
tag = self.tuple_to_tag([tagid, tagname])
return tag
def new_tag_synonym(self, tagname, mastertagname):
def new_tag_synonym(self, tagname, mastertagname, commit=True):
'''
Register a new synonym for an existing tag.
'''
@ -422,69 +586,79 @@ class PhotoDB:
mastertagname = normalize_tagname(mastertagname)
if tagname == mastertagname:
raise TagExists(tagname)
raise ValueError('Cannot assign synonym to itself.')
tag = self.get_tag_by_name(tagname)
# We leave resolve_synonyms as True, so that if this function returns
# anything, we know the given tagname is already a synonym or master.
tag = self.get_tag_by_name(tagname, resolve_synonyms=True)
if tag is not None:
raise TagExists(tagname)
mastertag = self.get_tag_by_name(mastertagname)
mastertag = self.get_tag_by_name(mastertagname, resolve_synonyms=True)
if mastertag is None:
raise NoSuchTag(mastertagname)
mastertagname = mastertag.name
self.cur.execute('INSERT INTO tag_synonyms VALUES(?, ?)', [tagname, mastertagname])
self.cur.execute('INSERT INTO tag_synonyms VALUES(?, ?)', [tagname, mastertag.name])
if commit:
self.sql.commit()
return mastertag
def photo_has_tag(self, photoid, tagid=None, tagname=None):
    '''
    Return True if the photo is related to the given tag, otherwise False.

    The tag is looked up through get_tag with synonym resolution enabled.
    When no tag is found, raise_nosuchtag is called.
    '''
    tag = self.get_tag(tagid=tagid, tagname=tagname, resolve_synonyms=True)
    if tag is None:
        raise_nosuchtag(tagid=tagid, tagname=tagname)

    query = 'SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?'
    self.cur.execute(query, [photoid, tag.id])
    relationship = self.cur.fetchone()
    return relationship is not None
def remove_photo(self, photoid):
    '''
    Delete a photo and its relation to any tags, then commit.

    Raises NoSuchPhoto if get_photo_by_id finds nothing for `photoid`.
    '''
    if self.get_photo_by_id(photoid) is None:
        raise NoSuchPhoto(photoid)

    deletions = (
        'DELETE FROM photos WHERE id == ?',
        'DELETE FROM photo_tag_rel WHERE photoid == ?',
    )
    for statement in deletions:
        self.cur.execute(statement, [photoid])
    self.sql.commit()
def new_uid(self, table):
'''
Create a new UID that is unique to the given table.
'''
result = None
# Well at least we won't get sql injection this way.
table = normalize_tagname(table)
query = 'SELECT * FROM {table} WHERE id == ?'.format(table=table)
while result is None:
i = uid()
# Just gotta be sure, man.
self.cur.execute(query, [i])
if self.cur.fetchone() is None:
result = i
return result
@not_implemented
def remove_photo(self):
pass
@not_implemented
def remove_tag(self, tagid=None, tagname=None):
'''
Delete a tag and its relation to any photos.
Delete a tag, its synonyms, and its relation to any photos.
'''
if not is_xor(tagid, tagname):
raise XORException('One and only one of `tagid`, `tagname` can be passed.')
if tagid is not None:
self.cur.execute('SELECT * FROM tags WHERE id == ?', [tagid])
tag = self.cur.fetchone()
elif tagname is not None:
tagname = normalize_tagname(tagname)
self.cur.execute('SELECT * from tags WHERE name == ?', [tagname])
tag = self.cur.fetchone()
tag = self.get_tag(tagid=tagid, tagname=tagname, resolve_synonyms=False)
if tag is None:
raise NoSuchTag(tagid or tagname)
tag = self.tuple_to_tag(tag)
message = 'Is it a synonym?'
raise_nosuchtag(tagid=tagid, tagname=tagname, comment=message)
self.cur.execute('DELETE FROM tags WHERE id == ?', [tag.id])
self.cur.execute('DELETE FROM photo_tag_rel WHERE tagid == ?', [tag.id])
self.cur.execute('DELETE FROM tag_synonyms WHERE mastername == ?', [tag.name])
self.sql.commit()
@not_implemented
def remove_tag_synonym(self, tagname):
    '''
    Delete a tag synonym and commit.

    This will have no effect on photos or other synonyms because
    they always resolve to the master tag before application.

    Raises NoSuchSynonym if the normalized name is not in tag_synonyms.
    '''
    tagname = normalize_tagname(tagname)

    self.cur.execute('SELECT * FROM tag_synonyms WHERE name == ?', [tagname])
    synonym_exists = self.cur.fetchone() is not None
    if not synonym_exists:
        raise NoSuchSynonym(tagname)

    self.cur.execute('DELETE FROM tag_synonyms WHERE name == ?', [tagname])
    self.sql.commit()
@not_implemented
def rename_tag(self, tagname, newname, apply_to_synonyms):
pass
def tuple_to_photo(self, tu):
@ -527,6 +701,7 @@ class Photo:
Photo objects cannot exist without a corresponding PhotoDB object, because
Photos are not the actual files, just the database entry.
'''
int_properties = set(['area', 'bytes', 'created', 'height', 'id', 'width'])
def __init__(
self,
photodb,
@ -587,8 +762,11 @@ class Photo:
def __str__(self):
return 'Photo: %s' % self.id
def add_photo_tag(self, tagname):
return self.photodb.add_photo_tag(self.id, tagname, commit=True)
def apply_photo_tag(self, tagname):
return self.photodb.apply_photo_tag(self.id, tagname=tagname, commit=True)
def photo_has_tag(self, tagname):
return self.photodb.photo_has_tag(self.id, tagname=tagname)
class Tag:
'''

View file

@ -1,45 +0,0 @@
import os
import phototagger
import unittest
DB_NAME = ':memory:'
#try:
# os.remove(DB_NAME)
# print('Deleted old database.')
#except FileNotFound:
# pass
class PhotoDBTest(unittest.TestCase):
def setUp(self):
self.p = phototagger.PhotoDB(DB_NAME)
def tearDown(self):
pass
def test_add_and_remove_tag(self):
tag = self.p.new_tag('trains')
self.assertEqual(tag.name, 'trains')
self.assertEqual(len(tag.id), phototagger.UID_CHARACTERS)
tag2 = self.p.get_tag_by_id(tag.id)
self.assertEqual(tag, tag2)
tag3 = self.p.get_tag_by_name(tag.name)
self.assertEqual(tag, tag3)
self.assertEqual(tag2, tag3)
self.p.remove_tag(tagid=tag.id)
tag4 = self.p.get_tag_by_id(tag.id)
self.assertIsNone(tag4)
def test_new_tag_invalid_name(self):
print('NOT IMPLEMENTED')
def test_new_tag_too_long(self):
print('NOT IMPLEMENTED')
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,206 @@
import os
import phototagger
import unittest
DB_NAME = ':memory:'
class PhotoDBTest(unittest.TestCase):
def setUp(self):
self.p = phototagger.PhotoDB(DB_NAME)
def tearDown(self):
pass
def test_add_and_remove_photo(self):
self.setUp()
photo1 = self.p.new_photo('samples\\train.jpg')
self.assertEqual(len(photo1.id), self.p.id_length)
photo2 = self.p.get_photo_by_id(photo1.id)
self.assertEqual(photo1, photo2)
self.p.remove_photo(photo1.id)
photo3 = self.p.get_photo_by_id(photo1.id)
self.assertIsNone(photo3)
def test_add_and_remove_tag(self):
tag1 = self.p.new_tag('trains')
self.assertEqual(tag1.name, 'trains')
self.assertEqual(len(tag1.id), self.p.id_length)
tag2 = self.p.get_tag_by_id(tag1.id)
self.assertEqual(tag1, tag2)
self.p.remove_tag(tagid=tag1.id)
tag3 = self.p.get_tag_by_id(tag1.id)
self.assertIsNone(tag3)
# Normalization
tag = self.p.new_tag('one two!')
self.assertEqual(tag.name, 'one_two')
def test_add_and_remove_synonym(self):
self.setUp()
# Add synonym
giraffe = self.p.new_tag('giraffe')
horse = self.p.new_tag_synonym('long horse', 'giraffe')
tag = self.p.get_tag_by_name('long horse', resolve_synonyms=True)
self.assertEqual(tag, giraffe)
# Synonym of synonym should resolve to master
snake = self.p.new_tag_synonym('snake with legs', 'long horse')
tag = self.p.get_tag_by_name('snake with legs')
self.assertEqual(tag, giraffe)
# Remove Tag
self.p.remove_tag_synonym('long horse')
horse = self.p.get_tag_by_name('long horse')
self.assertIsNone(horse)
# Exceptions
self.assertRaises(phototagger.NoSuchTag, self.p.new_tag_synonym, 'blanc', 'white')
self.assertRaises(phototagger.NoSuchSynonym, self.p.remove_tag_synonym, 'blanc')
def test_apply_photo_tag(self):
self.setUp()
photo = self.p.new_photo('samples\\train.jpg')
self.p.new_tag('vehicles')
# Should only return True if it is a new tag.
status = self.p.apply_photo_tag(photo.id, tagname='vehicles')
self.assertTrue(status)
status = self.p.apply_photo_tag(photo.id, tagname='vehicles')
self.assertFalse(status)
def test_convert_tag_synonym(self):
self.setUp()
# Install tags and a synonym
photo = self.p.new_photo('samples\\train.jpg')
trains = self.p.new_tag('trains')
locomotives = self.p.new_tag('locomotives')
choochoos = self.p.new_tag_synonym('choochoos', 'locomotives')
# The first two, as independents, return True.
self.assertTrue(self.p.apply_photo_tag(photo.id, trains.id))
self.assertTrue(self.p.apply_photo_tag(photo.id, locomotives.id))
self.assertFalse(self.p.apply_photo_tag(photo.id, tagname='choochoos'))
# Pre-conversion, they should be independent.
trains = self.p.get_tag_by_name('trains', resolve_synonyms=False)
locomotives = self.p.get_tag_by_name('locomotives', resolve_synonyms=False)
self.assertNotEqual(trains, locomotives)
trains_id = trains.id
# Convert and make sure the second is no longer independent.
self.p.convert_tag_to_synonym(oldtagname='locomotives', mastertagname='trains')
trains = self.p.get_tag_by_name('trains', resolve_synonyms=False)
locomotives = self.p.get_tag_by_name('locomotives', resolve_synonyms=False)
self.assertIsNone(locomotives)
self.assertEqual(trains.id, trains_id)
# The old tag should still pass has_tag as a synonym.
# The synonym of the old tag should have been remapped to the master.
self.assertTrue(self.p.photo_has_tag(photo.id, tagname='trains'))
self.assertTrue(self.p.photo_has_tag(photo.id, tagname='locomotives'))
self.assertTrue(self.p.photo_has_tag(photo.id, tagname='choochoos'))
# Synonym should not be included in the photo's tag list.
tags = list(self.p.get_tags_by_photo(photo.id))
self.assertEqual(len(tags), 1)
self.assertEqual(tags[0].id, trains_id)
def test_generate_id(self):
self.setUp()
i_photo = self.p.generate_id('photos')
i_tag = self.p.generate_id('tags')
self.assertRaises(ValueError, self.p.generate_id, 'other')
self.assertEqual(len(i_photo), self.p.id_length)
self.assertEqual(len(i_tag), self.p.id_length)
self.assertEqual(int(i_photo), int(i_tag))
self.assertLess(int(i_photo), int(self.p.generate_id('photos')))
def test_get_photo_by_id(self):
self.setUp()
photo = self.p.new_photo('samples\\train.jpg')
photo2 = self.p.get_photo_by_id(photo.id)
self.assertEqual(photo, photo2)
def test_get_photo_by_path(self):
self.setUp()
photo = self.p.new_photo('samples\\train.jpg')
photo2 = self.p.get_photo_by_path(photo.filepath)
self.assertEqual(photo, photo2)
def test_get_photos_by_recent(self):
self.setUp()
paths = ['train.jpg', 'bolts.jpg', 'reddit.png']
paths = ['samples\\' + path for path in paths]
paths = [os.path.abspath(path) for path in paths]
for path in paths:
self.p.new_photo(path)
photos = list(self.p.get_photos_by_recent())
paths.reverse()
for (index, photo) in enumerate(photos):
self.assertEqual(photo.filepath, paths[index])
photos = list(self.p.get_photos_by_recent(count=2))
self.assertEqual(len(photos), 2)
def test_get_photos_by_search(self):
print('NOT IMPLEMENTED')
def test_get_tag_by_id(self):
tag1 = self.p.new_tag('test by id')
tag2 = self.p.get_tag_by_id(tag1.id)
self.assertEqual(tag1, tag2)
tag2 = self.p.get_tag(tagid=tag1.id)
self.assertEqual(tag1, tag2)
def test_get_tag_by_name(self):
    '''
    A tag fetched by name, through either get_tag_by_name or
    get_tag(tagname=...), equals the tag returned by new_tag.
    '''
    created = self.p.new_tag('test by name')
    via_direct_lookup = self.p.get_tag_by_name(created.name)
    self.assertEqual(created, via_direct_lookup)
    via_generic_lookup = self.p.get_tag(tagname=created.name)
    self.assertEqual(created, via_generic_lookup)
def test_get_tags_by_photo(self):
    '''
    A photo with one applied tag reports exactly that tag.
    '''
    self.setUp()
    photo = self.p.new_photo('samples\\train.jpg')
    self.p.new_tag('vehicles')
    self.p.apply_photo_tag(photo.id, tagname='vehicles')
    # Materialize before indexing: other tests in this file wrap
    # get_tags_by_photo in list(), so it presumably yields lazily.
    tags = list(self.p.get_tags_by_photo(photo.id))
    self.assertEqual(len(tags), 1)
    self.assertEqual(tags[0].name, 'vehicles')
def test_new_tag_lengths(self):
    '''
    A name of exactly MAX_TAG_NAME_LENGTH characters is accepted, one more
    character raises TagTooLong, and the names '' and '!!??&&*' raise
    TagTooShort.
    '''
    longest_allowed = 'x' * (phototagger.MAX_TAG_NAME_LENGTH)
    self.p.new_tag(longest_allowed)
    with self.assertRaises(phototagger.TagTooLong):
        self.p.new_tag(longest_allowed + 'x')
    for too_short in ('', '!!??&&*'):
        with self.assertRaises(phototagger.TagTooShort):
            self.p.new_tag(too_short)
def test_photo_has_tag(self):
    '''
    After a tag is applied to a photo, photo_has_tag reports it.
    '''
    self.setUp()
    photo = self.p.new_photo('samples\\train.jpg')
    tag = self.p.new_tag('vehicles')
    self.p.apply_photo_tag(photo.id, tag.id)
    # The result must be asserted, otherwise this test can never fail.
    self.assertTrue(self.p.photo_has_tag(photo.id, tag.id))
def test_rename_tag(self):
print('NOT IMPLEMENTED')
if __name__ == '__main__':
unittest.main()

View file

@ -1,6 +1,4 @@
Spinal
========
spinal.py is a couple of tools for copying files and directories
spinal_client.py is a wip tkinter interface for creating and running backup configurations. I probably won't finish it.
A couple of tools for copying files and directories.

3
SpinalTap/backups.py Normal file
View file

@ -0,0 +1,3 @@
import spinal
spinal.copy_dir('C:\\git', destination_new_root='G:\\voussoir', callback_file=spinal.callback_v1)

56
SpinalTap/ratelimiter.py Normal file
View file

@ -0,0 +1,56 @@
import time
class Ratelimiter:
    '''
    Balance-based rate limiter.

    The balance grows at `allowance_per_period / period` units per second,
    capped at `allowance_per_period`. Each call to `limit` spends `cost`
    units from it.
    '''
    def __init__(self, allowance_per_period, period, operation_cost=1, mode='sleep'):
        '''
        allowance_per_period:
            The number of operations we can perform per `period` seconds.

        period:
            The number of seconds over which we can perform `allowance_per_period` operations.
            Must be greater than zero.

        operation_cost:
            The default amount to remove from our balance after each operation.
            Pass a `cost` parameter to `self.limit` to use a nondefault value.

        mode:
            'sleep':  If we do not have the balance for an operation, sleep until we do.
                      Return True every time.
            'reject': If we do not have the balance for an operation, return False.

        Raises ValueError for an unknown mode or a non-positive period.
        '''
        if mode not in ('sleep', 'reject'):
            raise ValueError('Invalid mode %s' % repr(mode))
        if period <= 0:
            # Fail here with a clear message instead of a ZeroDivisionError
            # (or a negative gain rate) further down.
            raise ValueError('Invalid period %s' % repr(period))

        self.allowance_per_period = allowance_per_period
        self.period = period
        self.operation_cost = operation_cost
        self.mode = mode

        self.last_operation = time.time()
        self.balance = 0
        self.gain_rate = allowance_per_period / period

    def limit(self, cost=None):
        '''
        Spend `cost` units (default: `operation_cost`) from the balance.

        In 'sleep' mode, block until the balance covers the cost, then return True.
        In 'reject' mode, return False without spending if the balance is too low.
        '''
        if cost is None:
            cost = self.operation_cost

        # time.time() is wall-clock time and can jump backwards when the
        # system clock is adjusted. Never let that subtract from the balance.
        timediff = max(time.time() - self.last_operation, 0)
        self.balance += timediff * self.gain_rate
        self.balance = min(self.balance, self.allowance_per_period)
        successful = False

        deficit = cost - self.balance
        if deficit > 0 and self.mode == 'sleep':
            # Wait exactly as long as it takes to earn the missing balance.
            time_needed = (deficit / self.gain_rate)
            time.sleep(time_needed)
            self.balance = cost

        if self.balance >= cost:
            self.balance -= cost
            successful = True

        self.last_operation = time.time()
        return successful

View file

@ -1,260 +1,467 @@
'''
-----:::
...-----------:::::::::
............---------:::::::,,,,
..`````````.......--------:::::::,,,,,
.```````````````...:vv,------:::::::,,,,,"
..``````````````````-zz+z:------::::::,,,,,,""
....`````` `````-zzzz:------:::::::,,,,,,"""
.....````` `````xzzx.-------::::::,,,,,,""""
---..::-````` ````,zzzz".------:::::::,,,_~~""""_
-----xzzzJ?~-``` `````_Jzzzzz~------::::::"vJonnnT"""__
------L+zzzzzz?".`````.:~xzzzzz+++J/":--:::,;Jooonnnn+"""___
:------/z+zzzzzzzxL??xzzzzzzz+++++++TTzJJJ+ooooonnnoL""""___
:::-------;J++zzzzzzzzzzzzzzzxxxJ+++TTTTTTToooooonT?""""""____
:::::-------~L+++++++++++z/,------"v+TTTTTooooooz/","""""_____
::::::::-------,vz+++++++z,----------"+TTooooooL_,,,""""""______
,,:::::::::------:L++++++x---------:::JooooooJ",,,,""""""_______
,,,,::::::::::::---?TTTTTTv:---::::::vooooooL,,,,,""""""________
,,,,,,,::::::::::::~TTTTTTT+xv/;;/?xToooooon;,,,,""""""_________
,,,,,,,,,,:::::::,zooooooooooooooooooooonnn+",""""""__________
""",,,,,,,,,,,,,,zoooooooooooooooooonnnnnnnn+"""""____________
"""""",,,,,,,,,,ooooooooooooonnnnnnnnnnnnnnn_""_____________
""""""""""",,,,,+nnnnnnnnnnnnnnnnnnnnnnnnZZT"_______________
____"""""""""""vnnnnnnnnnnnnnnnnnnnnZZZZZZ?_______________
________"""""""znnnnnnnnnnnZZZZZZZZZZZZZz_______________
______________JZZZZZZZZZZZZZZZZZZZZZZz______________
______________/+ZZZZZZZZZZZZZeeeeZ+/______________
______________;xoeeeeeeeeeeeeox;______________
_______________~vxJz++zJxv~_______________
______________________________________
________________________________
_______________________
________
'''
import json
import os
import ratelimiter
import shutil
import sys
import time
BYTE = 1
KILOBYTE = BYTE * 1024
MEGABYTE = KILOBYTE * 1024
GIGABYTE = MEGABYTE * 1024
TERABYTE = GIGABYTE * 1024
KIBIBYTE = BYTE * 1024
MIBIBYTE = KIBIBYTE * 1024
GIBIBYTE = MIBIBYTE * 1024
TEBIBYTE = GIBIBYTE * 1024
CHUNKSIZE = 64 * KILOBYTE
CHUNK_SIZE = 64 * KIBIBYTE
# Number of bytes to read and write at a time
EXC_SRCNOTDIR = 'srcnotdir'
EXC_SRCNOTFILE = 'srcnotfle'
EXC_RECURDIR = 'recurdir'
# These strings will become the `description` attribute
# of a SpinalError when it is raised. Use it to determine
# what type of SpinalError has occurred.
class DestinationIsDirectory(Exception):
    '''
    The destination path of a file copy is an existing directory.
    '''
class DestinationIsFile(Exception):
    '''
    The destination path of a directory copy is an existing file.
    '''
class RecursiveDirectory(Exception):
    '''
    The destination directory of a copy lies inside the source directory.
    '''
class SourceNotDirectory(Exception):
    '''
    The source path given for a directory operation is not a directory.
    '''
class SourceNotFile(Exception):
    '''
    The source path given for a file copy is not a file.
    '''
class SpinalError(Exception):
    '''
    Error carrying a human-readable message plus a short machine-readable
    `description` code that callers can compare against.
    '''
    def __init__(self, err, desc):
        '''
        err:
            The message shown when the exception is printed.

        desc:
            Short code stored on `self.description`.
        '''
        # Actually invoke the base initializer; a bare `super(...)` expression
        # does nothing and left str(e) rendering the whole (err, desc) tuple.
        super(SpinalError, self).__init__(err)
        self.description = desc
def copyfile(src, dst, overwrite=True, callbackfunction=None):
class SpinalTask:
    '''
    One queued copy operation: a kwargs dict plus the function it will be
    passed to (copy_dir when 'source_dir' is present, otherwise copy_file
    when 'source' is present).
    '''
    def __init__(self, kwargs):
        '''
        kwargs:
            Keyword arguments for the copy function.

        Raises ValueError if kwargs has neither 'source_dir' nor 'source'.
        '''
        self.kwargs = kwargs
        wants_dir = 'source_dir' in kwargs
        if not wants_dir and 'source' not in kwargs:
            raise ValueError('Task is neither a file copy or directory copy', kwargs)
        # 'source_dir' takes priority when both keys are present.
        self.method = copy_dir if wants_dir else copy_file

    def execute(self, default_kwargs=None):
        '''
        Run the task. Entries of `default_kwargs` are used as a base and are
        overridden by the task's own kwargs.
        '''
        if default_kwargs is None:
            merged = self.kwargs
        else:
            merged = dict(default_kwargs)
            merged.update(self.kwargs)
        self.method(**merged)
class SpinalTaskManager:
    '''
    Holds a list of tasks and runs them first-in, first-out, passing the
    same default kwargs to every task.
    '''
    def __init__(self, default_kwargs=None):
        '''
        default_kwargs:
            Passed to each task's execute(). An empty dict is used when
            None is given.
        '''
        self.tasks = []
        self.default_kwargs = {} if default_kwargs is None else default_kwargs

    def execute(self):
        '''
        Remove tasks from the front of the list one at a time and execute
        each, until the list is empty.
        '''
        while self.tasks:
            next_task = self.tasks.pop(0)
            next_task.execute(self.default_kwargs)
def callback_exclusion(name, path_type):
'''
Copy a file from src to dst.
src : the file to copy.
dst : the filename of the new copy.
overwrite : if True, copy src to dst even if
dst already exists.
else, do nothing.
default = True
callbackfunction : if provided, this function will be called
after writing each CHUNKSIZE bytes to dst
with three parameters:
name of file being copied,
number of bytes written so far,
total number of bytes needed.
default = None
RETURN : [dst filename, number of bytes written to dst]
Example of an exclusion callback function.
'''
print('Excluding', name)
src = os.path.abspath(src)
dst = os.path.abspath(dst)
def callback_v1(filename, written_bytes, total_bytes):
'''
Example of a copy callback function.
if not os.path.isfile(src):
raise SpinalError("Source file is not a file: %s" % src,
EXC_SRCNOTFILE)
Prints "filename written/total (percent%)"
'''
if written_bytes >= total_bytes:
ends = '\n'
else:
ends = ''
percent = (100 * written_bytes) / total_bytes
percent = '%03.3f' % percent
written = '{:,}'.format(written_bytes)
total = '{:,}'.format(total_bytes)
written = written.rjust(len(total), ' ')
status = '{filename} {written}/{total} ({percent}%)\r'
status = status.format(filename=filename, written=written, total=total, percent=percent)
print(status, end=ends)
sys.stdout.flush()
totalbytes = os.path.getsize(src)
dstexists = os.path.exists(dst)
if dstexists and overwrite is False:
if callbackfunction is not None:
callbackfunction(dst, totalbytes, totalbytes)
return [dst, totalbytes]
elif dstexists:
src_modtime = os.path.getmtime(src)
dst_modtime = os.path.getmtime(dst)
if src_modtime == dst_modtime:
if callbackfunction is not None:
callbackfunction(dst, totalbytes, totalbytes)
return [dst, totalbytes]
def copy_file(
source,
destination,
bytes_per_second=None,
callback=None,
dry_run=False,
overwrite_old=True,
):
'''
Copy a file from one place to another.
writtenbytes = 0
srcfile = open(src, 'rb')
dstfile = open(dst, 'wb')
source:
The file to copy.
destination:
The filename of the new copy.
bytes_per_second:
Restrict file copying to this many bytes per second. Can be an integer
or an existing Ratelimiter object.
The provided BYTE, KIBIBYTE, etc constants may help.
Default = None
callback:
If provided, this function will be called after writing
each CHUNK_SIZE bytes to destination with three parameters:
name of file being copied, number of bytes written so far,
total number of bytes needed.
Default = None
dry_run:
Do everything except the actual file copying.
Default = False
overwrite_old:
If True, overwrite the destination file if the source file
has a more recent "last modified" timestamp.
Default = True
Returns: [destination filename, number of bytes written to destination]
'''
# Prepare parameters
source = os.path.abspath(source)
destination = os.path.abspath(destination)
if not os.path.isfile(source):
raise SourceNotFile(source)
if os.path.isdir(destination):
raise DestinationIsDirectory(destination)
if isinstance(bytes_per_second, ratelimiter.Ratelimiter):
limiter = bytes_per_second
elif bytes_per_second is not None:
limiter = ratelimiter.Ratelimiter(allowance_per_period=bytes_per_second, period=1)
else:
limiter = None
source_bytes = os.path.getsize(source)
# Determine overwrite
destination_exists = os.path.exists(destination)
if destination_exists:
if overwrite_old is False:
return [destination, source_bytes]
source_modtime = os.path.getmtime(source)
destination_modtime = os.path.getmtime(destination)
if source_modtime == destination_modtime:
return [destination, source_bytes]
# Copy
if dry_run:
if callback is not None:
callback(destination, source_bytes, source_bytes)
return [destination, source_bytes]
written_bytes = 0
source_file = open(source, 'rb')
destionation_file = open(destination, 'wb')
while True:
filedata = srcfile.read(CHUNKSIZE)
datasize = len(filedata)
if datasize == 0:
data_chunk = source_file.read(CHUNK_SIZE)
data_bytes = len(data_chunk)
if data_bytes == 0:
break
dstfile.write(filedata)
writtenbytes += datasize
destionation_file.write(data_chunk)
written_bytes += data_bytes
if callbackfunction is not None:
callbackfunction(dst, writtenbytes, totalbytes)
if limiter is not None:
limiter.limit(data_bytes)
srcfile.close()
dstfile.close()
shutil.copystat(src, dst)
return [dst, writtenbytes]
if callback is not None:
callback(destination, written_bytes, source_bytes)
def copydir(srcdir, dstdir, overwrite=True, precalcsize=False,
callbackfunction=None, callbackfile=None):
# Fin
source_file.close()
destionation_file.close()
shutil.copystat(source, destination)
return [destination, written_bytes]
def copy_dir(
source_dir,
destination_dir=None,
destination_new_root=None,
bytes_per_second=None,
callback_directory=None,
callback_file=None,
dry_run=False,
exclude_directories=None,
exclude_filenames=None,
exclusion_callback=None,
overwrite_old=True,
precalcsize=False,
):
'''
Copy all of the contents from srcdir to dstdir,
Copy all of the contents from source_dir to destination_dir,
including subdirectories.
srcdir : the directory which will be copied.
source_dir:
The directory which will be copied.
dstdir : the directory in which copied files are placed.
destination_dir:
The directory in which copied files are placed. Alternatively, use
destination_new_root.
overwrite : if True, overwrite any files in dstdir with
the copies from srcdir should they already exist.
else, ignore them.
destination_new_root:
Determine the destination path by calling
`new_root(source_dir, destination_new_root)`.
Thus, this path acts as a root and the rest of the path is matched.
default = True
bytes_per_second:
Restrict file copying to this many bytes per second. Can be an integer
or an existing Ratelimiter object.
The provided BYTE, KIBIBYTE, etc constants may help.
precalcsize : if True, calculate the size of srcdir
before beginning the operation. This number
can be used in the callbackfunction.
else, callbackfunction will receive
written bytes as total bytes.
Default = None
default = False
callback_directory:
This function will be called after each file copy with three parameters:
name of file copied, number of bytes written to destination_dir so far,
total bytes needed (from precalcsize).
callbackfunction : if provided, this function will be called
after each file copy with three parameters:
name of file copied,
number of bytes written to dstdir so far,
total bytes needed (from precalcsize).
Default = None
default = None
callback_file:
Will be passed into each individual copy_file() as the `callback`
for that file.
callbackfile : will be passed into each individual copyfile() as
the callbackfunction for that file.
Default = None
default = None
dry_run:
Do everything except the actual file copying.
RETURN : [dstdir path, number of bytes written to dstdir]
Default = False
exclude_filenames:
A set of filenames that will not be copied. Entries can be absolute
paths to exclude that particular file, or plain names to exclude
all matches. For example:
{'C:\\folder\\file.txt', 'desktop.ini'}
Default = None
exclude_directories:
A set of directories that will not be copied. Entries can be
absolute paths to exclude that particular directory, or plain names
to exclude all matches. For example:
{'C:\\folder', 'thumbnails'}
Default = None
exclusion_callback:
This function will be called when a file or directory is excluded with
two parameters: the path, and 'file' or 'directory'.
Default = None
overwrite_old:
If True, overwrite the destination file if the source file
has a more recent "last modified" timestamp.
Default = True
precalcsize:
If True, calculate the size of source_dir before beginning the
operation. This number can be used in the callback_directory function.
Else, callback_directory will receive written bytes as total bytes
(showing 100% always).
This can take a long time.
Default = False
Returns: [destination_dir path, number of bytes written to destination_dir]
'''
srcdir = os.path.abspath(srcdir)
dstdir = os.path.abspath(dstdir)
# Prepare parameters
if not is_xor(destination_dir, destination_new_root):
m = 'One and only one of `destination_dir` and '
m += '`destination_new_root` can be passed'
raise ValueError(m)
if dstdir.startswith(srcdir):
raise SpinalError("Will not copy a dir into itself %s" % dstdir,
EXC_RECURDIR)
if destination_new_root is not None:
destination_dir = new_root(source_dir, destination_new_root)
if os.path.isfile(srcdir):
raise SpinalError("Destination dir is a file: %s" % dstdir,
EXC_SRCNOTDIR)
source_dir = os.path.normpath(os.path.abspath(source_dir))
destination_dir = os.path.normpath(os.path.abspath(destination_dir))
if is_subfolder(source_dir, destination_dir):
raise RecursiveDirectory(source_dir, destination_dir)
if not os.path.isdir(source_dir):
raise SourceNotDirectory(source_dir)
if os.path.isfile(destination_dir):
raise DestinationIsFile(destination_dir)
if exclusion_callback is None:
exclusion_callback = lambda *x: None
if exclude_filenames is None:
exclude_filenames = set()
if exclude_directories is None:
exclude_directories = set()
exclude_filenames = {normalize(f) for f in exclude_filenames}
exclude_directories = {normalize(f) for f in exclude_directories}
if precalcsize is True:
totalbytes = getdirsize(srcdir)
total_bytes = get_dir_size(source_dir)
else:
totalbytes = 0
total_bytes = 0
walker = os.walk(srcdir)
writtenbytes = 0
for step in walker:
# (path, [dirs], [files])
srcpath = step[0]
dstpath = srcpath.replace(srcdir, dstdir)
files = step[2]
if not os.path.exists(dstpath):
os.makedirs(dstpath)
for filename in files:
srcfile = os.path.join(srcpath, filename)
dstfile = os.path.join(dstpath, filename)
copied = copyfile(srcfile, dstfile, overwrite=overwrite,
callbackfunction=callbackfile)
copiedname = copied[0]
writtenbytes += copied[1]
if callbackfunction is None:
continue
if isinstance(bytes_per_second, ratelimiter.Ratelimiter):
limiter = bytes_per_second
elif bytes_per_second is not None:
limiter = ratelimiter.Ratelimiter(allowance_per_period=bytes_per_second, period=1)
else:
limiter = None
if totalbytes == 0:
# precalcsize was not used. Just report the written bytes
callbackfunction(copiedname, writtenbytes, writtenbytes)
# Copy
written_bytes = 0
for (source_location, base_filename) in walk_generator(source_dir):
# Terminology:
# abspath: C:\folder\subfolder\filename.txt
# base_filename: filename.txt
# folder: subfolder
# location: C:\folder\subfolder
#source_location = normalize(source_location)
#base_filename = normalize(base_filename)
source_folder_name = os.path.split(source_location)[1]
source_abspath = os.path.join(source_location, base_filename)
destination_abspath = source_abspath.replace(source_dir, destination_dir)
destination_location = os.path.split(destination_abspath)[0]
if base_filename in exclude_filenames:
exclusion_callback(source_abspath, 'file')
continue
if source_abspath in exclude_filenames:
exclusion_callback(source_abspath, 'file')
continue
if source_location in exclude_directories:
exclusion_callback(source_location, 'directory')
continue
if source_folder_name in exclude_directories:
exclusion_callback(source_location, 'directory')
continue
if os.path.isdir(destination_abspath):
raise DestinationIsDirectory(destination_abspath)
if not os.path.isdir(destination_location):
os.makedirs(destination_location)
copied = copy_file(
source_abspath,
destination_abspath,
bytes_per_second=limiter,
callback=callback_file,
dry_run=dry_run,
overwrite_old=overwrite_old,
)
copiedname = copied[0]
written_bytes += copied[1]
if callback_directory is not None:
if precalcsize is False:
callback_directory(copiedname, written_bytes, written_bytes)
else:
# provide the precalcsize
callbackfunction(copiedname, writtenbytes, totalbytes)
callback_directory(copiedname, written_bytes, total_bytes)
return [dstdir, writtenbytes]
return [destination_dir, written_bytes]
def getdirsize(srcdir):
def execute_spinaltask(task):
'''
Using os.walk, return the total number of bytes
this directory contains, including all subdirectories.
Execute a spinal task.
'''
pass
srcdir = os.path.abspath(srcdir)
def get_dir_size(source_dir):
'''
Calculate the total number of bytes across all files in this directory
and its subdirectories.
'''
source_dir = os.path.abspath(source_dir)
if not os.path.isdir(srcdir):
raise SpinalError("Source dir is not a directory: %s" % srcdir,
EXC_SRCNOTDIR)
if not os.path.isdir(source_dir):
raise SourceNotDirectory(source_dir)
totalbytes = 0
walker = os.walk(srcdir)
for step in walker:
# (path, [dirs], [files])
path = step[0]
files = step[2]
total_bytes = 0
for (directory, filename) in walk_generator(source_dir):
filename = os.path.join(directory, filename)
filesize = os.path.getsize(filename)
total_bytes += filesize
return total_bytes
def is_subfolder(parent, child):
    '''
    Determine whether parent contains child.

    Both paths are made absolute and normalized (normcase + normpath) before
    comparison. A path is considered to contain itself.
    '''
    def with_trailing_sep(path):
        # Same normalization as this module's `normalize` helper.
        path = os.path.normpath(os.path.normcase(os.path.abspath(path)))
        # A filesystem root already ends with a separator; strip before
        # appending so it becomes '/' rather than '//', which would never
        # match any child.
        return path.rstrip(os.sep) + os.sep

    # The trailing separator prevents '/a/b' from matching '/a/bc'.
    return with_trailing_sep(child).startswith(with_trailing_sep(parent))
def is_xor(*args):
    '''
    Return True when exactly one of the arguments is truthy, else False.
    '''
    truthy_count = sum(1 for arg in args if arg)
    return truthy_count == 1
def new_root(filepath, root):
    '''
    Prepend `root` to `filepath`, drive letter included. For example:
    "C:\\folder\\subfolder\\file.txt" and "C:\\backups" becomes
    "C:\\backups\\C\\folder\\subfolder\\file.txt"

    I use this so that my G: drive can have backups from my C: and D: drives
    while preserving directory structure in G:\\D and G:\\C.

    Paths without a drive letter (for example POSIX absolute paths) are
    placed directly under `root`.
    '''
    filepath = os.path.abspath(filepath)
    root = os.path.abspath(root)
    (drive, tail) = os.path.splitdrive(filepath)
    # "C:" becomes the folder name "C".
    drive = drive.replace(':', '').strip(os.sep)
    # A component that still starts with a separator would make os.path.join
    # discard `root` and return the source path itself.
    tail = tail.lstrip(os.sep)
    parts = [part for part in (drive, tail) if part]
    return os.path.join(root, *parts)
def normalize(text):
    '''
    Return `text` passed through os.path.normcase and then os.path.normpath.
    '''
    case_folded = os.path.normcase(text)
    return os.path.normpath(case_folded)
def walk_generator(path):
'''
Yield filenames from os.walk so the caller doesn't need to deal with the
nested for-loops.
'''
path = os.path.abspath(path)
walker = os.walk(path)
for (location, folders, files) in walker:
for filename in files:
fpath = os.path.join(path, filename)
totalbytes += os.path.getsize(fpath)
return totalbytes
def cb(filename, written, total):
'''
Example of a callbackfunction.
Prints the number of bytes written,
total bytes needed,
and percentage so far.
'''
name = os.path.basename(filename)
if written >= total:
ends = '\n'
else:
ends = '\n'
percentage = (100 * written) / total
percentage = '%03.3f' % percentage
written = '{:,}'.format(written)
total = '{:,}'.format(total)
written = (' '*(len(total)-len(written))) + written
status = '%s %s / %s (%s)\r' % (name, written, total, percentage)
print(status, end=ends)
yield (location, filename)

View file

@ -1,77 +0,0 @@
import spinal
import tkinter
class Path:
    '''
    Plain record of one source -> destination pair together with its
    `overwrite` and `precalcsize` flags.
    '''
    def __init__(self, src='', dst='', overwrite=False, precalcsize=False):
        self.src, self.dst = src, dst
        self.overwrite, self.precalcsize = overwrite, precalcsize

    def __str__(self):
        return 'Path: {!s} -> {!s}'.format(self.src, self.dst)
class SpinalClient:
    '''
    Tkinter window for Spinal. Builds a vertically split window with a
    config row (Load button, path entry, Save button) above an empty
    primary frame.
    '''
    def __init__(self):
        '''
        Create the Tk root, size and position it, and build the widgets.
        The event loop is not started here; call `mainloop`.
        '''
        self.windowtitle = 'Spinal'

        self.font_large = ("Consolas", 16)
        self.font_med = ("Consolas", 12)
        self.font_small = ("Consolas", 10)

        self.t = tkinter.Tk()
        self.t.title(self.windowtitle)
        # Fixed window size in pixels.
        self.w = 450
        self.h = 350
        self.screenwidth = self.t.winfo_screenwidth()
        self.screenheight = self.t.winfo_screenheight()
        self.windowwidth = self.w
        self.windowheight = self.h
        # Center the window on the screen, then shift it up by 27 pixels.
        self.windowx = (self.screenwidth-self.windowwidth) / 2
        self.windowy = ((self.screenheight-self.windowheight) / 2) - 27
        # Tk geometry string "WIDTHxHEIGHT+X+Y"; %d truncates the float offsets.
        self.geometrystring = '%dx%d+%d+%d' % (
            self.windowwidth, self.windowheight, self.windowx, self.windowy)
        self.t.geometry(self.geometrystring)

        # Top-level container: panes stacked vertically with a draggable sash.
        self.panes_main = tkinter.PanedWindow(self.t, orient='vertical',
            sashrelief='raised', sashpad=8)
        self.panes_main.pack(expand=True, fill='both')

        ### FRAME_CONFIG ###
        #
        self.frame_config = tkinter.Frame(self.t)
        # No command is bound to the Load button here.
        self.button_configload = tkinter.Button(self.frame_config,text='Load')
        self.button_configload.grid(row=0, column=0)
        self.button_configload.configure(bg="#6fd5f6",
            activebackground="#6fd5f6", relief="flat", width=4)
        #
        self.enter_configpath = tkinter.Entry(self.frame_config)
        self.enter_configpath.grid(row=0, column=1, sticky='nesw')
        self.enter_configpath.configure(font=self.font_small)
        #
        # No command is bound to the Save button here.
        # NOTE(review): bg and activebackground differ here (#76E22E vs
        # #46E22E), unlike the Load button - confirm this is intended.
        self.button_configsave = tkinter.Button(self.frame_config,text='Save')
        self.button_configsave.grid(row=0, column=2)
        self.button_configsave.configure(bg="#76E22E",
            activebackground="#46E22E", relief="flat", width=4)
        #
        ### END FRAME_CONFIG ###

        ### FRAME_PRIMARY ###
        #
        self.frame_primary = tkinter.Frame(self.t)
        self.paths = []
        #
        ### END FRAME_PRIMARY ###
        # Give the entry's column (index 1) the stretch weight in the config row.
        tkinter.Grid.columnconfigure(self.frame_config, 1, weight=10)
        self.panes_main.add(self.frame_config)
        self.panes_main.add(self.frame_primary)

    def mainloop(self):
        '''
        Run the Tk event loop of the root window.
        '''
        self.t.mainloop()

    def add_pathline(self):
        '''
        Not implemented; currently does nothing.
        '''
        pass
if __name__ == '__main__':
s = SpinalClient()
s.mainloop()

View file

@ -1,27 +0,0 @@
import spinal
import os
def catchexc(function, fargs=(), fkwargs=None, goalexc=''):
    '''
    Call function with *args fargs and **kwargs fkwargs,
    expecting to get an exception.
    If the raised exception has the description == goalexc,
    we got what we wanted. Else (or if no exception is raised)
    something is wrong.

    Raises Exception if the call succeeds, and re-raises the SpinalError
    if its description is not goalexc.
    '''
    # None instead of a shared mutable {} default.
    if fkwargs is None:
        fkwargs = {}
    try:
        function(*fargs, **fkwargs)
    except spinal.SpinalError as e:
        if e.description != goalexc:
            # Bare raise keeps the original traceback.
            raise
        return
    raise Exception("This should not have passed")
if __name__ == '__main__':
os.chdir('testdata')
spinal.os.remove('dstfile.txt')
spinal.copyfile('srcfile.txt', 'dstfile.txt', callbackfunction=spinal.cb)
spinal.copyfile('srcfile.txt', 'dstfile.txt', callbackfunction=spinal.cb)
spinal.copyfile('srcfile.txt', 'dstfile_no_overwrite.txt', overwrite=False, callbackfunction=spinal.cb)
spinal.copydir('.', '..\\t',precalcsize=True, callbackfile=spinal.cb)
catchexc(spinal.copyfile, ('nonexist.txt', 'nonexist2.txt'), {'overwrite':False}, goalexc=spinal.EXC_SRCNOTFILE)
print('You did it!')

View file

@ -1 +0,0 @@
Test data.

View file

@ -1 +0,0 @@
This won't be overwritten by srcfile.txt

View file

@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:fd6b01eab729dc91cbee8e31a8386f090648e57cb25787f0ae2e1f383e86ca0d
size 952888

View file

@ -1 +0,0 @@
Test data.

View file

@ -1 +0,0 @@
submarines