Create objects.py
Move Album, Photo, Tag, User to objects.py; Move SQL_COLUMNS variables to constants.py so they can be shared; Move more shared helpers to helpers.py
This commit is contained in:
parent
0160af57dd
commit
91fcbb7101
6 changed files with 957 additions and 953 deletions
|
@ -10,7 +10,6 @@ This is the readme file.
|
|||
- Improve the "tags on this page" list. Maybe add separate buttons for must/may/forbid on each.
|
||||
- Some way for the database to re-identify a file that was moved / renamed (lost & found). Maybe file hash of the first few mb is good enough.
|
||||
- Move out more helpers
|
||||
- Create objects.py
|
||||
- Debate whether the `UserMixin.login` method should accept usernames or I should standardize the usage of IDs only internally.
|
||||
|
||||
### Changelog
|
||||
|
|
75
constants.py
75
constants.py
|
@ -1,4 +1,15 @@
|
|||
import converter
|
||||
import string
|
||||
import traceback
|
||||
|
||||
try:
|
||||
ffmpeg = converter.Converter(
|
||||
ffmpeg_path='C:\\software\\ffmpeg\\bin\\ffmpeg.exe',
|
||||
ffprobe_path='C:\\software\\ffmpeg\\bin\\ffprobe.exe',
|
||||
)
|
||||
except converter.ffmpeg.FFMpegError:
|
||||
traceback.print_exc()
|
||||
ffmpeg = None
|
||||
|
||||
ALLOWED_ORDERBY_COLUMNS = [
|
||||
'extension',
|
||||
|
@ -13,6 +24,70 @@ ALLOWED_ORDERBY_COLUMNS = [
|
|||
'random',
|
||||
]
|
||||
|
||||
SQL_LASTID_COLUMNS = [
|
||||
'table',
|
||||
'last_id',
|
||||
]
|
||||
SQL_ALBUM_COLUMNS = [
|
||||
'id',
|
||||
'title',
|
||||
'description',
|
||||
'associated_directory',
|
||||
]
|
||||
SQL_PHOTO_COLUMNS = [
|
||||
'id',
|
||||
'filepath',
|
||||
'override_filename',
|
||||
'extension',
|
||||
'width',
|
||||
'height',
|
||||
'ratio',
|
||||
'area',
|
||||
'duration',
|
||||
'bytes',
|
||||
'created',
|
||||
'thumbnail',
|
||||
'tagged_at',
|
||||
]
|
||||
SQL_TAG_COLUMNS = [
|
||||
'id',
|
||||
'name',
|
||||
]
|
||||
SQL_SYN_COLUMNS = [
|
||||
'name',
|
||||
'master',
|
||||
]
|
||||
SQL_ALBUMPHOTO_COLUMNS = [
|
||||
'albumid',
|
||||
'photoid',
|
||||
]
|
||||
SQL_PHOTOTAG_COLUMNS = [
|
||||
'photoid',
|
||||
'tagid',
|
||||
]
|
||||
SQL_TAGGROUP_COLUMNS = [
|
||||
'parentid',
|
||||
'memberid',
|
||||
]
|
||||
SQL_USER_COLUMNS = [
|
||||
'id',
|
||||
'username',
|
||||
'password',
|
||||
'created',
|
||||
]
|
||||
|
||||
_sql_dictify = lambda columns: {key:index for (index, key) in enumerate(columns)}
|
||||
SQL_ALBUM = _sql_dictify(SQL_ALBUM_COLUMNS)
|
||||
SQL_ALBUMPHOTO = _sql_dictify(SQL_ALBUMPHOTO_COLUMNS)
|
||||
SQL_LASTID = _sql_dictify(SQL_LASTID_COLUMNS)
|
||||
SQL_PHOTO = _sql_dictify(SQL_PHOTO_COLUMNS)
|
||||
SQL_PHOTOTAG = _sql_dictify(SQL_PHOTOTAG_COLUMNS)
|
||||
SQL_SYN = _sql_dictify(SQL_SYN_COLUMNS)
|
||||
SQL_TAG = _sql_dictify(SQL_TAG_COLUMNS)
|
||||
SQL_TAGGROUP = _sql_dictify(SQL_TAGGROUP_COLUMNS)
|
||||
SQL_USER = _sql_dictify(SQL_USER_COLUMNS)
|
||||
|
||||
|
||||
# Errors and warnings
|
||||
ERROR_DATABASE_OUTOFDATE = 'Database is out-of-date. {current} should be {new}. Please use etiquette_upgrader.py'
|
||||
ERROR_INVALID_ACTION = 'Invalid action'
|
||||
|
|
14
etiquette.py
14
etiquette.py
|
@ -4,10 +4,6 @@ import json
|
|||
import mimetypes
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import requests
|
||||
import sys
|
||||
import time
|
||||
import warnings
|
||||
|
||||
import constants
|
||||
|
@ -19,7 +15,6 @@ import phototagger
|
|||
|
||||
# pip install
|
||||
# https://raw.githubusercontent.com/voussoir/else/master/_voussoirkit/voussoirkit.zip
|
||||
from voussoirkit import bytestring
|
||||
from voussoirkit import webstreamzip
|
||||
|
||||
site = flask.Flask(__name__)
|
||||
|
@ -211,7 +206,7 @@ def get_album_tar(albumid):
|
|||
photos = list(album.walk_photos())
|
||||
zipname_map = {p.real_filepath: '%s - %s' % (p.id, p.basename) for p in photos}
|
||||
streamed_zip = webstreamzip.stream_tar(zipname_map)
|
||||
content_length = sum(p.bytes for p in photos)
|
||||
#content_length = sum(p.bytes for p in photos)
|
||||
outgoing_headers = {'Content-Type': 'application/octet-stream'}
|
||||
return flask.Response(streamed_zip, headers=outgoing_headers)
|
||||
|
||||
|
@ -242,7 +237,6 @@ def get_bookmarks():
|
|||
|
||||
@site.route('/file/<photoid>')
|
||||
def get_file(photoid):
|
||||
requested_photoid = photoid
|
||||
photoid = photoid.split('.')[0]
|
||||
photo = P.get_photo(photoid)
|
||||
|
||||
|
@ -445,8 +439,8 @@ def get_search_html():
|
|||
@decorators.give_session_token
|
||||
def get_search_json():
|
||||
search_results = get_search_core()
|
||||
search_kwargs = search_results['search_kwargs']
|
||||
qualname_map = search_results['qualname_map']
|
||||
#search_kwargs = search_results['search_kwargs']
|
||||
#qualname_map = search_results['qualname_map']
|
||||
include_qualname_map = request.args.get('include_map', False)
|
||||
include_qualname_map = helpers.truthystring(include_qualname_map)
|
||||
if not include_qualname_map:
|
||||
|
@ -471,7 +465,7 @@ def get_tags_core(specific_tag=None):
|
|||
tags = [t for t in tags if t != '']
|
||||
tags = [(t, t.split('.')[-1].split('+')[0]) for t in tags]
|
||||
return tags
|
||||
|
||||
|
||||
@site.route('/tags')
|
||||
@site.route('/tags/<specific_tag>')
|
||||
@decorators.give_session_token
|
||||
|
|
42
helpers.py
42
helpers.py
|
@ -1,11 +1,14 @@
|
|||
import datetime
|
||||
import math
|
||||
import mimetypes
|
||||
import os
|
||||
|
||||
import exceptions
|
||||
import constants
|
||||
import exceptions
|
||||
import warnings
|
||||
|
||||
from voussoirkit import bytestring
|
||||
|
||||
def chunk_sequence(sequence, chunk_length, allow_incomplete=True):
|
||||
'''
|
||||
Given a sequence, divide it into sequences of length `chunk_length`.
|
||||
|
@ -129,6 +132,25 @@ def is_xor(*args):
|
|||
'''
|
||||
return [bool(a) for a in args].count(True) == 1
|
||||
|
||||
def normalize_filepath(filepath):
|
||||
'''
|
||||
Remove some bad characters.
|
||||
'''
|
||||
filepath = filepath.replace('/', os.sep)
|
||||
filepath = filepath.replace('\\', os.sep)
|
||||
filepath = filepath.replace('<', '')
|
||||
filepath = filepath.replace('>', '')
|
||||
return filepath
|
||||
|
||||
def now(timestamp=True):
|
||||
'''
|
||||
Return the current UTC timestamp or datetime object.
|
||||
'''
|
||||
n = datetime.datetime.now(datetime.timezone.utc)
|
||||
if timestamp:
|
||||
return n.timestamp()
|
||||
return n
|
||||
|
||||
def read_filebytes(filepath, range_min, range_max, chunk_size=2 ** 20):
|
||||
'''
|
||||
Yield chunks of bytes from the file between the endpoints.
|
||||
|
@ -158,12 +180,24 @@ def seconds_to_hms(seconds):
|
|||
(minutes, seconds) = divmod(seconds, 60)
|
||||
(hours, minutes) = divmod(minutes, 60)
|
||||
parts = []
|
||||
if hours: parts.append(hours)
|
||||
if minutes: parts.append(minutes)
|
||||
if hours:
|
||||
parts.append(hours)
|
||||
if minutes:
|
||||
parts.append(minutes)
|
||||
parts.append(seconds)
|
||||
hms = ':'.join('%02d' % part for part in parts)
|
||||
return hms
|
||||
|
||||
def select_generator(sql, query, bindings=None):
|
||||
bindings = bindings or []
|
||||
cursor = sql.cursor()
|
||||
cursor.execute(query, bindings)
|
||||
while True:
|
||||
fetch = cursor.fetchone()
|
||||
if fetch is None:
|
||||
break
|
||||
yield fetch
|
||||
|
||||
def truthystring(s):
|
||||
if isinstance(s, (bool, int)) or s is None:
|
||||
return s
|
||||
|
@ -279,7 +313,7 @@ def _unitconvert(value):
|
|||
if value is None:
|
||||
return None
|
||||
if ':' in value:
|
||||
return helpers.hms_to_seconds(value)
|
||||
return hms_to_seconds(value)
|
||||
elif all(c in '0123456789.' for c in value):
|
||||
return float(value)
|
||||
else:
|
||||
|
|
793
objects.py
Normal file
793
objects.py
Normal file
|
@ -0,0 +1,793 @@
|
|||
import os
|
||||
import PIL.Image
|
||||
import traceback
|
||||
|
||||
import constants
|
||||
import decorators
|
||||
import exceptions
|
||||
import helpers
|
||||
|
||||
# pip install
|
||||
# https://raw.githubusercontent.com/voussoir/else/master/_voussoirkit/voussoirkit.zip
|
||||
from voussoirkit import bytestring
|
||||
from voussoirkit import pathclass
|
||||
from voussoirkit import spinal
|
||||
|
||||
class ObjectBase:
|
||||
def __eq__(self, other):
|
||||
return (
|
||||
isinstance(other, type(self)) and
|
||||
self.photodb == other.photodb and
|
||||
self.id == other.id
|
||||
)
|
||||
|
||||
def __format__(self, formcode):
|
||||
if formcode == 'r':
|
||||
return repr(self)
|
||||
else:
|
||||
return str(self)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
|
||||
class GroupableMixin:
|
||||
def add(self, member, *, commit=True):
|
||||
'''
|
||||
Add a child object to this group.
|
||||
Child must be of the same type as the calling object.
|
||||
|
||||
If that object is already a member of another group, an
|
||||
exceptions.GroupExists is raised.
|
||||
'''
|
||||
if not isinstance(member, type(self)):
|
||||
raise TypeError('Member must be of type %s' % type(self))
|
||||
|
||||
self.photodb.cur.execute('SELECT * FROM tag_group_rel WHERE memberid == ?', [member.id])
|
||||
fetch = self.photodb.cur.fetchone()
|
||||
if fetch is not None:
|
||||
if fetch[constants.SQL_TAGGROUP['parentid']] == self.id:
|
||||
that_group = self
|
||||
else:
|
||||
that_group = self.group_getter(id=fetch[constants.SQL_TAGGROUP['parentid']])
|
||||
raise exceptions.GroupExists('%s already in group %s' % (member.name, that_group.name))
|
||||
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute('INSERT INTO tag_group_rel VALUES(?, ?)', [self.id, member.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Commiting - add to group')
|
||||
self.photodb.commit()
|
||||
|
||||
def children(self):
|
||||
self.photodb.cur.execute('SELECT * FROM tag_group_rel WHERE parentid == ?', [self.id])
|
||||
fetch = self.photodb.cur.fetchall()
|
||||
results = []
|
||||
for f in fetch:
|
||||
memberid = f[constants.SQL_TAGGROUP['memberid']]
|
||||
child = self.group_getter(id=memberid)
|
||||
results.append(child)
|
||||
if isinstance(self, Tag):
|
||||
results.sort(key=lambda x: x.name)
|
||||
else:
|
||||
results.sort(key=lambda x: x.id)
|
||||
return results
|
||||
|
||||
def delete(self, *, delete_children=False, commit=True):
|
||||
'''
|
||||
Delete this object's relationships to other groupables.
|
||||
Any unique / specific deletion methods should be written within the
|
||||
inheriting class.
|
||||
|
||||
For example, Tag.delete calls here to remove the group links, but then
|
||||
does the rest of the tag deletion process on its own.
|
||||
|
||||
delete_children:
|
||||
If True, all children will be deleted.
|
||||
Otherwise they'll just be raised up one level.
|
||||
'''
|
||||
self.photodb._cached_frozen_children = None
|
||||
if delete_children:
|
||||
for child in self.children():
|
||||
child.delete(delete_children=delete_children, commit=False)
|
||||
else:
|
||||
# Lift children
|
||||
parent = self.parent()
|
||||
if parent is None:
|
||||
# Since this group was a root, children become roots by removing the row.
|
||||
self.photodb.cur.execute('DELETE FROM tag_group_rel WHERE parentid == ?', [self.id])
|
||||
else:
|
||||
# Since this group was a child, its parent adopts all its children.
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE tag_group_rel SET parentid == ? WHERE parentid == ?',
|
||||
[parent.id, self.id]
|
||||
)
|
||||
# Note that this part comes after the deletion of children to prevent issues of recursion.
|
||||
self.photodb.cur.execute('DELETE FROM tag_group_rel WHERE memberid == ?', [self.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - delete tag')
|
||||
self.photodb.commit()
|
||||
|
||||
def parent(self):
|
||||
'''
|
||||
Return the group of which this is a member, or None.
|
||||
Returned object will be of the same type as calling object.
|
||||
'''
|
||||
self.photodb.cur.execute('SELECT * FROM tag_group_rel WHERE memberid == ?', [self.id])
|
||||
fetch = self.photodb.cur.fetchone()
|
||||
if fetch is None:
|
||||
return None
|
||||
|
||||
parentid = fetch[constants.SQL_TAGGROUP['parentid']]
|
||||
return self.group_getter(id=parentid)
|
||||
|
||||
def join_group(self, group, *, commit=True):
|
||||
'''
|
||||
Leave the current group, then call `group.add(self)`.
|
||||
'''
|
||||
if isinstance(group, str):
|
||||
group = self.photodb.get_tag(group)
|
||||
if not isinstance(group, type(self)):
|
||||
raise TypeError('Group must also be %s' % type(self))
|
||||
|
||||
if self == group:
|
||||
raise ValueError('Cant join self')
|
||||
|
||||
self.leave_group(commit=commit)
|
||||
group.add(self, commit=commit)
|
||||
|
||||
def leave_group(self, *, commit=True):
|
||||
'''
|
||||
Leave the current group and become independent.
|
||||
'''
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute('DELETE FROM tag_group_rel WHERE memberid == ?', [self.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - leave group')
|
||||
self.photodb.commit()
|
||||
|
||||
def walk_children(self):
|
||||
yield self
|
||||
for child in self.children():
|
||||
yield from child.walk_children()
|
||||
|
||||
def walk_parents(self):
|
||||
parent = self.parent()
|
||||
while parent is not None:
|
||||
yield parent
|
||||
parent = parent.parent()
|
||||
|
||||
|
||||
class Album(ObjectBase, GroupableMixin):
|
||||
def __init__(self, photodb, row_tuple):
|
||||
self.photodb = photodb
|
||||
if isinstance(row_tuple, (list, tuple)):
|
||||
row_tuple = {constants.SQL_ALBUM_COLUMNS[index]: value for (index, value) in enumerate(row_tuple)}
|
||||
self.id = row_tuple['id']
|
||||
self.title = row_tuple['title']
|
||||
self.description = row_tuple['description']
|
||||
self.name = 'Album %s' % self.id
|
||||
self.group_getter = self.photodb.get_album
|
||||
|
||||
def __repr__(self):
|
||||
return 'Album:{id}'.format(id=self.id)
|
||||
|
||||
def add_photo(self, photo, *, commit=True):
|
||||
if self.photodb != photo.photodb:
|
||||
raise ValueError('Not the same PhotoDB')
|
||||
if self.has_photo(photo):
|
||||
return
|
||||
self.photodb.cur.execute('INSERT INTO album_photo_rel VALUES(?, ?)', [self.id, photo.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - add photo to album')
|
||||
self.photodb.commit()
|
||||
|
||||
def add_tag_to_all(self, tag, *, nested_children=True, commit=True):
|
||||
tag = self.photodb.get_tag(tag)
|
||||
if nested_children:
|
||||
photos = self.walk_photos()
|
||||
else:
|
||||
photos = self.photos()
|
||||
for photo in photos:
|
||||
photo.add_tag(tag, commit=False)
|
||||
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - add tag to all')
|
||||
self.photodb.commit()
|
||||
|
||||
def delete(self, *, delete_children=False, commit=True):
|
||||
self.photodb.log.debug('Deleting album {album:r}'.format(album=self))
|
||||
GroupableMixin.delete(self, delete_children=delete_children, commit=False)
|
||||
self.photodb.cur.execute('DELETE FROM albums WHERE id == ?', [self.id])
|
||||
self.photodb.cur.execute('DELETE FROM album_photo_rel WHERE albumid == ?', [self.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - delete album')
|
||||
self.photodb.commit()
|
||||
|
||||
def edit(self, title=None, description=None, *, commit=True):
|
||||
if title is None:
|
||||
title = self.title
|
||||
if description is None:
|
||||
description = self.description
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE albums SET title=?, description=? WHERE id == ?',
|
||||
[title, description, self.id]
|
||||
)
|
||||
self.title = title
|
||||
self.description = description
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - edit album')
|
||||
self.photodb.commit()
|
||||
|
||||
def has_photo(self, photo):
|
||||
if not isinstance(photo, Photo):
|
||||
raise TypeError('Must be a %s' % Photo)
|
||||
self.photodb.cur.execute(
|
||||
'SELECT * FROM album_photo_rel WHERE albumid == ? AND photoid == ?',
|
||||
[self.id, photo.id]
|
||||
)
|
||||
return self.photodb.cur.fetchone() is not None
|
||||
|
||||
def photos(self):
|
||||
photos = []
|
||||
generator = helpers.select_generator(
|
||||
self.photodb.sql,
|
||||
'SELECT * FROM album_photo_rel WHERE albumid == ?',
|
||||
[self.id]
|
||||
)
|
||||
for photo in generator:
|
||||
photoid = photo[constants.SQL_ALBUMPHOTO['photoid']]
|
||||
photo = self.photodb.get_photo(photoid)
|
||||
photos.append(photo)
|
||||
photos.sort(key=lambda x: x.basename.lower())
|
||||
return photos
|
||||
|
||||
def remove_photo(self, photo, *, commit=True):
|
||||
if not self.has_photo(photo):
|
||||
return
|
||||
self.photodb.cur.execute(
|
||||
'DELETE FROM album_photo_rel WHERE albumid == ? AND photoid == ?',
|
||||
[self.id, photo.id]
|
||||
)
|
||||
if commit:
|
||||
self.photodb.commit()
|
||||
|
||||
def walk_photos(self):
|
||||
yield from self.photos()
|
||||
children = self.walk_children()
|
||||
# The first yield is itself
|
||||
next(children)
|
||||
for child in children:
|
||||
print(child)
|
||||
yield from child.walk_photos()
|
||||
|
||||
class Photo(ObjectBase):
|
||||
'''
|
||||
A PhotoDB entry containing information about an image file.
|
||||
Photo objects cannot exist without a corresponding PhotoDB object, because
|
||||
Photos are not the actual image data, just the database entry.
|
||||
'''
|
||||
def __init__(self, photodb, row_tuple):
|
||||
self.photodb = photodb
|
||||
if isinstance(row_tuple, (list, tuple)):
|
||||
row_tuple = {constants.SQL_PHOTO_COLUMNS[index]: value for (index, value) in enumerate(row_tuple)}
|
||||
|
||||
self.id = row_tuple['id']
|
||||
self.real_filepath = row_tuple['filepath']
|
||||
self.real_filepath = helpers.normalize_filepath(self.real_filepath)
|
||||
self.real_path = pathclass.Path(self.real_filepath)
|
||||
self.filepath = row_tuple['override_filename'] or self.real_filepath
|
||||
self.basename = row_tuple['override_filename'] or os.path.basename(self.real_filepath)
|
||||
self.extension = row_tuple['extension']
|
||||
self.width = row_tuple['width']
|
||||
self.height = row_tuple['height']
|
||||
self.ratio = row_tuple['ratio']
|
||||
self.area = row_tuple['area']
|
||||
self.bytes = row_tuple['bytes']
|
||||
self.duration = row_tuple['duration']
|
||||
self.created = row_tuple['created']
|
||||
self.thumbnail = row_tuple['thumbnail']
|
||||
self.tagged_at = row_tuple['tagged_at']
|
||||
|
||||
def __reinit__(self):
|
||||
'''
|
||||
Reload the row from the database and do __init__ with them.
|
||||
'''
|
||||
self.photodb.cur.execute('SELECT * FROM photos WHERE id == ?', [self.id])
|
||||
row = self.photodb.cur.fetchone()
|
||||
self.__init__(self.photodb, row)
|
||||
|
||||
def __repr__(self):
|
||||
return 'Photo:{id}'.format(id=self.id)
|
||||
|
||||
def add_tag(self, tag, *, commit=True):
|
||||
tag = self.photodb.get_tag(tag)
|
||||
|
||||
if self.has_tag(tag, check_children=False):
|
||||
return
|
||||
|
||||
# If the tag is above one we already have, keep our current one.
|
||||
existing = self.has_tag(tag, check_children=True)
|
||||
if existing:
|
||||
message = 'Preferring existing {exi:s} over {tag:s}'.format(exi=existing, tag=tag)
|
||||
self.photodb.log.debug(message)
|
||||
return
|
||||
|
||||
# If the tag is beneath one we already have, remove our current one
|
||||
# in favor of the new, more specific tag.
|
||||
for parent in tag.walk_parents():
|
||||
if self.has_tag(parent, check_children=False):
|
||||
self.photodb.log.debug('Preferring new {tag:s} over {par:s}'.format(tag=tag, par=parent))
|
||||
self.remove_tag(parent)
|
||||
|
||||
self.photodb.log.debug('Applying tag {tag:s} to photo {pho:s}'.format(tag=tag, pho=self))
|
||||
now = int(helpers.now())
|
||||
self.photodb.cur.execute('INSERT INTO photo_tag_rel VALUES(?, ?)', [self.id, tag.id])
|
||||
self.photodb.cur.execute('UPDATE photos SET tagged_at = ? WHERE id == ?', [now, self.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - add photo tag')
|
||||
self.photodb.commit()
|
||||
|
||||
def albums(self):
|
||||
'''
|
||||
Return the albums of which this photo is a member.
|
||||
'''
|
||||
self.photodb.cur.execute('SELECT albumid FROM album_photo_rel WHERE photoid == ?', [self.id])
|
||||
fetch = self.photodb.cur.fetchall()
|
||||
albums = [self.photodb.get_album(f[0]) for f in fetch]
|
||||
return albums
|
||||
|
||||
def bytestring(self):
|
||||
return bytestring.bytestring(self.bytes)
|
||||
|
||||
def copy_tags(self, other_photo):
|
||||
for tag in other_photo.tags():
|
||||
self.add_tag(tag)
|
||||
|
||||
def delete(self, *, delete_file=False, commit=True):
|
||||
'''
|
||||
Delete the Photo and its relation to any tags and albums.
|
||||
'''
|
||||
self.photodb.log.debug('Deleting photo {photo:r}'.format(photo=self))
|
||||
self.photodb.cur.execute('DELETE FROM photos WHERE id == ?', [self.id])
|
||||
self.photodb.cur.execute('DELETE FROM photo_tag_rel WHERE photoid == ?', [self.id])
|
||||
self.photodb.cur.execute('DELETE FROM album_photo_rel WHERE photoid == ?', [self.id])
|
||||
|
||||
if delete_file:
|
||||
path = self.real_path.absolute_path
|
||||
if commit:
|
||||
os.remove(path)
|
||||
else:
|
||||
queue_action = {'action': os.remove, 'args': [path]}
|
||||
self.photodb.on_commit_queue.append(queue_action)
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - delete photo')
|
||||
self.photodb.commit()
|
||||
|
||||
@decorators.time_me
|
||||
def generate_thumbnail(self, *, commit=True, **special):
|
||||
'''
|
||||
special:
|
||||
For videos, you can provide a `timestamp` to take the thumbnail from.
|
||||
'''
|
||||
hopeful_filepath = self.make_thumbnail_filepath()
|
||||
return_filepath = None
|
||||
|
||||
mime = self.mimetype()
|
||||
if mime == 'image':
|
||||
self.photodb.log.debug('Thumbnailing %s' % self.real_filepath)
|
||||
try:
|
||||
image = PIL.Image.open(self.real_filepath)
|
||||
image = image.convert('RGB')
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
else:
|
||||
(width, height) = image.size
|
||||
(new_width, new_height) = helpers.fit_into_bounds(
|
||||
image_width=width,
|
||||
image_height=height,
|
||||
frame_width=self.photodb.config['thumbnail_width'],
|
||||
frame_height=self.photodb.config['thumbnail_height'],
|
||||
)
|
||||
if new_width < width:
|
||||
image = image.resize((new_width, new_height))
|
||||
image.save(hopeful_filepath, quality=50)
|
||||
return_filepath = hopeful_filepath
|
||||
|
||||
elif mime == 'video' and constants.ffmpeg:
|
||||
#print('video')
|
||||
probe = constants.ffmpeg.probe(self.real_filepath)
|
||||
try:
|
||||
if probe.video:
|
||||
size = helpers.fit_into_bounds(
|
||||
image_width=probe.video.video_width,
|
||||
image_height=probe.video.video_height,
|
||||
frame_width=self.photodb.config['thumbnail_width'],
|
||||
frame_height=self.photodb.config['thumbnail_height'],
|
||||
)
|
||||
size = '%dx%d' % size
|
||||
duration = probe.video.duration
|
||||
if 'timestamp' in special:
|
||||
timestamp = special['timestamp']
|
||||
else:
|
||||
if duration < 3:
|
||||
timestamp = 0
|
||||
else:
|
||||
timestamp = 2
|
||||
constants.ffmpeg.thumbnail(self.real_filepath, time=timestamp, quality=2, size=size, outfile=hopeful_filepath)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
else:
|
||||
return_filepath = hopeful_filepath
|
||||
|
||||
|
||||
if return_filepath != self.thumbnail:
|
||||
self.photodb.cur.execute('UPDATE photos SET thumbnail = ? WHERE id == ?', [return_filepath, self.id])
|
||||
self.thumbnail = return_filepath
|
||||
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - generate thumbnail')
|
||||
self.photodb.commit()
|
||||
|
||||
self.__reinit__()
|
||||
return self.thumbnail
|
||||
|
||||
def has_tag(self, tag, *, check_children=True):
|
||||
'''
|
||||
Return the Tag object if this photo contains that tag. Otherwise return False.
|
||||
|
||||
check_children:
|
||||
If True, children of the requested tag are counted
|
||||
'''
|
||||
tag = self.photodb.get_tag(tag)
|
||||
|
||||
if check_children:
|
||||
tags = tag.walk_children()
|
||||
else:
|
||||
tags = [tag]
|
||||
|
||||
for tag in tags:
|
||||
self.photodb.cur.execute(
|
||||
'SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?',
|
||||
[self.id, tag.id]
|
||||
)
|
||||
if self.photodb.cur.fetchone() is not None:
|
||||
return tag
|
||||
|
||||
return False
|
||||
|
||||
def make_thumbnail_filepath(self):
|
||||
chunked_id = helpers.chunk_sequence(self.id, 3)
|
||||
basename = chunked_id[-1]
|
||||
folder = chunked_id[:-1]
|
||||
folder = os.sep.join(folder)
|
||||
folder = os.path.join(self.photodb.thumbnail_directory, folder)
|
||||
if folder:
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
hopeful_filepath = os.path.join(folder, basename) + '.jpg'
|
||||
return hopeful_filepath
|
||||
|
||||
def mimetype(self):
|
||||
return helpers.get_mimetype(self.real_filepath)
|
||||
|
||||
@decorators.time_me
|
||||
def reload_metadata(self, *, commit=True):
|
||||
'''
|
||||
Load the file's height, width, etc as appropriate for this type of file.
|
||||
'''
|
||||
self.bytes = os.path.getsize(self.real_filepath)
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.area = None
|
||||
self.ratio = None
|
||||
self.duration = None
|
||||
|
||||
mime = self.mimetype()
|
||||
if mime == 'image':
|
||||
try:
|
||||
image = PIL.Image.open(self.real_filepath)
|
||||
except (OSError, ValueError):
|
||||
self.photodb.log.debug('Failed to read image data for {photo:r}'.format(photo=self))
|
||||
else:
|
||||
(self.width, self.height) = image.size
|
||||
image.close()
|
||||
self.photodb.log.debug('Loaded image data for {photo:r}'.format(photo=self))
|
||||
|
||||
elif mime == 'video' and constants.ffmpeg:
|
||||
try:
|
||||
probe = constants.ffmpeg.probe(self.real_filepath)
|
||||
if probe and probe.video:
|
||||
self.duration = probe.format.duration or probe.video.duration
|
||||
self.width = probe.video.video_width
|
||||
self.height = probe.video.video_height
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
elif mime == 'audio':
|
||||
try:
|
||||
probe = constants.ffmpeg.probe(self.real_filepath)
|
||||
if probe and probe.audio:
|
||||
self.duration = probe.audio.duration
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
if self.width and self.height:
|
||||
self.area = self.width * self.height
|
||||
self.ratio = round(self.width / self.height, 2)
|
||||
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE photos SET width=?, height=?, area=?, ratio=?, duration=?, bytes=? WHERE id==?',
|
||||
[self.width, self.height, self.area, self.ratio, self.duration, self.bytes, self.id],
|
||||
)
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - reload metadata')
|
||||
self.photodb.commit()
|
||||
|
||||
def remove_tag(self, tag, *, commit=True):
|
||||
tag = self.photodb.get_tag(tag)
|
||||
|
||||
self.photodb.log.debug('Removing tag {t} from photo {p}'.format(t=repr(tag), p=repr(self)))
|
||||
tags = list(tag.walk_children())
|
||||
for tag in tags:
|
||||
self.photodb.cur.execute(
|
||||
'DELETE FROM photo_tag_rel WHERE photoid == ? AND tagid == ?',
|
||||
[self.id, tag.id]
|
||||
)
|
||||
now = int(helpers.now())
|
||||
self.photodb.cur.execute('UPDATE photos SET tagged_at = ? WHERE id == ?', [now, self.id])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - remove photo tag')
|
||||
self.photodb.commit()
|
||||
|
||||
def rename_file(self, new_filename, *, move=False, commit=True):
|
||||
'''
|
||||
Rename the file on the disk as well as in the database.
|
||||
If `move` is True, allow this operation to move the file.
|
||||
Otherwise, slashes will be considered an error.
|
||||
'''
|
||||
old_path = self.real_path
|
||||
old_path.correct_case()
|
||||
|
||||
new_filename = helpers.normalize_filepath(new_filename)
|
||||
if os.path.dirname(new_filename) == '':
|
||||
new_path = old_path.parent.with_child(new_filename)
|
||||
else:
|
||||
new_path = pathclass.Path(new_filename)
|
||||
new_path.correct_case()
|
||||
|
||||
self.photodb.log.debug(old_path)
|
||||
self.photodb.log.debug(new_path)
|
||||
if (new_path.parent != old_path.parent) and not move:
|
||||
raise ValueError('Cannot move the file without param move=True')
|
||||
|
||||
if new_path.absolute_path == old_path.absolute_path:
|
||||
raise ValueError('The new and old names are the same')
|
||||
|
||||
os.makedirs(new_path.parent.absolute_path, exist_ok=True)
|
||||
|
||||
if new_path != old_path:
|
||||
# This is different than the absolute == absolute check above, because this normalizes
|
||||
# the paths. It's possible on case-insensitive systems to have the paths point to the
|
||||
# same place while being differently cased, thus we couldn't make the intermediate link.
|
||||
try:
|
||||
os.link(old_path.absolute_path, new_path.absolute_path)
|
||||
except OSError:
|
||||
spinal.copy_file(old_path, new_path)
|
||||
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE photos SET filepath = ? WHERE filepath == ?',
|
||||
[new_path.absolute_path, old_path.absolute_path]
|
||||
)
|
||||
|
||||
if commit:
|
||||
if new_path == old_path:
|
||||
# If they are equivalent but differently cased paths, just rename.
|
||||
os.rename(old_path.absolute_path, new_path.absolute_path)
|
||||
else:
|
||||
# Delete the original hardlink or copy.
|
||||
os.remove(old_path.absolute_path)
|
||||
self.photodb.log.debug('Committing - rename file')
|
||||
self.photodb.commit()
|
||||
else:
|
||||
queue_action = {'action': os.remove, 'args': [old_path.absolute_path]}
|
||||
self.photodb.on_commit_queue.append(queue_action)
|
||||
|
||||
self.__reinit__()
|
||||
|
||||
def tags(self):
|
||||
'''
|
||||
Return the tags assigned to this Photo.
|
||||
'''
|
||||
tags = []
|
||||
generator = helpers.select_generator(
|
||||
self.photodb.sql,
|
||||
'SELECT * FROM photo_tag_rel WHERE photoid == ?',
|
||||
[self.id]
|
||||
)
|
||||
for tag in generator:
|
||||
tagid = tag[constants.SQL_PHOTOTAG['tagid']]
|
||||
tag = self.photodb.get_tag(id=tagid)
|
||||
tags.append(tag)
|
||||
return tags
|
||||
|
||||
|
||||
class Tag(ObjectBase, GroupableMixin):
|
||||
'''
|
||||
A Tag, which can be applied to Photos for organization.
|
||||
'''
|
||||
def __init__(self, photodb, row_tuple):
|
||||
self.photodb = photodb
|
||||
if isinstance(row_tuple, (list, tuple)):
|
||||
row_tuple = {constants.SQL_TAG_COLUMNS[index]: value for (index, value) in enumerate(row_tuple)}
|
||||
self.id = row_tuple['id']
|
||||
self.name = row_tuple['name']
|
||||
self.group_getter = self.photodb.get_tag
|
||||
self._cached_qualified_name = None
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.name == other or ObjectBase.__eq__(self, other)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.name)
|
||||
|
||||
def __repr__(self):
|
||||
rep = 'Tag:{id}:{name}'.format(name=self.name, id=self.id)
|
||||
return rep
|
||||
|
||||
def __str__(self):
|
||||
rep = 'Tag:{name}'.format(name=self.name)
|
||||
return rep
|
||||
|
||||
def add_synonym(self, synname, *, commit=True):
|
||||
synname = self.photodb.normalize_tagname(synname)
|
||||
|
||||
if synname == self.name:
|
||||
raise ValueError('Cannot assign synonym to itself.')
|
||||
|
||||
try:
|
||||
self.photodb.get_tag_by_name(synname)
|
||||
except exceptions.NoSuchTag:
|
||||
pass
|
||||
else:
|
||||
raise exceptions.TagExists(synname)
|
||||
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute('INSERT INTO tag_synonyms VALUES(?, ?)', [synname, self.name])
|
||||
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - add synonym')
|
||||
self.photodb.commit()
|
||||
|
||||
def convert_to_synonym(self, mastertag, *, commit=True):
|
||||
'''
|
||||
Convert an independent tag into a synonym for a different independent tag.
|
||||
All photos which possess the current tag will have it replaced
|
||||
with the new master tag.
|
||||
All synonyms of the old tag will point to the new tag.
|
||||
|
||||
Good for when two tags need to be merged under a single name.
|
||||
'''
|
||||
mastertag = self.photodb.get_tag(mastertag)
|
||||
|
||||
# Migrate the old tag's synonyms to the new one
|
||||
# UPDATE is safe for this operation because there is no chance of duplicates.
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE tag_synonyms SET mastername = ? WHERE mastername == ?',
|
||||
[mastertag.name, self.name]
|
||||
)
|
||||
|
||||
# Iterate over all photos with the old tag, and swap them to the new tag
|
||||
# if they don't already have it.
|
||||
generator = helpers.select_generator(self.photodb.sql, 'SELECT * FROM photo_tag_rel WHERE tagid == ?', [self.id])
|
||||
for relationship in generator:
|
||||
photoid = relationship[constants.SQL_PHOTOTAG['photoid']]
|
||||
self.photodb.cur.execute('SELECT * FROM photo_tag_rel WHERE photoid == ? AND tagid == ?', [photoid, mastertag.id])
|
||||
if self.photodb.cur.fetchone() is None:
|
||||
self.photodb.cur.execute('INSERT INTO photo_tag_rel VALUES(?, ?)', [photoid, mastertag.id])
|
||||
|
||||
# Then delete the relationships with the old tag
|
||||
self.delete()
|
||||
|
||||
# Enjoy your new life as a monk.
|
||||
mastertag.add_synonym(self.name, commit=False)
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - convert to synonym')
|
||||
self.photodb.commit()
|
||||
|
||||
def delete(self, *, delete_children=False, commit=True):
|
||||
self.photodb.log.debug('Deleting tag {tag:r}'.format(tag=self))
|
||||
self.photodb._cached_frozen_children = None
|
||||
GroupableMixin.delete(self, delete_children=delete_children, commit=False)
|
||||
self.photodb.cur.execute('DELETE FROM tags WHERE id == ?', [self.id])
|
||||
self.photodb.cur.execute('DELETE FROM photo_tag_rel WHERE tagid == ?', [self.id])
|
||||
self.photodb.cur.execute('DELETE FROM tag_synonyms WHERE mastername == ?', [self.name])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - delete tag')
|
||||
self.photodb.commit()
|
||||
|
||||
def qualified_name(self):
|
||||
'''
|
||||
Return the 'group1.group2.tag' string for this tag.
|
||||
'''
|
||||
if self._cached_qualified_name:
|
||||
return self._cached_qualified_name
|
||||
qualname = self.name
|
||||
for parent in self.walk_parents():
|
||||
qualname = parent.name + '.' + qualname
|
||||
self._cached_qualified_name = qualname
|
||||
return qualname
|
||||
|
||||
def remove_synonym(self, synname, *, commit=True):
|
||||
'''
|
||||
Delete a synonym.
|
||||
This will have no effect on photos or other synonyms because
|
||||
they always resolve to the master tag before application.
|
||||
'''
|
||||
synname = self.photodb.normalize_tagname(synname)
|
||||
self.photodb.cur.execute('SELECT * FROM tag_synonyms WHERE name == ?', [synname])
|
||||
fetch = self.photodb.cur.fetchone()
|
||||
if fetch is None:
|
||||
raise exceptions.NoSuchSynonym(synname)
|
||||
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute('DELETE FROM tag_synonyms WHERE name == ?', [synname])
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - remove synonym')
|
||||
self.photodb.commit()
|
||||
|
||||
def rename(self, new_name, *, apply_to_synonyms=True, commit=True):
|
||||
'''
|
||||
Rename the tag. Does not affect its relation to Photos or tag groups.
|
||||
'''
|
||||
new_name = self.photodb.normalize_tagname(new_name)
|
||||
if new_name == self.name:
|
||||
return
|
||||
|
||||
try:
|
||||
self.photodb.get_tag(new_name)
|
||||
except exceptions.NoSuchTag:
|
||||
pass
|
||||
else:
|
||||
raise exceptions.TagExists(new_name)
|
||||
|
||||
self._cached_qualified_name = None
|
||||
self.photodb._cached_frozen_children = None
|
||||
self.photodb.cur.execute('UPDATE tags SET name = ? WHERE id == ?', [new_name, self.id])
|
||||
if apply_to_synonyms:
|
||||
self.photodb.cur.execute(
|
||||
'UPDATE tag_synonyms SET mastername = ? WHERE mastername = ?',
|
||||
[new_name, self.name]
|
||||
)
|
||||
|
||||
self.name = new_name
|
||||
if commit:
|
||||
self.photodb.log.debug('Committing - rename tag')
|
||||
self.photodb.commit()
|
||||
|
||||
def synonyms(self):
|
||||
self.photodb.cur.execute('SELECT name FROM tag_synonyms WHERE mastername == ?', [self.name])
|
||||
fetch = self.photodb.cur.fetchall()
|
||||
fetch = [f[0] for f in fetch]
|
||||
fetch.sort()
|
||||
return fetch
|
||||
|
||||
|
||||
class User(ObjectBase):
|
||||
'''
|
||||
A dear friend of ours.
|
||||
'''
|
||||
def __init__(self, photodb, row_tuple):
|
||||
self.photodb = photodb
|
||||
if isinstance(row_tuple, (list, tuple)):
|
||||
row_tuple = {constants.SQL_USER_COLUMNS[index]: value for (index, value) in enumerate(row_tuple)}
|
||||
self.id = row_tuple['id']
|
||||
self.username = row_tuple['username']
|
||||
self.created = row_tuple['created']
|
||||
|
||||
def __repr__(self):
|
||||
rep = 'User:{id}:{username}'.format(id=self.id, username=self.username)
|
||||
return rep
|
||||
|
||||
def __str__(self):
|
||||
rep = 'User:{username}'.format(username=self.username)
|
||||
return rep
|
985
phototagger.py
985
phototagger.py
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue