diff --git a/etiquette/helpers.py b/etiquette/helpers.py index 70c45e7..c8a61d2 100644 --- a/etiquette/helpers.py +++ b/etiquette/helpers.py @@ -7,6 +7,7 @@ import hashlib import mimetypes import os import PIL.Image +import typing import zipstream from voussoirkit import bytestring @@ -123,7 +124,7 @@ def album_photos_as_filename_map( return arcnames -def checkerboard_image(color_1, color_2, image_size, checker_size): +def checkerboard_image(color_1, color_2, image_size, checker_size) -> PIL.Image: ''' Generate a PIL Image with a checkerboard pattern. @@ -200,10 +201,10 @@ def decollide_names(things, namer): final[thing] = myname return final -def dict_to_tuple(d): +def dict_to_tuple(d) -> tuple: return tuple(sorted(d.items())) -def generate_image_thumbnail(filepath, width, height): +def generate_image_thumbnail(filepath, width, height) -> PIL.Image: if not os.path.isfile(filepath): raise FileNotFoundError(filepath) image = PIL.Image.open(filepath) @@ -234,7 +235,7 @@ def generate_image_thumbnail(filepath, width, height): image = image.convert('RGB') return image -def generate_video_thumbnail(filepath, outfile, width, height, **special): +def generate_video_thumbnail(filepath, outfile, width, height, **special) -> PIL.Image: if not os.path.isfile(filepath): raise FileNotFoundError(filepath) probe = constants.ffmpeg.probe(filepath) @@ -267,7 +268,7 @@ def generate_video_thumbnail(filepath, outfile, width, height, **special): ) return True -def get_mimetype(filepath): +def get_mimetype(filepath) -> typing.Optional[str]: ''' Extension to mimetypes.guess_type which uses my constants.ADDITIONAL_MIMETYPES. @@ -278,7 +279,7 @@ def get_mimetype(filepath): mimetype = mimetypes.guess_type(filepath)[0] return mimetype -def hash_photoset(photos): +def hash_photoset(photos) -> str: ''' Given some photos, return a fingerprint string for that particular set. ''' @@ -290,7 +291,7 @@ def hash_photoset(photos): return hasher.hexdigest() -def hyphen_range(s): +def hyphen_range(s) -> tuple: ''' Given a string like '1-3', return numbers (1, 3) representing lower and upper bounds. @@ -319,9 +320,9 @@ def hyphen_range(s): if low is not None and high is not None and low > high: raise exceptions.OutOfOrder(range=s, min=low, max=high) - return low, high + return (low, high) -def is_xor(*args): +def is_xor(*args) -> bool: ''' Return True if and only if one arg is truthy. ''' @@ -336,7 +337,7 @@ def now(timestamp=True): return n.timestamp() return n -def parse_unit_string(s): +def parse_unit_string(s) -> typing.Union[int, float, None]: ''' Try to parse the string as an int, float, or bytestring, or hms. ''' @@ -357,7 +358,12 @@ def parse_unit_string(s): else: return bytestring.parsebytes(s) -def read_filebytes(filepath, range_min=0, range_max=None, chunk_size=bytestring.MIBIBYTE): +def read_filebytes( + filepath, + range_min=0, + range_max=None, + chunk_size=bytestring.MIBIBYTE, + ) -> typing.Iterable[bytes]: ''' Yield chunks of bytes from the file between the endpoints. ''' @@ -373,19 +379,15 @@ def read_filebytes(filepath, range_min=0, range_max=None, chunk_size=bytestring. with f: f.seek(range_min) while sent_amount < range_span: - chunk = f.read(chunk_size) - if len(chunk) == 0: - break - needed = range_span - sent_amount - if len(chunk) >= needed: - yield chunk[:needed] + chunk = f.read(min(needed, chunk_size)) + if len(chunk) == 0: break yield chunk sent_amount += len(chunk) -def remove_path_badchars(filepath, allowed=''): +def remove_path_badchars(filepath, allowed='') -> str: ''' Remove the bad characters seen in constants.FILENAME_BADCHARS, except those which you explicitly permit. @@ -409,15 +411,18 @@ def slice_before(li, item): index = li.index(item) return li[:index] -def split_easybake_string(ebstring): +def split_easybake_string(ebstring) -> tuple[str, str, str]: ''' Given an easybake string, return (tagname, synonym, rename_to), where tagname may be a full qualified name, and at least one of synonym or rename_to will be None since both are not posible at once. - 'languages.python' -> ('languages.python', None, None) - 'languages.python+py' -> ('languages.python', 'py', None) - 'languages.python=bestlang' -> ('languages.python', None, 'bestlang') + >>> split_easybake_string('languages.python') + ('languages.python', None, None) + >>> split_easybake_string('languages.python+py') + ('languages.python', 'py', None) + >>> split_easybake_string('languages.python=bestlang') + ('languages.python', None, 'bestlang') ''' ebstring = ebstring.strip() ebstring = ebstring.strip('.+=') @@ -454,7 +459,7 @@ def split_easybake_string(ebstring): tagname = tagname.strip('.') return (tagname, synonym, rename_to) -def truthystring(s, fallback=False): +def truthystring(s, fallback=False) -> typing.Union[bool, None]: ''' If s is already a boolean, int, or None, return a boolean or None. If s is a string, return True, False, or None based on the options presented @@ -480,7 +485,7 @@ def truthystring(s, fallback=False): return None return False -def zip_album(album, recursive=True): +def zip_album(album, recursive=True) -> zipstream.ZipFile: ''' Given an album, return a zipstream zipfile that contains the album's photos (recursive = include children's photos) organized into folders @@ -521,7 +526,7 @@ def zip_album(album, recursive=True): return zipfile -def zip_photos(photos): +def zip_photos(photos) -> zipstream.ZipFile: ''' Given some photos, return a zipstream zipfile that contains the files. ''' diff --git a/etiquette/objects.py b/etiquette/objects.py index 5695d5b..54fa6eb 100644 --- a/etiquette/objects.py +++ b/etiquette/objects.py @@ -10,6 +10,7 @@ import PIL.Image import re import send2trash import traceback +import typing from voussoirkit import bytestring from voussoirkit import gentools @@ -27,7 +28,12 @@ from . import helpers BAIL = sentinel.Sentinel('BAIL') -def normalize_db_row(db_row, table): +def normalize_db_row(db_row, table) -> dict: + ''' + Raises KeyError if table is not one of the recognized tables. + + Raises TypeError if db_row is not the right type. + ''' if isinstance(db_row, dict): return db_row @@ -69,7 +75,12 @@ class ObjectBase: return hash(self.id) @staticmethod - def normalize_author_id(author_id): + def normalize_author_id(author_id) -> typing.Optional[str]: + ''' + Raises TypeError if author_id is not the right type. + + Raises ValueError if author_id contains invalid characters. + ''' if author_id is None: return None @@ -85,10 +96,14 @@ class ObjectBase: return author_id - def assert_not_deleted(self): + def assert_not_deleted(self) -> None: + ''' + Raises exceptions.DeletedObject if this object is deleted. + ''' if self.deleted: raise exceptions.DeletedObject(self) + # Will add -> User when forward references are supported by Python. def get_author(self): ''' Return the User who created this object, or None if it is unassigned. @@ -101,7 +116,7 @@ class GroupableMixin(metaclass=abc.ABCMeta): group_getter_many = None group_table = None - def __lift_children(self): + def _lift_children(self): ''' If this object has parents, the parents adopt all of its children. Otherwise, this object is at the root level, so the parental @@ -117,7 +132,7 @@ class GroupableMixin(metaclass=abc.ABCMeta): for parent in parents: parent.add_children(children) - def __add_child(self, member): + def _add_child(self, member): self.assert_same_type(member) if member == self: @@ -140,24 +155,28 @@ class GroupableMixin(metaclass=abc.ABCMeta): @abc.abstractmethod def add_child(self, member): - return self.__add_child(member) + return self._add_child(member) @abc.abstractmethod def add_children(self, members): bail = True for member in members: - bail = (self.__add_child(member) is BAIL) and bail + bail = (self._add_child(member) is BAIL) and bail if bail: return BAIL - def assert_same_type(self, other): + def assert_same_type(self, other) -> None: + ''' + Raise TypeError if other is not the same type as self, or if other is + associated with a different etiquette.PhotoDB object. + ''' if not isinstance(other, type(self)): raise TypeError(f'Object must be of type {type(self)}, not {type(other)}.') if self.photodb != other.photodb: raise TypeError(f'Objects must belong to the same PhotoDB.') @abc.abstractmethod - def delete(self, *, delete_children=False): + def delete(self, *, delete_children=False) -> None: ''' Delete this object's relationships to other groupables. Any unique / specific deletion methods should be written within the @@ -174,7 +193,7 @@ class GroupableMixin(metaclass=abc.ABCMeta): for child in self.get_children(): child.delete(delete_children=True) else: - self.__lift_children() + self._lift_children() # Note that this part comes after the deletion of children to prevent # issues of recursion. @@ -182,7 +201,7 @@ class GroupableMixin(metaclass=abc.ABCMeta): self._uncache() self.deleted = True - def get_children(self): + def get_children(self) -> set: child_rows = self.photodb.sql_select( f'SELECT memberid FROM {self.group_table} WHERE parentid == ?', [self.id] @@ -191,42 +210,42 @@ class GroupableMixin(metaclass=abc.ABCMeta): children = set(self.group_getter_many(child_ids)) return children - def get_parents(self): + def get_parents(self) -> set: query = f'SELECT parentid FROM {self.group_table} WHERE memberid == ?' parent_rows = self.photodb.sql_select(query, [self.id]) parent_ids = (parent_id for (parent_id,) in parent_rows) parents = set(self.group_getter_many(parent_ids)) return parents - def has_ancestor(self, ancestor): + def has_ancestor(self, ancestor) -> bool: return ancestor in self.walk_parents() - def has_any_child(self): + def has_any_child(self) -> bool: query = f'SELECT 1 FROM {self.group_table} WHERE parentid == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def has_any_parent(self): + def has_any_parent(self) -> bool: query = f'SELECT 1 FROM {self.group_table} WHERE memberid == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def has_child(self, member): + def has_child(self, member) -> bool: self.assert_same_type(member) query = f'SELECT 1 FROM {self.group_table} WHERE parentid == ? AND memberid == ?' row = self.photodb.sql_select_one(query, [self.id, member.id]) return row is not None - def has_descendant(self, descendant): + def has_descendant(self, descendant) -> bool: return self in descendant.walk_parents() - def has_parent(self, parent): + def has_parent(self, parent) -> bool: self.assert_same_type(parent) query = f'SELECT 1 FROM {self.group_table} WHERE parentid == ? AND memberid == ?' row = self.photodb.sql_select_one(query, [parent.id, self.id]) return row is not None - def __remove_child(self, member): + def _remove_child(self, member): if not self.has_child(member): return BAIL @@ -240,17 +259,17 @@ class GroupableMixin(metaclass=abc.ABCMeta): @abc.abstractmethod def remove_child(self, member): - return self.__remove_child(member) + return self._remove_child(member) @abc.abstractmethod def remove_children(self, members): bail = True for member in members: - bail = (self.__remove_child(member) is BAIL) and bail + bail = (self._remove_child(member) is BAIL) and bail if bail: return BAIL - def walk_children(self): + def walk_children(self) -> typing.Iterable: ''' Yield self and all descendants. ''' @@ -258,7 +277,7 @@ class GroupableMixin(metaclass=abc.ABCMeta): for child in self.get_children(): yield from child.walk_children() - def walk_parents(self): + def walk_parents(self) -> typing.Iterable: ''' Yield all ancestors, but not self, in no particular order. ''' @@ -300,7 +319,10 @@ class Album(ObjectBase, GroupableMixin): return f'Album:{self.id}' @staticmethod - def normalize_description(description): + def normalize_description(description) -> str: + ''' + Raises TypeError if description is not a string or None. + ''' if description is None: return '' @@ -312,7 +334,10 @@ class Album(ObjectBase, GroupableMixin): return description @staticmethod - def normalize_title(title): + def normalize_title(title) -> str: + ''' + Raises TypeError if title is not a string or None. + ''' if title is None: return '' @@ -341,23 +366,35 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def add_associated_directory(self, path): + def add_associated_directory(self, path) -> None: ''' Add a directory from which this album will pull files during rescans. These relationships are not unique and multiple albums can associate with the same directory if desired. + + Raises ValueError if path is not a directory. ''' self._add_associated_directory(path) @decorators.required_feature('album.edit') @decorators.transaction - def add_associated_directories(self, paths): + def add_associated_directories(self, paths) -> None: + ''' + Add multiple associated directories. + + Raises ValueError if any path is not a directory. + ''' for path in paths: self._add_associated_directory(path) @decorators.required_feature('album.edit') @decorators.transaction def add_child(self, member): + ''' + Raises exceptions.CantGroupSelf if member is self. + + Raises exceptions.RecursiveGrouping if member is an ancestor of self. + ''' return super().add_child(member) @decorators.required_feature('album.edit') @@ -372,7 +409,7 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def add_photo(self, photo): + def add_photo(self, photo) -> None: if self.has_photo(photo): return @@ -380,7 +417,7 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def add_photos(self, photos): + def add_photos(self, photos) -> None: existing_photos = set(self.get_photos()) photos = set(photos) new_photos = photos.difference(existing_photos) @@ -393,7 +430,7 @@ class Album(ObjectBase, GroupableMixin): # Photo.add_tag already has @required_feature @decorators.transaction - def add_tag_to_all(self, tag, *, nested_children=True): + def add_tag_to_all(self, tag, *, nested_children=True) -> None: ''' Add this tag to every photo in the album. Saves you from having to write the for-loop yourself. @@ -413,7 +450,7 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def delete(self, *, delete_children=False): + def delete(self, *, delete_children=False) -> None: self.photodb.log.info('Deleting %s.', self) GroupableMixin.delete(self, delete_children=delete_children) self.photodb.sql_delete(table='album_associated_directories', pairs={'albumid': self.id}) @@ -423,7 +460,7 @@ class Album(ObjectBase, GroupableMixin): self.deleted = True @property - def display_name(self): + def display_name(self) -> str: if self.title: return self.title else: @@ -431,7 +468,7 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def edit(self, title=None, description=None): + def edit(self, title=None, description=None) -> None: ''' Change the title or description. Leave None to keep current value. ''' @@ -454,13 +491,13 @@ class Album(ObjectBase, GroupableMixin): self.description = description @property - def full_name(self): + def full_name(self) -> str: if self.title: return f'{self.id} - {self.title}' else: return self.id - def get_associated_directories(self): + def get_associated_directories(self) -> set[pathclass.Path]: directory_rows = self.photodb.sql_select( 'SELECT directory FROM album_associated_directories WHERE albumid == ?', [self.id] @@ -468,7 +505,7 @@ class Album(ObjectBase, GroupableMixin): directories = set(pathclass.Path(directory) for (directory,) in directory_rows) return directories - def get_photos(self): + def get_photos(self) -> set: photo_rows = self.photodb.sql_select( 'SELECT photoid FROM album_photo_rel WHERE albumid == ?', [self.id] @@ -477,7 +514,7 @@ class Album(ObjectBase, GroupableMixin): photos = set(self.photodb.get_photos_by_id(photo_ids)) return photos - def has_any_associated_directory(self): + def has_any_associated_directory(self) -> bool: ''' Return True if this album has at least 1 associated directory. ''' @@ -487,7 +524,7 @@ class Album(ObjectBase, GroupableMixin): ) return row is not None - def has_any_photo(self, recurse=False): + def has_any_photo(self, recurse=False) -> bool: ''' Return True if this album contains at least 1 photo. @@ -505,14 +542,14 @@ class Album(ObjectBase, GroupableMixin): return self.has_any_subalbum_photo() return False - def has_any_subalbum_photo(self): + def has_any_subalbum_photo(self) -> bool: ''' Return True if any descendent album has any photo, ignoring whether this particular album itself has photos. ''' return any(child.has_any_photo(recurse=True) for child in self.get_children()) - def has_associated_directory(self, path): + def has_associated_directory(self, path) -> bool: path = pathclass.Path(path) row = self.photodb.sql_select_one( 'SELECT 1 FROM album_associated_directories WHERE albumid == ? AND directory == ?', @@ -520,14 +557,14 @@ class Album(ObjectBase, GroupableMixin): ) return row is not None - def has_photo(self, photo): + def has_photo(self, photo) -> bool: row = self.photodb.sql_select_one( 'SELECT 1 FROM album_photo_rel WHERE albumid == ? AND photoid == ?', [self.id, photo.id] ) return row is not None - def jsonify(self, minimal=False): + def jsonify(self, minimal=False) -> dict: j = { 'type': 'album', 'id': self.id, @@ -562,12 +599,12 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def remove_photo(self, photo): + def remove_photo(self, photo) -> None: self._remove_photo(photo) @decorators.required_feature('album.edit') @decorators.transaction - def remove_photos(self, photos): + def remove_photos(self, photos) -> None: existing_photos = set(self.get_photos()) photos = set(photos) remove_photos = photos.intersection(existing_photos) @@ -580,7 +617,12 @@ class Album(ObjectBase, GroupableMixin): @decorators.required_feature('album.edit') @decorators.transaction - def set_thumbnail_photo(self, photo): + def set_thumbnail_photo(self, photo) -> None: + ''' + Raises TypeError if photo is not a Photo. + + Raises exceptions.DeletedObject if self.deleted. + ''' if photo is None: photo_id = None elif isinstance(photo, str): @@ -600,7 +642,10 @@ class Album(ObjectBase, GroupableMixin): self.photodb.sql_update(table='albums', pairs=pairs, where_key='id') self._thumbnail_photo = photo - def sum_bytes(self, recurse=True): + def sum_bytes(self, recurse=True) -> int: + ''' + Return the total number of bytes of all photos in this album. + ''' query = stringtools.collapse_whitespace(''' SELECT SUM(bytes) FROM photos WHERE photos.id IN ( @@ -618,7 +663,13 @@ class Album(ObjectBase, GroupableMixin): total = self.photodb.sql_select_one(query)[0] return total - def sum_children(self, recurse=True): + def sum_children(self, recurse=True) -> int: + ''' + Return the total number of child albums in this album. + + This method may be preferable to len(album.get_children()) because it + performs the counting in the database instead of creating Album objects. + ''' if recurse: walker = self.walk_children() # First yield is itself. @@ -634,11 +685,12 @@ class Album(ObjectBase, GroupableMixin): total = self.photodb.sql_select_one(query, bindings)[0] return total - def sum_photos(self, recurse=True): + def sum_photos(self, recurse=True) -> int: ''' - If all you need is the number of photos in the album, this method is - preferable to len(album.get_photos()) because it performs the counting - in the database instead of creating the Photo objects. + Return the total number of photos in this album. + + This method may be preferable to len(album.get_photos()) because it + performs the counting in the database instead of creating Photo objects. ''' query = stringtools.collapse_whitespace(''' SELECT COUNT(photoid) @@ -655,6 +707,7 @@ class Album(ObjectBase, GroupableMixin): total = self.photodb.sql_select_one(query)[0] return total + # Will add -> Photo when forward references are supported by Python. @property def thumbnail_photo(self): if self._thumbnail_photo is None: @@ -665,7 +718,7 @@ class Album(ObjectBase, GroupableMixin): self._thumbnail_photo = photo return photo - def walk_photos(self): + def walk_photos(self) -> typing.Iterable: yield from self.get_photos() children = self.walk_children() # The first yield is itself @@ -690,7 +743,10 @@ class Bookmark(ObjectBase): return f'Bookmark:{self.id}' @staticmethod - def normalize_title(title): + def normalize_title(title) -> str: + ''' + Raises TypeError if title is not a string or None. + ''' if title is None: return '' @@ -702,7 +758,12 @@ class Bookmark(ObjectBase): return title @staticmethod - def normalize_url(url): + def normalize_url(url) -> str: + ''' + Raises TypeError if url is not a string or None. + + Raises ValueError if url is invalid. + ''' if url is None: return '' @@ -721,13 +782,13 @@ class Bookmark(ObjectBase): @decorators.required_feature('bookmark.edit') @decorators.transaction - def delete(self): + def delete(self) -> None: self.photodb.sql_delete(table='bookmarks', pairs={'id': self.id}) self._uncache() self.deleted = True @property - def display_name(self): + def display_name(self) -> str: if self.title: return self.title else: @@ -735,7 +796,7 @@ class Bookmark(ObjectBase): @decorators.required_feature('bookmark.edit') @decorators.transaction - def edit(self, title=None, url=None): + def edit(self, title=None, url=None) -> None: ''' Change the title or URL. Leave None to keep current. ''' @@ -757,7 +818,7 @@ class Bookmark(ObjectBase): self.title = title self.url = url - def jsonify(self): + def jsonify(self) -> dict: j = { 'type': 'bookmark', 'id': self.id, @@ -816,7 +877,7 @@ class Photo(ObjectBase): def __str__(self): return f'Photo:{self.id}:{self.basename}' - def normalize_thumbnail(self, thumbnail): + def normalize_thumbnail(self, thumbnail) -> pathclass.Path: if thumbnail is None: return None @@ -827,10 +888,19 @@ class Photo(ObjectBase): return thumbnail @staticmethod - def normalize_override_filename(override_filename): + def normalize_override_filename(override_filename) -> typing.Optional[str]: + ''' + Raises TypeError if override_filename is not a string or None. + + Raises ValueError if override_filename does not contain any valid + characters remaining after invalid path chars are removed. + ''' if override_filename is None: return None + if not isinstance(override_filename, str): + raise TypeError(f'URL must be {str}, not {type(override_filename)}.') + cleaned = helpers.remove_path_badchars(override_filename) cleaned = cleaned.strip() if not cleaned: @@ -856,6 +926,7 @@ class Photo(ObjectBase): def _uncache(self): self.photodb.caches['photo'].remove(self.id) + # Will add -> Tag when forward references are supported by Python. @decorators.required_feature('photo.add_remove_tag') @decorators.transaction def add_tag(self, tag): @@ -894,25 +965,25 @@ class Photo(ObjectBase): return tag @property - def basename(self): + def basename(self) -> str: return self.override_filename or self.real_path.basename @property - def bitrate(self): + def bitrate(self) -> typing.Optional[float]: if self.duration and self.bytes is not None: return (self.bytes / 128) / self.duration else: return None @property - def bytes_string(self): + def bytes_string(self) -> str: if self.bytes is not None: return bytestring.bytestring(self.bytes) return '??? b' # Photo.add_tag already has @required_feature add_remove_tag @decorators.transaction - def copy_tags(self, other_photo): + def copy_tags(self, other_photo) -> None: ''' Take all of the tags owned by other_photo and apply them to this photo. ''' @@ -921,7 +992,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.edit') @decorators.transaction - def delete(self, *, delete_file=False): + def delete(self, *, delete_file=False) -> None: ''' Delete the Photo and its relation to any tags and albums. ''' @@ -953,14 +1024,14 @@ class Photo(ObjectBase): self.deleted = True @property - def duration_string(self): + def duration_string(self) -> typing.Optional[str]: if self.duration is None: return None return hms.seconds_to_hms(self.duration) @decorators.required_feature('photo.generate_thumbnail') @decorators.transaction - def generate_thumbnail(self, **special): + def generate_thumbnail(self, **special) -> pathclass.Path: ''' special: For videos, you can provide a `timestamp` to take the thumbnail at. @@ -1015,7 +1086,7 @@ class Photo(ObjectBase): self.__reinit__() return self.thumbnail - def get_containing_albums(self): + def get_containing_albums(self) -> set[Album]: ''' Return the albums of which this photo is a member. ''' @@ -1027,7 +1098,7 @@ class Photo(ObjectBase): albums = set(self.photodb.get_albums_by_id(album_ids)) return albums - def get_tags(self): + def get_tags(self) -> set: ''' Return the tags assigned to this Photo. ''' @@ -1039,13 +1110,15 @@ class Photo(ObjectBase): tags = set(self.photodb.get_tags_by_id(tag_ids)) return tags + # Will add -> Tag/False when forward references are supported. def has_tag(self, tag, *, check_children=True): ''' Return the Tag object if this photo contains that tag. Otherwise return False. check_children: - If True, children of the requested tag are accepted. + If True, children of the requested tag are accepted. That is, + a photo with family.parents can be said to have the 'family' tag. ''' tag = self.photodb.get_tag(name=tag) @@ -1066,7 +1139,7 @@ class Photo(ObjectBase): return tag_by_id[rel_row[0]] - def jsonify(self, include_albums=True, include_tags=True): + def jsonify(self, include_albums=True, include_tags=True) -> dict: j = { 'type': 'photo', 'id': self.id, @@ -1095,7 +1168,7 @@ class Photo(ObjectBase): return j - def make_thumbnail_filepath(self): + def make_thumbnail_filepath(self) -> pathclass.Path: ''' Create the filepath that should be the location of our thumbnail. ''' @@ -1110,7 +1183,7 @@ class Photo(ObjectBase): # Photo.rename_file already has @required_feature @decorators.transaction - def move_file(self, directory): + def move_file(self, directory) -> None: directory = pathclass.Path(directory) directory.assert_is_directory() new_path = directory.with_child(self.real_path.basename) @@ -1161,7 +1234,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.reload_metadata') @decorators.transaction - def reload_metadata(self, hash_kwargs=None): + def reload_metadata(self, hash_kwargs=None) -> None: ''' Load the file's height, width, etc as appropriate for this type of file. ''' @@ -1218,13 +1291,20 @@ class Photo(ObjectBase): @decorators.required_feature('photo.edit') @decorators.transaction - def relocate(self, new_filepath): + def relocate(self, new_filepath) -> None: ''' Point the Photo object to a different filepath. - DOES NOT MOVE THE FILE, only acknowledges a move that was performed - outside of the system. + This method DOES NOT MOVE THE FILE. It updates the database to reflect + a move that was performed outside of the system. + To rename or move the file, use `rename_file`. + + Raises FileNotFoundError if the supposed new_filepath is not actually + a file. + + Raises exceptions.PhotoExists if new_filepath is already associated + with another photo in the database. ''' new_filepath = pathclass.Path(new_filepath) if not new_filepath.is_file: @@ -1246,7 +1326,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.add_remove_tag') @decorators.transaction - def remove_tag(self, tag): + def remove_tag(self, tag) -> None: tag = self.photodb.get_tag(name=tag) self.photodb.log.info('Removing %s from %s.', tag, self) @@ -1264,7 +1344,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.add_remove_tag') @decorators.transaction - def remove_tags(self, tags): + def remove_tags(self, tags) -> None: tags = [self.photodb.get_tag(name=tag) for tag in tags] self.photodb.log.info('Removing %s from %s.', tags, self) @@ -1283,13 +1363,21 @@ class Photo(ObjectBase): @decorators.required_feature('photo.edit') @decorators.transaction - def rename_file(self, new_filename, *, move=False): + def rename_file(self, new_filename, *, move=False) -> None: ''' Rename the file on the disk as well as in the database. move: If True, allow the file to be moved into another directory. Otherwise, the rename must be local. + + Raises ValueError if new_filename includes a path to a directory that + is not the file's current directory, and move is False. + + Raises ValueError if new_filename is the same as the current path. + + Raises pathclass.Exists if new_filename leads to a file that already + exists. ''' old_path = self.real_path old_path.correct_case() @@ -1362,7 +1450,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.edit') @decorators.transaction - def set_override_filename(self, new_filename): + def set_override_filename(self, new_filename) -> None: new_filename = self.normalize_override_filename(new_filename) data = { @@ -1376,7 +1464,7 @@ class Photo(ObjectBase): @decorators.required_feature('photo.edit') @decorators.transaction - def set_searchhidden(self, searchhidden): + def set_searchhidden(self, searchhidden) -> None: data = { 'id': self.id, 'searchhidden': bool(searchhidden), @@ -1417,7 +1505,10 @@ class Tag(ObjectBase, GroupableMixin): return f'Tag:{self.name}' @staticmethod - def normalize_description(description): + def normalize_description(description) -> str: + ''' + Raises TypeError if description is not a string or None. + ''' if description is None: return '' @@ -1429,7 +1520,13 @@ class Tag(ObjectBase, GroupableMixin): return description @staticmethod - def normalize_name(name, min_length=None, max_length=None): + def normalize_name(name, min_length=None, max_length=None) -> str: + ''' + Raises exceptions.TagTooShort if shorter than min_length. + + Raises exceptions.TagTooLong if longer than max_length after invalid + characters are removed. + ''' original_name = name # if valid_chars is None: # valid_chars = constants.DEFAULT_CONFIGURATION['tag']['valid_chars'] @@ -1455,8 +1552,8 @@ class Tag(ObjectBase, GroupableMixin): def _uncache(self): self.photodb.caches['tag'].remove(self.id) - def __add_child(self, member): - ret = super().add_child(member) + def _add_child(self, member): + ret = super()._add_child(member) if ret is BAIL: return BAIL @@ -1480,6 +1577,11 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction def add_child(self, member): + ''' + Raises exceptions.CantGroupSelf if member is self. + + Raises exceptions.RecursiveGrouping if member is an ancestor of self. + ''' ret = super().add_child(member) if ret is BAIL: return BAIL @@ -1499,7 +1601,14 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction - def add_synonym(self, synname): + def add_synonym(self, synname) -> str: + ''' + Raises any exceptions from photodb.normalize_tagname. + + Raises exceptions.CantSynonymSelf if synname is the tag's name. + + Raises exceptions.TagExists if synname resolves to an existing tag. + ''' synname = self.photodb.normalize_tagname(synname) if synname == self.name: @@ -1524,7 +1633,7 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction - def convert_to_synonym(self, mastertag): + def convert_to_synonym(self, mastertag) -> None: ''' Convert this tag into a synonym for a different tag. All photos which possess the current tag will have it replaced with the @@ -1587,7 +1696,7 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction - def delete(self, *, delete_children=False): + def delete(self, *, delete_children=False) -> None: self.photodb.log.info('Deleting %s.', self) super().delete(delete_children=delete_children) self.photodb.sql_delete(table='photo_tag_rel', pairs={'tagid': self.id}) @@ -1599,7 +1708,7 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction - def edit(self, description=None): + def edit(self, description=None) -> None: ''' Change the description. Leave None to keep current value. ''' @@ -1617,7 +1726,7 @@ class Tag(ObjectBase, GroupableMixin): self._uncache() - def get_synonyms(self): + def get_synonyms(self) -> set[str]: if self._cached_synonyms is not None: return self._cached_synonyms @@ -1629,7 +1738,7 @@ class Tag(ObjectBase, GroupableMixin): self._cached_synonyms = synonyms return synonyms - def jsonify(self, include_synonyms=False, minimal=False): + def jsonify(self, include_synonyms=False, minimal=False) -> dict: j = { 'type': 'tag', 'id': self.id, @@ -1653,6 +1762,7 @@ class Tag(ObjectBase, GroupableMixin): ret = super().remove_child(*args, **kwargs) if ret is BAIL: return + self.photodb.caches['tag_exports'].clear() return ret @@ -1662,16 +1772,23 @@ class Tag(ObjectBase, GroupableMixin): ret = super().remove_children(*args, **kwargs) if ret is BAIL: return + self.photodb.caches['tag_exports'].clear() return ret @decorators.required_feature('tag.edit') @decorators.transaction - def remove_synonym(self, synname): + def remove_synonym(self, synname) -> str: ''' Delete a synonym. + This will have no effect on photos or other synonyms because they always resolve to the master tag before application. + + Raises any exceptions from photodb.normalize_tagname. + + Raises exceptions.NoSuchSynonym if that synname does not exist or is + not a synonym of this tag. ''' synname = self.photodb.normalize_tagname(synname) if synname == self.name: @@ -1693,12 +1810,18 @@ class Tag(ObjectBase, GroupableMixin): @decorators.required_feature('tag.edit') @decorators.transaction - def rename(self, new_name, *, apply_to_synonyms=True): + def rename(self, new_name, *, apply_to_synonyms=True) -> None: ''' Rename the tag. Does not affect its relation to Photos or tag groups. + + Raises any exceptions from photodb.normalize_tagname. + + Raises exceptions.TagExists if new_name is already an existing + tag or synonym. ''' new_name = self.photodb.normalize_tagname(new_name) old_name = self.name + if new_name == old_name: return @@ -1750,7 +1873,12 @@ class User(ObjectBase): return f'User:{self.username}' @staticmethod - def normalize_display_name(display_name, max_length=None): + def normalize_display_name(display_name, max_length=None) -> typing.Optional[str]: + ''' + Raises TypeError if display_name is not a string or None. + + Raises exceptions.DisplayNameTooLong if longer than max_length. + ''' if display_name is None: return None @@ -1772,13 +1900,13 @@ class User(ObjectBase): @decorators.required_feature('user.edit') @decorators.transaction - def delete(self, *, disown_authored_things): + def delete(self, *, disown_authored_things) -> None: ''' - If disown_authored_things is True, then all of this user's albums, + If disown_authored_things is True then all of this user's albums, bookmarks, photos, and tags will have their author_id set to None. - If disown_authored_things is False, and the user has any belongings, - exceptions.CantDeleteUser is raised. + If disown_authored_things is False and the user has any belongings, + raises exceptions.CantDeleteUser. You should delete those objects first. Since each object type has different options while deleting, that functionality is not provided @@ -1804,13 +1932,16 @@ class User(ObjectBase): self.deleted = True @property - def display_name(self): + def display_name(self) -> str: if self._display_name is None: return self.username else: return self._display_name - def get_albums(self, *, direction='asc'): + def get_albums(self, *, direction='asc') -> typing.Iterable[Album]: + ''' + Raises ValueError if direction is not asc or desc. + ''' if direction.lower() not in {'asc', 'desc'}: raise ValueError(direction) @@ -1819,7 +1950,10 @@ class User(ObjectBase): [self.id] ) - def get_bookmarks(self, *, direction='asc'): + def get_bookmarks(self, *, direction='asc') -> typing.Iterable[Bookmark]: + ''' + Raises ValueError if direction is not asc or desc. + ''' if direction.lower() not in {'asc', 'desc'}: raise ValueError(direction) @@ -1828,7 +1962,10 @@ class User(ObjectBase): [self.id] ) - def get_photos(self, *, direction='asc'): + def get_photos(self, *, direction='asc') -> typing.Iterable[Photo]: + ''' + Raises ValueError if direction is not asc or desc. + ''' if direction.lower() not in {'asc', 'desc'}: raise ValueError(direction) @@ -1837,7 +1974,10 @@ class User(ObjectBase): [self.id] ) - def get_tags(self, *, direction='asc'): + def get_tags(self, *, direction='asc') -> typing.Iterable[Tag]: + ''' + Raises ValueError if direction is not asc or desc. + ''' if direction.lower() not in {'asc', 'desc'}: raise ValueError(direction) @@ -1846,27 +1986,27 @@ class User(ObjectBase): [self.id] ) - def has_any_albums(self): + def has_any_albums(self) -> bool: query = f'SELECT 1 FROM albums WHERE author_id == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def has_any_bookmarks(self): + def has_any_bookmarks(self) -> bool: query = f'SELECT 1 FROM bookmarks WHERE author_id == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def has_any_photos(self): + def has_any_photos(self) -> bool: query = f'SELECT 1 FROM photos WHERE author_id == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def has_any_tags(self): + def has_any_tags(self) -> bool: query = f'SELECT 1 FROM tags WHERE author_id == ? LIMIT 1' row = self.photodb.sql_select_one(query, [self.id]) return row is not None - def jsonify(self): + def jsonify(self) -> dict: j = { 'type': 'user', 'id': self.id, @@ -1878,7 +2018,7 @@ class User(ObjectBase): @decorators.required_feature('user.edit') @decorators.transaction - def set_display_name(self, display_name): + def set_display_name(self, display_name) -> None: display_name = self.normalize_display_name( display_name, max_length=self.photodb.config['user']['max_display_name_length'], @@ -1893,7 +2033,7 @@ class User(ObjectBase): @decorators.required_feature('user.edit') @decorators.transaction - def set_password(self, password): + def set_password(self, password) -> None: if not isinstance(password, bytes): password = password.encode('utf-8') @@ -1911,5 +2051,5 @@ class WarningBag: def __init__(self): self.warnings = set() - def add(self, warning): + def add(self, warning) -> None: self.warnings.add(warning) diff --git a/etiquette/photodb.py b/etiquette/photodb.py index fd83479..5c645cf 100644 --- a/etiquette/photodb.py +++ b/etiquette/photodb.py @@ -8,6 +8,7 @@ import sqlite3 import tempfile import time import types +import typing from voussoirkit import cacheclass from voussoirkit import configlayers @@ -34,19 +35,19 @@ class PDBAlbumMixin: def __init__(self): super().__init__() - def get_album(self, id): + def get_album(self, id) -> objects.Album: return self.get_thing_by_id('album', id) - def get_album_count(self): + def get_album_count(self) -> int: return self.sql_select_one('SELECT COUNT(id) FROM albums')[0] - def get_albums(self): + def get_albums(self) -> typing.Iterable[objects.Album]: return self.get_things(thing_type='album') - def get_albums_by_id(self, ids): + def get_albums_by_id(self, ids) -> typing.Iterable[objects.Album]: return self.get_things_by_id('album', ids) - def get_albums_by_path(self, directory): + def get_albums_by_path(self, directory) -> typing.Iterable[objects.Album]: ''' Yield Albums with the `associated_directory` of this value, NOT case-sensitive. @@ -58,10 +59,10 @@ class PDBAlbumMixin: album_ids = (album_id for (album_id,) in album_rows) return self.get_albums_by_id(album_ids) - def get_albums_by_sql(self, query, bindings=None): + def get_albums_by_sql(self, query, bindings=None) -> typing.Iterable[objects.Album]: return self.get_things_by_sql('album', query, bindings) - def get_albums_within_directory(self, directory): + def get_albums_within_directory(self, directory) -> typing.Iterable[objects.Album]: # This function is something of a stopgap measure since `search` only # searches for photos and then yields their containing albums. Thus it # is not possible for search to find albums that contain no photos. @@ -78,7 +79,7 @@ class PDBAlbumMixin: albums = self.get_albums_by_id(album_ids) return albums - def get_root_albums(self): + def get_root_albums(self) -> typing.Iterable[objects.Album]: ''' Yield Albums that have no parent. ''' @@ -94,7 +95,7 @@ class PDBAlbumMixin: associated_directories=None, author=None, photos=None, - ): + ) -> objects.Album: ''' Create a new album. Photos can be added now or later. ''' @@ -131,7 +132,7 @@ class PDBAlbumMixin: return album @decorators.transaction - def purge_deleted_associated_directories(self, albums=None): + def purge_deleted_associated_directories(self, albums=None) -> typing.Iterable[pathclass.Path]: directories = self.sql_select('SELECT DISTINCT directory FROM album_associated_directories') directories = (pathclass.Path(directory) for (directory,) in directories) directories = [directory for directory in directories if not directory.is_dir] @@ -148,7 +149,7 @@ class PDBAlbumMixin: yield from directories @decorators.transaction - def purge_empty_albums(self, albums=None): + def purge_empty_albums(self, albums=None) -> typing.Iterable[objects.Album]: if albums is None: to_check = set(self.get_albums()) else: @@ -171,24 +172,24 @@ class PDBBookmarkMixin: def __init__(self): super().__init__() - def get_bookmark(self, id): + def get_bookmark(self, id) -> objects.Bookmark: return self.get_thing_by_id('bookmark', id) - def get_bookmark_count(self): + def get_bookmark_count(self) -> int: return self.sql_select_one('SELECT COUNT(id) FROM bookmarks')[0] - def get_bookmarks(self): + def get_bookmarks(self) -> typing.Iterable[objects.Bookmark]: return self.get_things(thing_type='bookmark') - def get_bookmarks_by_id(self, ids): + def get_bookmarks_by_id(self, ids) -> typing.Iterable[objects.Bookmark]: return self.get_things_by_id('bookmark', ids) - def get_bookmarks_by_sql(self, query, bindings=None): + def get_bookmarks_by_sql(self, query, bindings=None) -> typing.Iterable[objects.Bookmark]: return self.get_things_by_sql('bookmark', query, bindings) @decorators.required_feature('bookmark.new') @decorators.transaction - def new_bookmark(self, url, title=None, *, author=None): + def new_bookmark(self, url, title=None, *, author=None) -> objects.Bookmark: # These might raise exceptions. title = objects.Bookmark.normalize_title(title) url = objects.Bookmark.normalize_url(url) @@ -245,7 +246,7 @@ class PDBCacheManagerMixin: def __init__(self): super().__init__() - def clear_all_caches(self): + def clear_all_caches(self) -> None: self.caches['album'].clear() self.caches['bookmark'].clear() self.caches['photo'].clear() @@ -430,7 +431,7 @@ class PDBPhotoMixin: def __init__(self): super().__init__() - def assert_no_such_photo_by_path(self, filepath): + def assert_no_such_photo_by_path(self, filepath) -> None: try: existing = self.get_photo_by_path(filepath) except exceptions.NoSuchPhoto: @@ -438,10 +439,10 @@ class PDBPhotoMixin: else: raise exceptions.PhotoExists(existing) - def get_photo(self, id): + def get_photo(self, id) -> objects.Photo: return self.get_thing_by_id('photo', id) - def get_photo_by_path(self, filepath): + def get_photo_by_path(self, filepath) -> objects.Photo: filepath = pathclass.Path(filepath) query = 'SELECT * FROM photos WHERE filepath == ?' bindings = [filepath.absolute_path] @@ -451,13 +452,13 @@ class PDBPhotoMixin: photo = self.get_cached_instance('photo', photo_row) return photo - def get_photo_count(self): + def get_photo_count(self) -> int: return self.sql_select_one('SELECT COUNT(id) FROM photos')[0] - def get_photos_by_id(self, ids): + def get_photos_by_id(self, ids) -> typing.Iterable[objects.Photo]: return self.get_things_by_id('photo', ids) - def get_photos_by_recent(self, count=None): + def get_photos_by_recent(self, count=None) -> typing.Iterable[objects.Photo]: ''' Yield photo objects in order of creation time. ''' @@ -476,7 +477,7 @@ class PDBPhotoMixin: if count <= 0: break - def get_photos_by_hash(self, sha256): + def get_photos_by_hash(self, sha256) -> typing.Iterable[objects.Photo]: if not isinstance(sha256, str) or len(sha256) != 64: raise TypeError(f'sha256 shoulbe the 64-character hexdigest string.') @@ -484,7 +485,7 @@ class PDBPhotoMixin: bindings = [sha256] yield from self.get_photos_by_sql(query, bindings) - def get_photos_by_sql(self, query, bindings=None): + def get_photos_by_sql(self, query, bindings=None) -> typing.Iterable[objects.Photo]: return self.get_things_by_sql('photo', query, bindings) @decorators.required_feature('photo.new') @@ -500,7 +501,7 @@ class PDBPhotoMixin: known_hash=None, searchhidden=False, tags=None, - ): + ) -> objects.Photo: ''' Given a filepath, determine its attributes and create a new Photo object in the database. Tags may be applied now or later. @@ -574,7 +575,7 @@ class PDBPhotoMixin: return photo @decorators.transaction - def purge_deleted_files(self, photos=None): + def purge_deleted_files(self, photos=None) -> typing.Iterable[objects.Photo]: ''' Delete Photos whose corresponding file on disk is missing. @@ -1020,13 +1021,13 @@ class PDBSQLMixin: self.savepoints = [] self._cached_sql_tables = None - def assert_table_exists(self, table): + def assert_table_exists(self, table) -> None: if not self._cached_sql_tables: self._cached_sql_tables = self.get_sql_tables() if table not in self._cached_sql_tables: raise exceptions.BadTable(table) - def commit(self, message=None): + def commit(self, message=None) -> None: if message is not None: self.log.debug('Committing - %s.', message) @@ -1048,13 +1049,13 @@ class PDBSQLMixin: self.savepoints.clear() self.sql.commit() - def get_sql_tables(self): + def get_sql_tables(self) -> set[str]: query = 'SELECT name FROM sqlite_master WHERE type = "table"' table_rows = self.sql_select(query) tables = set(name for (name,) in table_rows) return tables - def release_savepoint(self, savepoint, allow_commit=False): + def release_savepoint(self, savepoint, allow_commit=False) -> None: ''' Releasing a savepoint removes that savepoint from the timeline, so that you can no longer roll back to it. Then your choices are to commit @@ -1078,7 +1079,7 @@ class PDBSQLMixin: self.sql_execute(f'RELEASE "{savepoint}"') self.savepoints = helpers.slice_before(self.savepoints, savepoint) - def rollback(self, savepoint=None): + def rollback(self, savepoint=None) -> None: ''' Given a savepoint, roll the database back to the moment before that savepoint was created. Keep in mind that a @transaction savepoint is @@ -1117,7 +1118,7 @@ class PDBSQLMixin: self.savepoints.clear() self.on_commit_queue.clear() - def savepoint(self, message=None): + def savepoint(self, message=None) -> str: savepoint_id = passwordy.random_hex(length=16) if message: self.log.log(5, 'Savepoint %s for %s.', savepoint_id, message) @@ -1130,13 +1131,13 @@ class PDBSQLMixin: self.on_rollback_queue.append(savepoint_id) return savepoint_id - def sql_delete(self, table, pairs): + def sql_delete(self, table, pairs) -> None: self.assert_table_exists(table) (qmarks, bindings) = sqlhelpers.delete_filler(pairs) query = f'DELETE FROM {table} {qmarks}' self.sql_execute(query, bindings) - def sql_execute(self, query, bindings=[]): + def sql_execute(self, query, bindings=[]) -> sqlite3.Cursor: if bindings is None: bindings = [] cur = self.sql.cursor() @@ -1144,7 +1145,7 @@ class PDBSQLMixin: cur.execute(query, bindings) return cur - def sql_executescript(self, script): + def sql_executescript(self, script) -> None: ''' The problem with Python's default executescript is that it executes a COMMIT before running your script. If I wanted a commit I'd write one! @@ -1157,7 +1158,7 @@ class PDBSQLMixin: self.log.loud(line) cur.execute(line) - def sql_insert(self, table, data): + def sql_insert(self, table, data) -> None: self.assert_table_exists(table) column_names = constants.SQL_COLUMNS[table] (qmarks, bindings) = sqlhelpers.insert_filler(column_names, data) @@ -1165,7 +1166,7 @@ class PDBSQLMixin: query = f'INSERT INTO {table} VALUES({qmarks})' self.sql_execute(query, bindings) - def sql_select(self, query, bindings=None): + def sql_select(self, query, bindings=None) -> typing.Iterable: cur = self.sql_execute(query, bindings) while True: fetch = cur.fetchone() @@ -1177,7 +1178,7 @@ class PDBSQLMixin: cur = self.sql_execute(query, bindings) return cur.fetchone() - def sql_update(self, table, pairs, where_key): + def sql_update(self, table, pairs, where_key) -> None: self.assert_table_exists(table) (qmarks, bindings) = sqlhelpers.update_filler(pairs, where_key=where_key) query = f'UPDATE {table} {qmarks}' @@ -1189,7 +1190,7 @@ class PDBTagMixin: def __init__(self): super().__init__() - def assert_no_such_tag(self, name): + def assert_no_such_tag(self, name) -> None: try: existing_tag = self.get_tag_by_name(name) except exceptions.NoSuchTag: @@ -1203,7 +1204,7 @@ class PDBTagMixin: names = set(name for (name,) in tag_rows) return names - def get_all_tag_names(self): + def get_all_tag_names(self) -> set[str]: ''' Return a set containing the names of all tags as strings. Useful for when you don't want the overhead of actual Tag objects. @@ -1216,19 +1217,19 @@ class PDBTagMixin: synonyms = {syn: tag for (syn, tag) in syn_rows} return synonyms - def get_all_synonyms(self): + def get_all_synonyms(self) -> dict: ''' Return a dict mapping {synonym: mastertag} as strings. ''' return self.get_cached_tag_export(self._get_all_synonyms) - def get_root_tags(self): + def get_root_tags(self) -> typing.Iterable[objects.Tag]: ''' Yield Tags that have no parent. ''' return self.get_root_things('tag') - def get_tag(self, name=None, id=None): + def get_tag(self, name=None, id=None) -> objects.Tag: ''' Redirect to get_tag_by_id or get_tag_by_name. ''' @@ -1240,10 +1241,10 @@ class PDBTagMixin: else: return self.get_tag_by_name(name) - def get_tag_by_id(self, id): + def get_tag_by_id(self, id) -> objects.Tag: return self.get_thing_by_id('tag', thing_id=id) - def get_tag_by_name(self, tagname): + def get_tag_by_name(self, tagname) -> objects.Tag: if isinstance(tagname, objects.Tag): if tagname.photodb == self: return tagname @@ -1277,24 +1278,24 @@ class PDBTagMixin: tag = self.get_cached_instance('tag', tag_row) return tag - def get_tag_count(self): + def get_tag_count(self) -> int: return self.sql_select_one('SELECT COUNT(id) FROM tags')[0] - def get_tags(self): + def get_tags(self) -> typing.Iterable[objects.Tag]: ''' Yield all Tags in the database. ''' return self.get_things(thing_type='tag') - def get_tags_by_id(self, ids): + def get_tags_by_id(self, ids) -> typing.Iterable[objects.Tag]: return self.get_things_by_id('tag', ids) - def get_tags_by_sql(self, query, bindings=None): + def get_tags_by_sql(self, query, bindings=None) -> typing.Iterable[objects.Tag]: return self.get_things_by_sql('tag', query, bindings) @decorators.required_feature('tag.new') @decorators.transaction - def new_tag(self, tagname, description=None, *, author=None): + def new_tag(self, tagname, description=None, *, author=None) -> objects.Tag: ''' Register a new tag and return the Tag object. ''' @@ -1324,7 +1325,7 @@ class PDBTagMixin: return tag - def normalize_tagname(self, tagname): + def normalize_tagname(self, tagname) -> str: tagname = objects.Tag.normalize_name( tagname, # valid_chars=self.config['tag']['valid_chars'], @@ -1339,7 +1340,7 @@ class PDBUserMixin: def __init__(self): super().__init__() - def assert_no_such_user(self, username): + def assert_no_such_user(self, username) -> None: try: existing_user = self.get_user(username=username) except exceptions.NoSuchUser: @@ -1347,14 +1348,14 @@ class PDBUserMixin: else: raise exceptions.UserExists(existing_user) - def assert_valid_password(self, password): + def assert_valid_password(self, password) -> None: if not isinstance(password, bytes): raise TypeError(f'Password must be {bytes}, not {type(password)}.') if len(password) < self.config['user']['min_password_length']: raise exceptions.PasswordTooShort(min_length=self.config['user']['min_password_length']) - def assert_valid_username(self, username): + def assert_valid_username(self, username) -> None: if not isinstance(username, str): raise TypeError(f'Username must be {str}, not {type(username)}.') @@ -1374,7 +1375,7 @@ class PDBUserMixin: if badchars: raise exceptions.InvalidUsernameChars(username=username, badchars=badchars) - def generate_user_id(self): + def generate_user_id(self) -> str: ''' User IDs are randomized instead of integers like the other objects, so they get their own method. @@ -1391,7 +1392,7 @@ class PDBUserMixin: return user_id - def get_user(self, username=None, id=None): + def get_user(self, username=None, id=None) -> objects.User: ''' Redirect to get_user_by_id or get_user_by_username. ''' @@ -1403,10 +1404,10 @@ class PDBUserMixin: else: return self.get_user_by_username(username) - def get_user_by_id(self, id): + def get_user_by_id(self, id) -> objects.User: return self.get_thing_by_id('user', id) - def get_user_by_username(self, username): + def get_user_by_username(self, username) -> objects.User: user_row = self.sql_select_one('SELECT * FROM users WHERE username == ?', [username]) if user_row is None: @@ -1414,10 +1415,10 @@ class PDBUserMixin: return self.get_cached_instance('user', user_row) - def get_user_count(self): + def get_user_count(self) -> int: return self.sql_select_one('SELECT COUNT(id) FROM users')[0] - def get_user_id_or_none(self, user_obj_or_id): + def get_user_id_or_none(self, user_obj_or_id) -> typing.Optional[str]: ''' For methods that create photos, albums, etc., we sometimes associate them with an author but sometimes not. The callers of those methods @@ -1446,17 +1447,17 @@ class PDBUserMixin: return author_id - def get_users(self): + def get_users(self) -> typing.Iterable[objects.User]: return self.get_things('user') - def get_users_by_id(self, ids): + def get_users_by_id(self, ids) -> typing.Iterable[objects.User]: return self.get_things_by_id('user', ids) - def get_users_by_sql(self, query, bindings=None): + def get_users_by_sql(self, query, bindings=None) -> typing.Iterable[objects.User]: return self.get_things_by_sql('user', query, bindings) @decorators.required_feature('user.login') - def login(self, username=None, id=None, *, password): + def login(self, username=None, id=None, *, password) -> objects.User: ''' Return the User object for the user if the credentials are correct. ''' @@ -1476,7 +1477,7 @@ class PDBUserMixin: @decorators.required_feature('user.new') @decorators.transaction - def new_user(self, username, password, *, display_name=None): + def new_user(self, username, password, *, display_name=None) -> objects.User: # These might raise exceptions. self.assert_valid_username(username) self.assert_no_such_user(username=username) @@ -1953,6 +1954,7 @@ class PhotoDB( self.sql_executescript(constants.DB_PRAGMAS) self.sql.commit() + # Will add -> PhotoDB when forward references are supported @classmethod def closest_photodb(cls, path, *args, **kwargs): ''' @@ -1989,7 +1991,7 @@ class PhotoDB( else: return f'PhotoDB(data_directory={self.data_directory})' - def close(self): + def close(self) -> None: # Wrapped in hasattr because if the object fails __init__, Python will # still call __del__ and thus close(), even though the attributes # we're trying to clean up never got set. @@ -1999,7 +2001,7 @@ class PhotoDB( if getattr(self, 'ephemeral', False): self.ephemeral_directory.cleanup() - def generate_id(self, table): + def generate_id(self, table) -> str: ''' Create a new ID number that is unique to the given table. Note that while this method may INSERT / UPDATE, it does not commit. @@ -2032,7 +2034,7 @@ class PhotoDB( self.sql_update(table='id_numbers', pairs=pairs, where_key='tab') return new_id - def load_config(self): + def load_config(self) -> None: (config, needs_rewrite) = configlayers.load_file( filepath=self.config_filepath, defaults=constants.DEFAULT_CONFIGURATION, @@ -2042,6 +2044,6 @@ class PhotoDB( if needs_rewrite: self.save_config() - def save_config(self): + def save_config(self) -> None: with self.config_filepath.open('w', encoding='utf-8') as handle: handle.write(json.dumps(self.config, indent=4, sort_keys=True))