Ethan Dalool
4c65ccaf68
I found that the strict hierarchy was not satisfying the situation where one tag is the intersection of two others, but we can only pick one as the parent. For example, does red_jacket belong under clothes.red_clothes or clothes.jackets? A search for "red_clothes AND jackets" might give us someone wearing red pants and a black jacket, so this definitely needs to be a separate tag, but picking only one parent for it is not sufficient. Now, a search for red_clothes and a search for jackets will both find our red_jacket photo. The change also applies to Albums because why not, and I'm sure a similar case can be made. Unfortunately this means tags no longer have one true qualname. The concept of qualnames has not been completely phased out but it's in progress. This commit is very big because I was not sure for a long time whether to go through with it, and so much stuff had to change that I don't want to go back and figure out what could be grouped together.
1487 lines
49 KiB
Python
1487 lines
49 KiB
Python
'''
|
|
This file provides the data objects that should not be instantiated directly,
|
|
but are returned by the PDB accesses.
|
|
'''
|
|
|
|
import os
|
|
import PIL.Image
|
|
import string
|
|
import traceback
|
|
|
|
from . import constants
|
|
from . import decorators
|
|
from . import exceptions
|
|
from . import helpers
|
|
|
|
from voussoirkit import bytestring
|
|
from voussoirkit import pathclass
|
|
from voussoirkit import spinal
|
|
|
|
|
|
class ObjectBase:
    '''
    Shared base for the data objects handed out by a PhotoDB.

    Equality and hashing are built on `self.id`, which subclasses are
    expected to assign. `get_author` additionally reads `self.author_id`.
    '''
    def __init__(self, photodb):
        super().__init__()
        self.photodb = photodb

    def __eq__(self, other):
        '''
        Equal when `other` is an instance of this object's type, belongs to
        the same PhotoDB, and carries the same id.
        '''
        if not isinstance(other, type(self)):
            return False
        if not (self.photodb == other.photodb):
            return False
        return self.id == other.id

    def __format__(self, formcode):
        '''
        The format code 'r' produces the repr; anything else produces str.
        '''
        return repr(self) if formcode == 'r' else str(self)

    def __hash__(self):
        return hash(self.id)

    @staticmethod
    def normalize_author_id(author_id):
        '''
        Return the stripped author id, or None if it is None or blank.

        Raises TypeError if author_id is neither None nor a string.
        '''
        if author_id is None:
            return None

        if not isinstance(author_id, str):
            raise TypeError(f'Author ID must be string, not {type(author_id)}.')

        # An all-whitespace id strips down to '' and is treated as unassigned.
        stripped = author_id.strip()
        return stripped or None

    def get_author(self):
        '''
        Return the User who created this object, or None if it is unassigned.
        '''
        if self.author_id is not None:
            return self.photodb.get_user(id=self.author_id)
        return None
|
|
|
|
|
|
class GroupableMixin:
    # Placeholders: classes using this mixin (see Album) assign real values.
    # Callable for fetching a single object of the grouped type.
    group_getter = None
    # Callable that takes a list of ids and returns the matching objects;
    # used by get_children and get_parents.
    group_getter_many = None
    # Album fills this from constants.SQL_INDEX[group_table].
    group_sql_index = None
    # Name of the SQL table holding the (parentid, memberid) relationship rows.
    group_table = None
|
|
|
|
def _lift_children(self):
    '''
    If this object has parents, the parents adopt all of its children.
    Otherwise the parental relationship is simply deleted.

    Nothing is committed here; the caller owns the transaction.
    '''
    children = self.get_children()
    if not children:
        return

    # Sever every link in which this object is the parent.
    self.photodb.sql_delete(table=self.group_table, pairs={'parentid': self.id})

    for parent in self.get_parents():
        # add_children commits by default. Passing commit=False keeps the
        # adoption inside the caller's transaction, so delete(commit=False)
        # is not committed halfway through.
        parent.add_children(children, commit=False)
|
|
|
|
@decorators.transaction
def add_child(self, member, *, commit=True):
    '''
    Make `member` a direct child of this object.

    Does nothing if `member` is already a direct child.

    Raises TypeError (from assert_same_type) if `member` is of a different
    type or belongs to a different PhotoDB.
    Raises exceptions.RecursiveGrouping if `member` is this object itself or
    one of its ancestors, since either would create a cycle.
    '''
    self.assert_same_type(member)

    if self.has_child(member):
        return

    self.photodb.log.debug(f'Adding child {member} to {self}.')

    # walk_parents never yields self, so the self-loop case has to be
    # checked explicitly; a self-loop would make walk_children recurse
    # without end.
    if member == self:
        raise exceptions.RecursiveGrouping(member=member, group=self)

    for my_ancestor in self.walk_parents():
        if my_ancestor == member:
            raise exceptions.RecursiveGrouping(member=member, group=self)

    data = {
        'parentid': self.id,
        'memberid': member.id,
    }
    self.photodb.sql_insert(table=self.group_table, data=data)

    # The hierarchy changed, so drop the PhotoDB's cached children.
    self.photodb._cached_frozen_children = None

    if commit:
        self.photodb.log.debug('Committing - add to group')
        self.photodb.commit()
|
|
|
|
@decorators.transaction
def add_children(self, members, *, commit=True):
    '''
    Add every object in `members` as a child of this object, committing at
    most once at the end instead of once per member.
    '''
    for child in members:
        self.add_child(child, commit=False)

    if not commit:
        return

    self.photodb.log.debug('Committing - add multiple to group')
    self.photodb.commit()
|
|
|
|
def assert_same_type(self, other):
    '''
    Raise TypeError unless `other` is an instance of this object's type and
    belongs to the same PhotoDB.
    '''
    expected_type = type(self)
    if not isinstance(other, expected_type):
        raise TypeError(f'Object must be of type {expected_type}, not {type(other)}.')

    if self.photodb != other.photodb:
        raise TypeError('Objects must belong to the same PhotoDB.')
|
|
|
|
@decorators.transaction
def delete(self, *, delete_children=False, commit=True):
    '''
    Remove this object's links to other groupables.

    Only the group relationships are handled here. The inheriting class is
    responsible for the rest of its own deletion; Album.delete, for
    example, calls this and then removes the album's own rows.

    delete_children:
        If True, every child is deleted as well, recursively.
        Otherwise the children are lifted up to this object's parents.
    '''
    self.photodb._cached_frozen_children = None

    if not delete_children:
        self._lift_children()
    else:
        for child in self.get_children():
            child.delete(delete_children=True, commit=False)

    # The links to our own parents are removed only after the children have
    # been dealt with, to prevent issues of recursion.
    self.photodb.sql_delete(table=self.group_table, pairs={'memberid': self.id})
    self._uncache()

    if not commit:
        return

    self.photodb.log.debug('Committing - delete tag')
    self.photodb.commit()
|
|
|
|
def get_children(self):
    '''
    Return the direct children of this object as a sorted list.

    Tags are sorted by name; every other type is sorted by id.
    '''
    query = f'SELECT memberid FROM {self.group_table} WHERE parentid == ?'
    rows = self.photodb.sql_select(query, [self.id])
    child_ids = [row[0] for row in rows]
    children = list(self.group_getter_many(child_ids))

    if isinstance(self, Tag):
        children.sort(key=lambda child: child.name)
    else:
        children.sort(key=lambda child: child.id)
    return children
|
|
|
|
def get_parents(self):
    '''
    Return a list of the direct parents of this object.
    '''
    rows = self.photodb.sql_select(
        f'SELECT parentid FROM {self.group_table} WHERE memberid == ?',
        [self.id]
    )
    ids = [row[0] for row in rows]
    return list(self.group_getter_many(ids))
|
|
|
|
def has_any_child(self):
    '''
    Return True if at least one object has this one as a parent.
    '''
    found = self.photodb.sql_select_one(
        f'SELECT 1 FROM {self.group_table} WHERE parentid == ? LIMIT 1',
        [self.id]
    )
    return found is not None
|
|
|
|
def has_any_parent(self):
    '''
    Return True if this object is a member of at least one parent.
    '''
    found = self.photodb.sql_select_one(
        f'SELECT 1 FROM {self.group_table} WHERE memberid == ? LIMIT 1',
        [self.id]
    )
    return found is not None
|
|
|
|
def has_child(self, member):
    '''
    Return True if `member` is a direct child of this object.

    Raises TypeError (from assert_same_type) for a mismatched `member`.
    '''
    self.assert_same_type(member)
    found = self.photodb.sql_select_one(
        f'SELECT 1 FROM {self.group_table} WHERE parentid == ? AND memberid == ?',
        [self.id, member.id]
    )
    return found is not None
|
|
|
|
@decorators.transaction
def remove_child(self, member, *, commit=True):
    '''
    Detach `member` from this object's direct children.

    Does nothing if `member` is not currently a direct child.
    '''
    if not self.has_child(member):
        return

    self.photodb.log.debug(f'Removing child {member} from {self}.')

    relationship = {'parentid': self.id, 'memberid': member.id}
    self.photodb.sql_delete(table=self.group_table, pairs=relationship)
    # The hierarchy changed, so drop the PhotoDB's cached children.
    self.photodb._cached_frozen_children = None

    if not commit:
        return

    self.photodb.log.debug('Committing - remove from group')
    self.photodb.commit()
|
|
|
|
def walk_children(self):
    '''
    Yield this object, then each child followed by that child's own
    descendants (depth first).
    '''
    yield self
    for child in self.get_children():
        for descendant in child.walk_children():
            yield descendant
|
|
|
|
def walk_parents(self):
    '''
    Yield all ancestors in no particular order.

    Each ancestor is yielded exactly once, even when it is reachable
    through more than one parent.
    '''
    parents = self.get_parents()
    seen = set(parents)
    todo = list(parents)
    while todo:
        parent = todo.pop()
        yield parent
        more_parents = set(parent.get_parents()).difference(seen)
        # Mark ancestors as seen when they are queued, not when they are
        # popped. Otherwise an ancestor still waiting in `todo` could be
        # queued again by another path and yielded twice.
        seen.update(more_parents)
        todo.extend(more_parents)
|
|
|
|
|
|
class Album(ObjectBase, GroupableMixin):
    # Table holding the parent/child relationships between albums.
    group_table = 'album_group_rel'
    # Looked up from constants.SQL_INDEX for that table; presumably the
    # column-name-to-position mapping -- confirm in constants.
    group_sql_index = constants.SQL_INDEX[group_table]
|
|
|
|
def __init__(self, photodb, db_row):
    '''
    Build an Album from a row of the albums table.

    db_row:
        Either a list / tuple of values in constants.SQL_COLUMNS['albums']
        order, or a mapping keyed by column name.
    '''
    super().__init__(photodb)
    if isinstance(db_row, (list, tuple)):
        column_names = constants.SQL_COLUMNS['albums']
        db_row = dict(zip(column_names, db_row))

    self.id = db_row['id']
    self.title = self.normalize_title(db_row['title'])
    self.description = self.normalize_description(db_row['description'])
    self.author_id = self.normalize_author_id(db_row['author_id'])

    # The lookups GroupableMixin uses to turn ids into Album objects.
    self.group_getter = self.photodb.get_album
    self.group_getter_many = self.photodb.get_albums_by_id

    self._sum_bytes_local = None
    self._sum_bytes_recursive = None
    self._sum_photos_recursive = None
|
|
|
|
def __repr__(self):
    # Rendered as 'Album:<id>'.
    return 'Album:{}'.format(self.id)
|
|
|
|
@staticmethod
def normalize_description(description):
    '''
    Return the description with surrounding whitespace stripped.
    None becomes the empty string.

    Raises TypeError if description is neither None nor a string.
    '''
    if description is None:
        return ''

    if not isinstance(description, str):
        raise TypeError(f'Description must be string, not {type(description)}')

    return description.strip()
|
|
|
|
@staticmethod
def normalize_title(title):
    '''
    Return the title stripped of surrounding whitespace, with every
    remaining character from string.whitespace replaced by a plain space.
    None becomes the empty string.

    Raises TypeError if title is neither None nor a string.
    '''
    if title is None:
        return ''

    if not isinstance(title, str):
        raise TypeError(f'Title must be string, not {type(title)}')

    # One translation pass maps tabs, newlines and the like onto ' '.
    to_spaces = str.maketrans(string.whitespace, ' ' * len(string.whitespace))
    return title.strip().translate(to_spaces)
|
|
|
|
def _uncache(self):
    # Evict this album from the PhotoDB's album cache.
    album_cache = self.photodb.caches['album']
    album_cache.remove(self.id)
|
|
|
|
@decorators.required_feature('album.edit')
# GroupableMixin.add_child already has @transaction.
def add_child(self, *args, **kwargs):
    '''
    Same as the inherited add_child, but gated behind the 'album.edit'
    feature.
    '''
    return super().add_child(*args, **kwargs)
|
|
|
|
@decorators.required_feature('album.edit')
# GroupableMixin.add_children already has @transaction.
def add_children(self, *args, **kwargs):
    '''
    Same as the inherited add_children, but gated behind the 'album.edit'
    feature.
    '''
    result = super().add_children(*args, **kwargs)
    return result
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def add_associated_directory(self, filepath, *, commit=True):
    '''
    Add a directory from which this album will pull files during rescans.

    If photodb.get_album_by_path already returns this album for the path,
    nothing happens. If it returns a different album, AlbumExists is
    raised.

    Raises ValueError if filepath is not a directory.
    '''
    filepath = pathclass.Path(filepath)
    if not filepath.is_dir:
        raise ValueError(f'{filepath} is not a directory')

    try:
        existing = self.photodb.get_album_by_path(filepath)
    except exceptions.NoSuchAlbum:
        existing = None

    if existing is not None:
        if existing == self:
            # Already associated with this album.
            return
        raise exceptions.AlbumExists(filepath)

    association = {
        'albumid': self.id,
        'directory': filepath.absolute_path,
    }
    self.photodb.sql_insert(table='album_associated_directories', data=association)

    if not commit:
        return

    self.photodb.log.debug('Committing - add associated directory')
    self.photodb.commit()
|
|
|
|
def _add_photo(self, photo):
    '''
    Insert the album-photo relationship row. Performs no duplicate check
    and no commit; add_photo and add_photos handle those.
    '''
    self.photodb.log.debug('Adding photo %s to %s', photo, self)
    self.photodb.sql_insert(
        table='album_photo_rel',
        data={'albumid': self.id, 'photoid': photo.id},
    )
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def add_photo(self, photo, *, commit=True):
    '''
    Add a single photo to this album. Does nothing if the photo is already
    in the album.
    '''
    already_present = self.has_photo(photo)
    if already_present:
        return

    self._add_photo(photo)

    if not commit:
        return

    self.photodb.log.debug('Committing - add photo to album')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def add_photos(self, photos, *, commit=True):
    '''
    Add several photos to this album, skipping those that are already in
    it, and commit at most once at the end.
    '''
    already_present = set(self.get_photos())
    to_add = set(photos) - already_present

    for photo in to_add:
        self._add_photo(photo)

    if not commit:
        return

    self.photodb.log.debug('Committing - add photos to album')
    self.photodb.commit()
|
|
|
|
# Photo.add_tag already has @required_feature
@decorators.transaction
def add_tag_to_all(self, tag, *, nested_children=True, commit=True):
    '''
    Apply `tag` to every photo in the album, so the caller does not have
    to loop over the photos.

    nested_children:
        If True, photos contained in sub-albums are tagged too.
        Otherwise, only this album's own photos.
    '''
    tag = self.photodb.get_tag(name=tag)
    photos = self.walk_photos() if nested_children else self.get_photos()

    for photo in photos:
        photo.add_tag(tag, commit=False)

    if not commit:
        return

    self.photodb.log.debug('Committing - add tag to all')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def delete(self, *, delete_children=False, commit=True):
    '''
    Delete this album: its group links (via GroupableMixin.delete), its
    associated directories, its photo memberships, and its albums row.

    delete_children:
        Passed through to GroupableMixin.delete.
    '''
    self.photodb.log.debug('Deleting %s', self)
    GroupableMixin.delete(self, delete_children=delete_children, commit=False)

    # (table, column holding this album's id), removed in this order.
    owned_rows = [
        ('album_associated_directories', 'albumid'),
        ('album_photo_rel', 'albumid'),
        ('albums', 'id'),
    ]
    for (table, column) in owned_rows:
        self.photodb.sql_delete(table=table, pairs={column: self.id})

    self._uncache()

    if not commit:
        return

    self.photodb.log.debug('Committing - delete album')
    self.photodb.commit()
|
|
|
|
@property
def display_name(self):
    # The title when it is non-empty, otherwise the id.
    return self.title or self.id
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def edit(self, title=None, description=None, *, commit=True):
    '''
    Change the title and / or description. An argument left as None keeps
    the current value; if both are None nothing happens.
    '''
    nothing_to_change = (title is None) and (description is None)
    if nothing_to_change:
        return

    if title is not None:
        self.title = self.normalize_title(title)

    if description is not None:
        self.description = self.normalize_description(description)

    self.photodb.sql_update(
        table='albums',
        pairs={
            'id': self.id,
            'title': self.title,
            'description': self.description,
        },
        where_key='id',
    )

    if not commit:
        return

    self.photodb.log.debug('Committing - edit album')
    self.photodb.commit()
|
|
|
|
def get_associated_directories(self):
    '''
    Return the directories associated with this album as a list of
    pathclass.Path objects.
    '''
    rows = self.photodb.sql_select(
        'SELECT directory FROM album_associated_directories WHERE albumid == ?',
        [self.id]
    )
    return [pathclass.Path(row[0]) for row in rows]
|
|
|
|
def get_photos(self):
    '''
    Return this album's own photos (sub-albums are not included), sorted
    case-insensitively by basename.
    '''
    rows = self.photodb.sql_select(
        'SELECT photoid FROM album_photo_rel WHERE albumid == ?',
        [self.id]
    )
    photo_ids = [row[0] for row in rows]
    photos = list(self.photodb.get_photos_by_id(photo_ids))
    photos.sort(key=lambda photo: photo.basename.lower())
    return photos
|
|
|
|
def has_any_photo(self, recurse=False):
    '''
    Return True if this album directly contains at least one photo.

    recurse:
        If True and the album itself is empty, also check the sub-albums.
    '''
    found = self.photodb.sql_select_one(
        'SELECT photoid FROM album_photo_rel WHERE albumid == ? LIMIT 1',
        [self.id]
    )
    if found is not None:
        return True
    return self.has_any_subalbum_photo() if recurse else False
|
|
|
|
def has_any_subalbum_photo(self):
    '''
    Return True if any child album, at any depth, contains a photo.
    '''
    for child in self.get_children():
        if child.has_any_photo(recurse=True):
            return True
    return False
|
|
|
|
def has_photo(self, photo):
    '''
    Return True if `photo` is directly in this album.

    Raises TypeError if `photo` is not a Photo.
    '''
    if not isinstance(photo, Photo):
        raise TypeError(f'`photo` must be of type {Photo}, not {type(photo)}.')

    found = self.photodb.sql_select_one(
        'SELECT 1 FROM album_photo_rel WHERE albumid == ? AND photoid == ?',
        [self.id, photo.id]
    )
    return found is not None
|
|
|
|
def _remove_photo(self, photo):
    '''
    Delete the album-photo relationship row. Performs no commit;
    remove_photo and remove_photos handle that.
    '''
    self.photodb.log.debug('Removing photo %s from %s', photo, self)
    self.photodb.sql_delete(
        table='album_photo_rel',
        pairs={'albumid': self.id, 'photoid': photo.id},
    )
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def remove_photo(self, photo, *, commit=True):
    '''
    Remove a single photo from this album.
    '''
    self._remove_photo(photo)

    if not commit:
        return

    self.photodb.log.debug('Committing - remove photo from album')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('album.edit')
@decorators.transaction
def remove_photos(self, photos, *, commit=True):
    '''
    Remove several photos from this album, ignoring those that are not in
    it, and commit at most once at the end.
    '''
    currently_present = set(self.get_photos())
    to_remove = set(photos) & currently_present

    for photo in to_remove:
        self._remove_photo(photo)

    if not commit:
        return

    self.photodb.log.debug('Committing - remove photos from album')
    self.photodb.commit()
|
|
|
|
def sum_bytes(self, recurse=True):
    '''
    Return SUM(bytes) over the photos in this album, as reported by the
    database.

    recurse:
        If True, photos in all descendant albums are included.
    '''
    if recurse:
        albumids = [album.id for album in self.walk_children()]
    else:
        albumids = [self.id]
    albumids = helpers.sql_listify(albumids)

    query = '''
    SELECT SUM(bytes) FROM photos
    WHERE photos.id IN (
        SELECT photoid FROM album_photo_rel WHERE
        albumid IN {albumids}
    )
    '''.format(albumids=albumids)

    row = self.photodb.sql_select_one(query)
    return row[0]
|
|
|
|
def sum_photos(self, recurse=True):
    '''
    Return the number of album-photo relationship rows for this album.

    recurse:
        If True, rows belonging to all descendant albums are counted too.
    '''
    if recurse:
        albumids = [album.id for album in self.walk_children()]
    else:
        albumids = [self.id]
    albumids = helpers.sql_listify(albumids)

    query = '''
    SELECT COUNT(photoid)
    FROM album_photo_rel
    WHERE albumid IN {albumids}
    '''.format(albumids=albumids)

    row = self.photodb.sql_select_one(query)
    return row[0]
|
|
|
|
def walk_photos(self):
    '''
    Yield this album's photos, followed by the photos of every descendant
    album.

    Each album is visited once, even if it is reachable through more than
    one parent. A photo that belongs to several different albums is still
    yielded once per album.
    '''
    seen_album_ids = set()
    # walk_children yields this album first and then every descendant at
    # every depth, so each album's own photos are read with get_photos.
    # Recursing into walk_photos here would repeat the photos of
    # grandchildren and deeper.
    for album in self.walk_children():
        if album.id in seen_album_ids:
            continue
        seen_album_ids.add(album.id)
        yield from album.get_photos()
|
|
|
|
|
|
class Bookmark(ObjectBase):
    '''
    A saved URL with an optional title, built from a row of the bookmarks
    table.
    '''
    def __init__(self, photodb, db_row):
        '''
        db_row:
            Either a list / tuple of values in
            constants.SQL_COLUMNS['bookmarks'] order, or a mapping keyed by
            column name.
        '''
        super().__init__(photodb)
        if isinstance(db_row, (list, tuple)):
            column_names = constants.SQL_COLUMNS['bookmarks']
            db_row = dict(zip(column_names, db_row))

        self.id = db_row['id']
        self.title = self.normalize_title(db_row['title'])
        self.url = self.normalize_url(db_row['url'])
        self.author_id = self.normalize_author_id(db_row['author_id'])

    def __repr__(self):
        return 'Bookmark:{}'.format(self.id)

    @staticmethod
    def normalize_title(title):
        '''
        Return the title stripped of surrounding whitespace, with every
        remaining character from string.whitespace replaced by a plain
        space. None becomes the empty string.

        Raises TypeError if title is neither None nor a string.
        '''
        if title is None:
            return ''

        if not isinstance(title, str):
            raise TypeError(f'Title must be string, not {type(title)}')

        to_spaces = str.maketrans(string.whitespace, ' ' * len(string.whitespace))
        return title.strip().translate(to_spaces)

    @staticmethod
    def normalize_url(url):
        '''
        Return the URL with surrounding whitespace stripped.
        None becomes the empty string.

        Raises TypeError if url is neither None nor a string.
        Raises ValueError if the string is blank after stripping.
        '''
        if url is None:
            return ''

        if not isinstance(url, str):
            raise TypeError(f'URL must be string, not {type(url)}')

        url = url.strip()
        if url == '':
            raise ValueError(f'Invalid URL "{url}"')

        return url

    def _uncache(self):
        # Evict this bookmark from the PhotoDB's bookmark cache.
        bookmark_cache = self.photodb.caches['bookmark']
        bookmark_cache.remove(self.id)

    @decorators.required_feature('bookmark.edit')
    @decorators.transaction
    def delete(self, *, commit=True):
        '''
        Delete this bookmark's row and evict it from the cache.
        '''
        self.photodb.sql_delete(table='bookmarks', pairs={'id': self.id})
        self._uncache()
        if not commit:
            return
        self.photodb.commit()

    @decorators.required_feature('bookmark.edit')
    @decorators.transaction
    def edit(self, title=None, url=None, *, commit=True):
        '''
        Change the title and / or URL. An argument left as None keeps the
        current value; if both are None nothing happens.
        '''
        nothing_to_change = (title is None) and (url is None)
        if nothing_to_change:
            return

        if title is not None:
            self.title = self.normalize_title(title)

        if url is not None:
            self.url = self.normalize_url(url)

        self.photodb.sql_update(
            table='bookmarks',
            pairs={
                'id': self.id,
                'title': self.title,
                'url': self.url,
            },
            where_key='id',
        )

        if not commit:
            return

        self.photodb.log.debug('Committing - edit bookmark')
        self.photodb.commit()
|
|
|
|
|
|
class Photo(ObjectBase):
    '''
    A PhotoDB entry containing information about a file on disk.

    Photo objects cannot exist without a corresponding PhotoDB object, because
    Photos are not the actual file data, just the database entry. Besides
    images, the entry may describe a video or audio file (see
    reload_metadata).
    '''
|
|
def __init__(self, photodb, db_row):
    '''
    Build a Photo from a row of the photos table.

    db_row:
        Either a list / tuple of values in constants.SQL_COLUMNS['photos']
        order, or a mapping keyed by column name.
    '''
    super().__init__(photodb)
    if isinstance(db_row, (list, tuple)):
        column_names = constants.SQL_COLUMNS['photos']
        db_row = dict(zip(column_names, db_row))

    cleaned_path = helpers.remove_path_badchars(db_row['filepath'], allowed=':\\/')
    self.real_path = pathclass.Path(cleaned_path)

    self.id = db_row['id']
    self.created = db_row['created']
    self.author_id = self.normalize_author_id(db_row['author_id'])
    # An override filename, when set, wins over the name of the file on disk.
    self.basename = db_row['override_filename'] or self.real_path.basename
    self.extension = db_row['extension']
    self.tagged_at = db_row['tagged_at']

    self.dot_extension = '' if self.extension == '' else '.' + self.extension

    self.area = db_row['area']
    self.bytes = db_row['bytes']
    self.duration = db_row['duration']
    self.width = db_row['width']
    self.height = db_row['height']
    self.ratio = db_row['ratio']

    # The row's thumbnail value is joined onto the PhotoDB's thumbnail
    # directory.
    thumbnail = db_row['thumbnail']
    if thumbnail is None:
        self.thumbnail = None
    else:
        self.thumbnail = self.photodb.thumbnail_directory.join(thumbnail)

    self.searchhidden = db_row['searchhidden']

    # simple_mimetype is the part before the slash, e.g. 'image' or 'video'.
    self.mimetype = helpers.get_mimetype(self.real_path.basename)
    if self.mimetype is not None:
        self.simple_mimetype = self.mimetype.split('/')[0]
    else:
        self.simple_mimetype = None
|
|
|
|
def __reinit__(self):
    '''
    Fetch this photo's row from the database again and run __init__ on it,
    refreshing every attribute.
    '''
    fresh_row = self.photodb.sql_select_one(
        'SELECT * FROM photos WHERE id == ?',
        [self.id]
    )
    self.__init__(self.photodb, fresh_row)
|
|
|
|
def __repr__(self):
    # Rendered as 'Photo:<id>'.
    return 'Photo:{}'.format(self.id)
|
|
|
|
def _uncache(self):
    # Evict this photo from the PhotoDB's photo cache.
    photo_cache = self.photodb.caches['photo']
    photo_cache.remove(self.id)
|
|
|
|
@decorators.required_feature('photo.add_remove_tag')
@decorators.transaction
def add_tag(self, tag, *, commit=True):
    '''
    Apply `tag` to this photo and return the Tag that ends up applied.

    - If the photo already has this exact tag, that tag is returned.
    - If the photo already has a descendant of `tag`, the more specific
      existing tag is kept and returned.
    - If the photo has ancestors of `tag`, they are removed in favor of
      the more specific new tag.
    '''
    tag = self.photodb.get_tag(name=tag)

    exact_match = self.has_tag(tag, check_children=False)
    if exact_match:
        return exact_match

    # If the new tag is less specific than one we already have,
    # keep our current one.
    more_specific = self.has_tag(tag, check_children=True)
    if more_specific:
        self.photodb.log.debug(f'Preferring existing {more_specific} over {tag}')
        return more_specific

    # If the new tag is more specific, remove our current one for it.
    for ancestor in tag.walk_parents():
        if self.has_tag(ancestor, check_children=False):
            self.photodb.log.debug(f'Preferring new {tag} over {ancestor}')
            self.remove_tag(ancestor, commit=False)

    self.photodb.log.debug('Applying %s to %s', tag, self)

    self.photodb.sql_insert(
        table='photo_tag_rel',
        data={'photoid': self.id, 'tagid': tag.id},
    )
    # Record when this photo's tags last changed.
    self.photodb.sql_update(
        table='photos',
        pairs={'id': self.id, 'tagged_at': helpers.now()},
        where_key='id',
    )

    if commit:
        self.photodb.log.debug('Committing - add photo tag')
        self.photodb.commit()
    return tag
|
|
|
|
@property
def bitrate(self):
    '''
    (bytes / 128) / duration, or None when the duration is missing or zero
    or the byte count is unknown.
    '''
    if not self.duration or self.bytes is None:
        return None
    return (self.bytes / 128) / self.duration
|
|
|
|
@property
def bytestring(self):
    '''
    The file size formatted by voussoirkit's bytestring module, or the
    placeholder '??? b' when the size is unknown.
    '''
    if self.bytes is None:
        return '??? b'
    return bytestring.bytestring(self.bytes)
|
|
|
|
# Photo.add_tag already has required_feature add_remove_tag
# Photo.add_tag already has @transaction.
def copy_tags(self, other_photo, *, commit=True):
    '''
    Apply every tag of other_photo to this photo, committing at most once
    at the end.
    '''
    for tag in other_photo.get_tags():
        self.add_tag(tag, commit=False)

    if not commit:
        return

    self.photodb.log.debug('Committing - copy tags')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('photo.edit')
@decorators.transaction
def delete(self, *, delete_file=False, commit=True):
    '''
    Delete the Photo and its relation to any tags and albums.

    delete_file:
        If True, the file on disk is removed as well: immediately when
        commit is True, otherwise by queueing the removal on the PhotoDB's
        on_commit_queue.
    '''
    self.photodb.log.debug('Deleting %s', self)

    # (table, column holding this photo's id), removed in this order.
    owned_rows = [
        ('photo_tag_rel', 'photoid'),
        ('album_photo_rel', 'photoid'),
        ('photos', 'id'),
    ]
    for (table, column) in owned_rows:
        self.photodb.sql_delete(table=table, pairs={column: self.id})

    if delete_file:
        path = self.real_path.absolute_path
        if not commit:
            self.photodb.on_commit_queue.append({'action': os.remove, 'args': [path]})
        else:
            os.remove(path)

    self._uncache()

    if not commit:
        return

    self.photodb.log.debug('Committing - delete photo')
    self.photodb.commit()
|
|
|
|
@property
def duration_string(self):
    '''
    The duration formatted by helpers.seconds_to_hms, or None when the
    photo has no duration.
    '''
    if self.duration is not None:
        return helpers.seconds_to_hms(self.duration)
    return None
|
|
|
|
#@decorators.time_me
@decorators.required_feature('photo.generate_thumbnail')
@decorators.transaction
def generate_thumbnail(self, *, commit=True, **special):
    '''
    Create a thumbnail file for this photo, store its location in the
    database, and return self.thumbnail as reloaded from the database.

    Only images, and videos when constants.ffmpeg is available, get a
    thumbnail. Failures while thumbnailing are printed, not raised.

    special:
        For videos, you can provide a `timestamp` to take the thumbnail at.
    '''
    hopeful_filepath = self.make_thumbnail_filepath()
    # Stays None unless a thumbnail is actually produced below.
    return_filepath = None

    if self.simple_mimetype == 'image':
        self.photodb.log.debug('Thumbnailing %s', self.real_path.absolute_path)
        try:
            image = helpers.generate_image_thumbnail(
                self.real_path.absolute_path,
                width=self.photodb.config['thumbnail_width'],
                height=self.photodb.config['thumbnail_height'],
            )
        except (OSError, ValueError):
            # Best effort: print the failure and carry on without a thumbnail.
            traceback.print_exc()
        else:
            image.save(hopeful_filepath.absolute_path, quality=50)
            return_filepath = hopeful_filepath

    elif self.simple_mimetype == 'video' and constants.ffmpeg:
        self.photodb.log.debug('Thumbnailing %s', self.real_path.absolute_path)
        try:
            success = helpers.generate_video_thumbnail(
                self.real_path.absolute_path,
                outfile=hopeful_filepath.absolute_path,
                width=self.photodb.config['thumbnail_width'],
                height=self.photodb.config['thumbnail_height'],
                **special
            )
        except Exception:
            # Best effort: print the failure and carry on without a thumbnail.
            traceback.print_exc()
        else:
            if success:
                return_filepath = hopeful_filepath

    # Only touch the database when the thumbnail location changed. This
    # includes changing to None when no thumbnail could be produced.
    if return_filepath != self.thumbnail:
        if return_filepath is not None:
            # The database stores the path relative to the thumbnail directory.
            return_filepath = return_filepath.relative_to(self.photodb.thumbnail_directory)
        data = {
            'id': self.id,
            'thumbnail': return_filepath,
        }
        self.photodb.sql_update(table='photos', pairs=data, where_key='id')
        self.thumbnail = return_filepath

    self._uncache()
    if commit:
        self.photodb.log.debug('Committing - generate thumbnail')
        self.photodb.commit()

    # Reload from the database so self.thumbnail is rebuilt by __init__.
    self.__reinit__()
    return self.thumbnail
|
|
|
|
def get_containing_albums(self):
    '''
    Return a list of the albums that directly contain this photo.
    '''
    rows = self.photodb.sql_select(
        'SELECT albumid FROM album_photo_rel WHERE photoid == ?',
        [self.id]
    )
    ids = [row[0] for row in rows]
    return list(self.photodb.get_albums_by_id(ids))
|
|
|
|
def get_tags(self):
    '''
    Return a list of the tags applied directly to this photo.
    '''
    rows = self.photodb.sql_select(
        'SELECT tagid FROM photo_tag_rel WHERE photoid == ?',
        [self.id]
    )
    ids = [row[0] for row in rows]
    return list(self.photodb.get_tags_by_id(ids))
|
|
|
|
def has_tag(self, tag, *, check_children=True):
    '''
    Return the matching Tag object if this photo has the tag, otherwise
    False.

    check_children:
        If True, descendants of the requested tag also count as a match,
        and the Tag returned is whichever one the photo actually has.
    '''
    tag = self.photodb.get_tag(name=tag)

    candidates = tag.walk_children() if check_children else [tag]
    tag_by_id = {candidate.id: candidate for candidate in candidates}
    tag_option_ids = helpers.sql_listify(tag_by_id)

    query = f'SELECT tagid FROM photo_tag_rel WHERE photoid == ? AND tagid IN {tag_option_ids}'
    rel_row = self.photodb.sql_select_one(query, [self.id])

    if rel_row is not None:
        return tag_by_id[rel_row[0]]
    return False
|
|
|
|
def make_thumbnail_filepath(self):
    '''
    Return the path where this photo's thumbnail belongs, creating the
    containing folder if necessary.

    The id is split by helpers.chunk_sequence: every chunk but the last
    becomes a nested folder under the thumbnail directory, and the last
    chunk becomes the '.jpg' file's name.
    '''
    chunked_id = helpers.chunk_sequence(self.id, 3)
    folder_parts = chunked_id[:-1]
    basename = chunked_id[-1]

    folder = self.photodb.thumbnail_directory.join(os.sep.join(folder_parts))
    if folder:
        os.makedirs(folder.absolute_path, exist_ok=True)

    return folder.with_child(basename + '.jpg')
|
|
|
|
#@decorators.time_me
@decorators.required_feature('photo.reload_metadata')
@decorators.transaction
def reload_metadata(self, *, commit=True):
    '''
    Load the file's height, width, etc as appropriate for this type of file,
    and write the results to the database.

    Every measurement except bytes is reset to None first, so a value that
    cannot be determined is stored as NULL and not left stale.
    '''
    self.bytes = self.real_path.size
    self.width = None
    self.height = None
    self.area = None
    self.ratio = None
    self.duration = None

    self.photodb.log.debug('Reloading metadata for %s', self)

    if self.simple_mimetype == 'image':
        try:
            image = PIL.Image.open(self.real_path.absolute_path)
        except (OSError, ValueError):
            # An unreadable image simply keeps None for its dimensions.
            self.photodb.log.debug('Failed to read image data for %s', self)
        else:
            (self.width, self.height) = image.size
            image.close()

    elif self.simple_mimetype == 'video' and constants.ffmpeg:
        try:
            probe = constants.ffmpeg.probe(self.real_path.absolute_path)
            if probe and probe.video:
                # Take the format-level duration, falling back to the
                # video stream's own duration when that is falsy.
                self.duration = probe.format.duration or probe.video.duration
                self.width = probe.video.video_width
                self.height = probe.video.video_height
        except Exception:
            # Best effort: print the failure and keep the None values.
            traceback.print_exc()

    elif self.simple_mimetype == 'audio' and constants.ffmpeg:
        try:
            probe = constants.ffmpeg.probe(self.real_path.absolute_path)
            if probe and probe.audio:
                self.duration = probe.audio.duration
        except Exception:
            # Best effort: print the failure and keep the None values.
            traceback.print_exc()

    # Area and ratio are derived only when both dimensions are known and
    # non-zero, which also avoids dividing by a zero height.
    if self.width and self.height:
        self.area = self.width * self.height
        self.ratio = round(self.width / self.height, 2)

    data = {
        'id': self.id,
        'width': self.width,
        'height': self.height,
        'area': self.area,
        'ratio': self.ratio,
        'duration': self.duration,
        'bytes': self.bytes,
    }
    self.photodb.sql_update(table='photos', pairs=data, where_key='id')

    self._uncache()
    if commit:
        self.photodb.log.debug('Committing - reload metadata')
        self.photodb.commit()
|
|
|
|
@decorators.required_feature('photo.edit')
@decorators.transaction
def relocate(self, new_filepath, *, allow_duplicates=False, commit=True):
    '''
    Point the Photo object to a different filepath.

    DOES NOT MOVE THE FILE. This only records a move that was already
    performed outside of the system. To rename or move the file itself,
    use `rename_file`.

    allow_duplicates:
        If True, skip the check that no other Photo already uses the
        new path.

    Raises FileNotFoundError if new_filepath is not an existing file.
    '''
    new_filepath = pathclass.Path(new_filepath)
    if not new_filepath.is_file:
        raise FileNotFoundError(new_filepath.absolute_path)

    if not allow_duplicates:
        self.photodb.assert_no_such_photo_by_path(filepath=new_filepath)

    self.photodb.sql_update(
        table='photos',
        pairs={'id': self.id, 'filepath': new_filepath.absolute_path},
        where_key='id',
    )

    self._uncache()

    if not commit:
        return

    self.photodb.log.debug('Committing - relocate photo')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('photo.add_remove_tag')
@decorators.transaction
def remove_tag(self, tag, *, commit=True):
    '''
    Remove `tag` and all of its descendants from this photo, and update
    the photo's tagged_at time.
    '''
    tag = self.photodb.get_tag(name=tag)

    self.photodb.log.debug('Removing %s from %s', tag, self)

    # Collect the whole subtree before issuing any deletes.
    doomed_tags = list(tag.walk_children())
    for doomed in doomed_tags:
        self.photodb.sql_delete(
            table='photo_tag_rel',
            pairs={'photoid': self.id, 'tagid': doomed.id},
        )

    self.photodb.sql_update(
        table='photos',
        pairs={'id': self.id, 'tagged_at': helpers.now()},
        where_key='id',
    )

    if not commit:
        return

    self.photodb.log.debug('Committing - remove photo tag')
    self.photodb.commit()
|
|
|
|
@decorators.required_feature('photo.edit')
@decorators.transaction
def rename_file(self, new_filename, *, move=False, commit=True):
    '''
    Rename the file on the disk as well as in the database.

    new_filename:
        A bare filename keeps the file in its current directory. A name
        with a directory part is treated as a full path.

    move:
        If True, allow the file to be moved into another directory.
        Otherwise, the rename must be local.

    Raises ValueError if the rename would move the file while move is
    False, or if the new and old paths are the same.
    '''
    old_path = self.real_path
    old_path.correct_case()

    new_filename = helpers.remove_path_badchars(new_filename, allowed=':\\/')
    if os.path.dirname(new_filename) == '':
        # No directory part: stay next to the original file.
        new_path = old_path.parent.with_child(new_filename)
    else:
        new_path = pathclass.Path(new_filename)
    #new_path.correct_case()

    self.photodb.log.debug(old_path)
    self.photodb.log.debug(new_path)
    if (new_path.parent != old_path.parent) and not move:
        raise ValueError('Cannot move the file without param move=True')

    if new_path.absolute_path == old_path.absolute_path:
        raise ValueError('The new and old names are the same')

    os.makedirs(new_path.parent.absolute_path, exist_ok=True)

    if new_path.normcase != old_path.normcase:
        # The paths are different files, so put the data at the new path
        # first: by hardlink if possible, otherwise by copying. The original
        # is removed further below.
        # When the paths differ only by case, this step is skipped because
        # on case-insensitive systems both names point to the same place and
        # the intermediate link could not be made. That case gets a simple
        # rename in just a moment.
        try:
            os.link(old_path.absolute_path, new_path.absolute_path)
        except OSError:
            spinal.copy_file(old_path, new_path)

    # The pair is (old value, new value) for the row matched by filepath.
    data = {
        'filepath': (old_path.absolute_path, new_path.absolute_path),
    }
    self.photodb.sql_update(table='photos', pairs=data, where_key='filepath')

    if new_path.normcase == old_path.normcase:
        # If they are equivalent but differently cased, just rename.
        action = os.rename
        args = [old_path.absolute_path, new_path.absolute_path]
    else:
        # Delete the original, leaving only the new copy / hardlink.
        action = os.remove
        args = [old_path.absolute_path]

    self._uncache()
    if commit:
        # Finish the filesystem change right away, then commit.
        action(*args)
        self.photodb.log.debug('Committing - rename file')
        self.photodb.commit()
    else:
        # Defer the filesystem change to the PhotoDB's on_commit_queue.
        queue_action = {'action': action, 'args': args}
        self.photodb.on_commit_queue.append(queue_action)

    # Reload from the database so real_path and basename reflect the new path.
    self.__reinit__()
|
|
|
|
@decorators.required_feature('photo.edit')
@decorators.transaction
def set_searchhidden(self, searchhidden, *, commit=True):
    '''
    Set this photo's searchhidden flag in the database and on the object.

    searchhidden:
        Any value. It is coerced with bool() once, so the attribute kept
        on this object is the same value that is written to the photos
        table.
    '''
    searchhidden = bool(searchhidden)

    data = {
        'id': self.id,
        'searchhidden': searchhidden,
    }
    self.photodb.sql_update(table='photos', pairs=data, where_key='id')

    self.searchhidden = searchhidden

    if commit:
        self.photodb.log.debug('Committing - set searchhidden')
        self.photodb.commit()
|
|
|
|
@decorators.required_feature('photo.edit')
@decorators.transaction
def set_override_filename(self, new_filename, *, commit=True):
    '''
    Store an override filename for this photo, or clear it with None.

    A non-None name is passed through helpers.remove_path_badchars and
    stripped of surrounding whitespace before being saved.

    Raises ValueError if nothing is left of the name after cleaning.
    '''
    override = new_filename
    if override is not None:
        override = helpers.remove_path_badchars(override).strip()
        if not override:
            raise ValueError(f'"{new_filename}" is not valid.')

    self.photodb.sql_update(
        table='photos',
        pairs={'id': self.id, 'override_filename': override},
        where_key='id',
    )

    if commit:
        self.photodb.log.debug('Committing - set override filename')
        self.photodb.commit()

    self.__reinit__()
|
|
|
|
|
|
class Tag(ObjectBase, GroupableMixin):
    '''
    A Tag, which can be applied to Photos for organization.
    '''
    group_table = 'tag_group_rel'
    group_sql_index = constants.SQL_INDEX[group_table]

    def __init__(self, photodb, db_row):
        '''
        photodb:
            The database object this tag belongs to.

        db_row:
            Either a dict keyed by column name, or a list / tuple in the
            order given by constants.SQL_COLUMNS['tags'].
        '''
        super().__init__(photodb)
        if isinstance(db_row, (list, tuple)):
            db_row = dict(zip(constants.SQL_COLUMNS['tags'], db_row))
        self.id = db_row['id']

        # Do not pass the name through the normalizer. It may be grandfathered
        # from previous character / length rules.
        self.name = db_row['name']
        self.description = self.normalize_description(db_row['description'])
        self.author_id = self.normalize_author_id(db_row['author_id'])

        self.group_getter = self.photodb.get_tag
        self.group_getter_many = self.photodb.get_tags_by_id
        # None means the synonyms have not been loaded yet; get_synonyms
        # fills this in on first use.
        self._cached_synonyms = None

    def __repr__(self):
        return f'Tag:{self.id}:{self.name}'

    def __str__(self):
        return f'Tag:{self.name}'

    @staticmethod
    def normalize_description(description):
        '''
        Return the description stripped of surrounding whitespace, with None
        becoming the empty string.

        Raises TypeError if the description is neither None nor a string.
        '''
        if description is None:
            return ''

        if not isinstance(description, str):
            raise TypeError(f'Description must be string, not {type(description)}')

        description = description.strip()

        return description

    @staticmethod
    def normalize_name(name, valid_chars=None, min_length=None, max_length=None):
        '''
        Reduce a user-supplied string to a plain tag name.

        The name is lowercased and stripped; only the part before the first
        '+' and after the last '.' is kept; hyphens and spaces become
        underscores; and any character not in valid_chars is dropped.

        valid_chars:
            Defaults to constants.DEFAULT_CONFIGURATION['tag']['valid_chars'].

        min_length, max_length:
            If not None, limits on the length of the normalized name.

        Raises exceptions.TagTooShort or exceptions.TagTooLong when the
        normalized name violates those limits.
        '''
        original_name = name
        if valid_chars is None:
            valid_chars = constants.DEFAULT_CONFIGURATION['tag']['valid_chars']

        name = name.lower().strip()
        name = name.strip('.+')
        name = name.split('+')[0].split('.')[-1]
        name = name.replace('-', '_')
        name = name.replace(' ', '_')
        name = ''.join(c for c in name if c in valid_chars)

        if min_length is not None and len(name) < min_length:
            raise exceptions.TagTooShort(original_name)

        if max_length is not None and len(name) > max_length:
            raise exceptions.TagTooLong(name)

        return name

    def _uncache(self):
        '''
        Remove this tag from the photodb's tag cache.
        '''
        self.photodb.caches['tag'].remove(self.id)

    @decorators.required_feature('tag.edit')
    # GroupableMixin.add_child already has @transaction.
    def add_child(self, *args, **kwargs):
        return super().add_child(*args, **kwargs)

    @decorators.required_feature('tag.edit')
    # GroupableMixin.add_children already has @transaction.
    def add_children(self, *args, **kwargs):
        return super().add_children(*args, **kwargs)

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def add_synonym(self, synname, *, commit=True):
        '''
        Register a new synonym for this tag and return its normalized name.

        Raises exceptions.CantSynonymSelf if the normalized synonym equals
        this tag's own name. photodb.assert_no_such_tag is also consulted,
        so a name that is already in use is rejected there.
        '''
        synname = self.photodb.normalize_tagname(synname)

        if synname == self.name:
            raise exceptions.CantSynonymSelf()

        self.photodb.assert_no_such_tag(name=synname)

        self.photodb.log.debug('New synonym %s of %s', synname, self.name)

        self.photodb._cached_frozen_children = None

        data = {
            'name': synname,
            'mastername': self.name,
        }
        self.photodb.sql_insert(table='tag_synonyms', data=data)

        if self._cached_synonyms is not None:
            self._cached_synonyms.add(synname)

        if commit:
            self.photodb.log.debug('Committing - add synonym')
            self.photodb.commit()

        return synname

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def convert_to_synonym(self, mastertag, *, commit=True):
        '''
        Convert this tag into a synonym for a different tag.
        All photos which possess the current tag will have it replaced with the
        new master tag.
        All synonyms of the old tag will point to the new tag.

        Good for when two tags need to be merged under a single name.

        Raises exceptions.CantSynonymSelf if mastertag resolves to this tag.
        '''
        mastertag = self.photodb.get_tag(name=mastertag)

        # Fail before touching any rows or caches. Without this check the
        # problem would normally only surface from add_synonym at the very
        # end, after this tag and its synonym rows had already been deleted.
        if mastertag == self:
            raise exceptions.CantSynonymSelf()

        self.photodb._cached_frozen_children = None

        # Migrate the old tag's synonyms to the new one
        # UPDATE is safe for this operation because there is no chance of duplicates.
        my_synonyms = self.get_synonyms()
        data = {
            'mastername': (self.name, mastertag.name),
        }
        self.photodb.sql_update(table='tag_synonyms', pairs=data, where_key='mastername')
        if mastertag._cached_synonyms is not None:
            mastertag._cached_synonyms.update(my_synonyms)

        # Because these were two separate tags, perhaps in separate trees, it
        # is possible for a photo to have both at the moment.
        #
        # If they already have both, the deletion of the syn rel will happen
        # when the syn tag is deleted.
        # If they only have the syn, we will UPDATE it to the master.
        # If they only have the master, nothing needs to happen.

        # Find photos that have the old tag and DON'T already have the new one.
        query = '''
        SELECT photoid FROM photo_tag_rel p1
        WHERE tagid == ?
        AND NOT EXISTS (
            SELECT 1 FROM photo_tag_rel p2
            WHERE p1.photoid == p2.photoid
            AND tagid == ?
        )
        '''
        bindings = [self.id, mastertag.id]
        replace_photoids = [row[0] for row in self.photodb.sql_execute(query, bindings)]

        # For those photos that only had the syn, simply replace with master.
        if replace_photoids:
            query = f'''
            UPDATE photo_tag_rel
            SET tagid = ?
            WHERE tagid == ?
            AND photoid IN {helpers.sql_listify(replace_photoids)}
            '''
            bindings = [mastertag.id, self.id]
            self.photodb.sql_execute(query, bindings)

        # For photos that have the old tag and DO already have the new one,
        # don't worry because the old rels will be deleted when the tag is
        # deleted.
        self.delete(commit=False)

        # Enjoy your new life as a monk.
        mastertag.add_synonym(self.name, commit=False)
        if commit:
            self.photodb.log.debug('Committing - convert to synonym')
            self.photodb.commit()

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def delete(self, *, delete_children=False, commit=True):
        '''
        Delete this tag, its photo relationships, and the synonyms that
        point at it.

        delete_children:
            Forwarded to GroupableMixin.delete.
        '''
        self.photodb.log.debug('Deleting %s', self)
        self.photodb._cached_frozen_children = None
        GroupableMixin.delete(self, delete_children=delete_children, commit=False)
        self.photodb.sql_delete(table='photo_tag_rel', pairs={'tagid': self.id})
        self.photodb.sql_delete(table='tag_synonyms', pairs={'mastername': self.name})
        self.photodb.sql_delete(table='tags', pairs={'id': self.id})
        self._uncache()
        if commit:
            self.photodb.log.debug('Committing - delete tag')
            self.photodb.commit()

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def edit(self, description=None, *, commit=True):
        '''
        Change the description. Leave None to keep current value.
        '''
        if description is None:
            return

        self.description = self.normalize_description(description)

        data = {
            'id': self.id,
            'description': self.description
        }
        self.photodb.sql_update(table='tags', pairs=data, where_key='id')

        self._uncache()
        if commit:
            self.photodb.log.debug('Committing - edit tag')
            self.photodb.commit()

    def get_synonyms(self):
        '''
        Return a set of the names that are synonyms of this tag.

        The result is loaded from tag_synonyms on first use and cached.
        Callers always receive a copy, so mutating the returned set does
        not disturb the cache.
        '''
        if self._cached_synonyms is not None:
            return self._cached_synonyms.copy()

        syn_rows = self.photodb.sql_select(
            'SELECT name FROM tag_synonyms WHERE mastername == ?',
            [self.name]
        )
        synonyms = set(row[0] for row in syn_rows)
        self._cached_synonyms = synonyms.copy()
        return synonyms

    @decorators.required_feature('tag.edit')
    # GroupableMixin.leave_group already has @transaction.
    def leave_group(self, *args, **kwargs):
        return super().leave_group(*args, **kwargs)

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def remove_synonym(self, synname, *, commit=True):
        '''
        Delete a synonym.
        This will have no effect on photos or other synonyms because
        they always resolve to the master tag before application.

        Raises exceptions.NoSuchSynonym if the name is this tag's own name
        or is not currently a synonym of this tag.
        '''
        synname = self.photodb.normalize_tagname(synname)
        if synname == self.name:
            raise exceptions.NoSuchSynonym(synname)

        syn_exists = self.photodb.sql_select_one(
            'SELECT 1 FROM tag_synonyms WHERE mastername == ? AND name == ?',
            [self.name, synname]
        )

        if syn_exists is None:
            raise exceptions.NoSuchSynonym(synname)

        self.photodb._cached_frozen_children = None
        self.photodb.sql_delete(table='tag_synonyms', pairs={'name': synname})
        if self._cached_synonyms is not None:
            self._cached_synonyms.remove(synname)

        if commit:
            self.photodb.log.debug('Committing - remove synonym')
            self.photodb.commit()

    @decorators.required_feature('tag.edit')
    @decorators.transaction
    def rename(self, new_name, *, apply_to_synonyms=True, commit=True):
        '''
        Rename the tag. Does not affect its relation to Photos or tag groups.

        apply_to_synonyms:
            If True, tag_synonyms rows whose mastername is the old name are
            updated to the new name.

        Does nothing if the normalized new name equals the current name.
        Raises exceptions.TagExists if photodb.get_tag finds a tag for the
        new name.
        '''
        new_name = self.photodb.normalize_tagname(new_name)
        old_name = self.name
        if new_name == old_name:
            return

        try:
            self.photodb.get_tag(name=new_name)
        except exceptions.NoSuchTag:
            pass
        else:
            raise exceptions.TagExists(new_name)

        self.photodb._cached_frozen_children = None

        data = {
            'id': self.id,
            'name': new_name,
        }
        self.photodb.sql_update(table='tags', pairs=data, where_key='id')

        if apply_to_synonyms:
            data = {
                'mastername': (old_name, new_name),
            }
            self.photodb.sql_update(table='tag_synonyms', pairs=data, where_key='mastername')

        self.name = new_name
        self._uncache()
        if commit:
            self.photodb.log.debug('Committing - rename tag')
            self.photodb.commit()
|
|
|
|
|
|
class User(ObjectBase):
    '''
    A dear friend of ours.
    '''
    def __init__(self, photodb, db_row):
        super().__init__(photodb)

        # Positional rows are paired up with the users table's column names.
        row = db_row
        if isinstance(row, (list, tuple)):
            row = dict(zip(constants.SQL_COLUMNS['users'], row))

        self.id = row['id']
        self.username = row['username']
        self.created = row['created']
        self.password_hash = row['password']
        # No max_length is given here: stored names may be grandfathered in.
        self._display_name = self.normalize_display_name(row['display_name'])

    def __repr__(self):
        return f'User:{self.id}:{self.username}'

    def __str__(self):
        return f'User:{self.username}'

    @staticmethod
    def normalize_display_name(display_name, max_length=None):
        '''
        Return the display name stripped of surrounding whitespace, or None
        if it is None or blank.

        Raises TypeError if the value is neither None nor a string.
        Raises exceptions.DisplayNameTooLong if max_length is given and the
        stripped name exceeds it.
        '''
        if display_name is None:
            return None

        if not isinstance(display_name, str):
            raise TypeError(f'Display name must be string, not {type(display_name)}.')

        stripped = display_name.strip()
        if not stripped:
            return None

        too_long = max_length is not None and len(stripped) > max_length
        if too_long:
            raise exceptions.DisplayNameTooLong(display_name=stripped, max_length=max_length)

        return stripped

    @property
    def display_name(self):
        '''
        The user's chosen display name, falling back to the username when
        none is set.
        '''
        return self.username if self._display_name is None else self._display_name

    @decorators.required_feature('user.edit')
    @decorators.transaction
    def set_display_name(self, display_name, *, commit=True):
        '''
        Normalize the display name against the configured maximum length,
        then save it to the users table and to this object.
        '''
        limit = self.photodb.config['user']['max_display_name_length']
        display_name = self.normalize_display_name(display_name, max_length=limit)

        self.photodb.sql_update(
            table='users',
            pairs={'id': self.id, 'display_name': display_name},
            where_key='id',
        )
        self._display_name = display_name

        if not commit:
            return

        self.photodb.log.debug('Committing - set display name')
        self.photodb.commit()
|
|
|
|
|
|
class WarningBag:
    '''
    Collects warnings in a set, so adding an equal warning twice keeps
    only one of them.
    '''
    def __init__(self):
        self.warnings = set()

    def add(self, warning):
        '''
        Put a warning into the bag.
        '''
        self.warnings.add(warning)
|