import argparse import os import re import sys from voussoirkit import betterhelp from voussoirkit import interactive from voussoirkit import pathclass from voussoirkit import spinal from voussoirkit import stringtools from voussoirkit import vlogging import etiquette LOG_LEVEL = vlogging.NOTSET class CantFindPhotoDB(Exception): pass photodbs = {} def find_photodb(): path = pathclass.cwd() while True: try: return photodbs[path] except KeyError: pass if path.with_child('_etiquette').is_dir: break if path == path.parent: raise CantFindPhotoDB() path = path.parent photodb = etiquette.photodb.PhotoDB( path.with_child('_etiquette'), create=False, log_level=LOG_LEVEL, ) photodbs[path] = photodb return photodb # HELPERS ########################################################################################## def export_symlinks_albums(albums, destination, dry_run): album_directory_names = etiquette.helpers.decollide_names(albums, lambda a: a.display_name) for (album, directory_name) in album_directory_names.items(): associated_directories = album.get_associated_directories() if len(associated_directories) == 1: album_dir = associated_directories.pop() directory_name = etiquette.helpers.remove_path_badchars(directory_name) symlink_dir = destination.with_child(directory_name) if dry_run: yield symlink_dir continue if not album_dir.exists: continue if symlink_dir.exists: yield symlink_dir continue print(album, symlink_dir) os.symlink(src=album_dir.absolute_path, dst=symlink_dir.absolute_path) yield symlink_dir def export_symlinks_photos(photos, destination, dry_run): photo_filenames = etiquette.helpers.decollide_names(photos, lambda p: p.basename) for (photo, filename) in photo_filenames.items(): symlink_path = destination.with_child(filename) if dry_run: yield symlink_path continue if not photo.real_path.exists: continue if symlink_path.exists: yield symlink_path continue print(symlink_path.absolute_path) os.symlink(src=photo.real_path.absolute_path, dst=symlink_path.absolute_path) yield symlink_path def get_photos_by_glob(pattern): photodb = find_photodb() pattern = pathclass.normalize_sep(pattern) if pattern == '**': return search_in_cwd(yield_photos=True, yield_albums=False) cwd = pathclass.cwd() (folder, pattern) = os.path.split(pattern) if folder: folder = cwd.join(folder) else: folder = cwd files = [f for f in folder.glob(pattern) if f.is_file] for file in files: try: photo = photodb.get_photo_by_path(file) yield photo except etiquette.exceptions.NoSuchPhoto: pass def get_photos_by_globs(patterns): for pattern in patterns: yield from get_photos_by_glob(pattern) def get_photos_from_args(args): photodb = find_photodb() photos = [] if args.photo_id_args: photos.extend(photodb.get_photos_by_id(args.photo_id_args)) if args.photo_search_args: photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True)) return photos def get_albums_from_args(args): photodb = find_photodb() albums = [] if args.album_id_args: albums.extend(photodb.get_albums_by_id(args.album_id_args)) if args.album_search_args: albums.extend(search_by_argparse(args.album_search_args, yield_albums=True)) return albums def search_in_cwd(**kwargs): photodb = find_photodb() cwd = pathclass.cwd() return photodb.search( within_directory=cwd, **kwargs, ) def search_by_argparse(args, yield_albums=False, yield_photos=False): return search_in_cwd( area=args.area, width=args.width, height=args.height, ratio=args.ratio, bytes=args.bytes, duration=args.duration, author=args.author, created=args.created, extension=args.extension, extension_not=args.extension_not, filename=args.filename, has_tags=args.has_tags, has_thumbnail=args.has_thumbnail, is_searchhidden=args.is_searchhidden, mimetype=args.mimetype, tag_musts=args.tag_musts, tag_mays=args.tag_mays, tag_forbids=args.tag_forbids, tag_expression=args.tag_expression, limit=args.limit, offset=args.offset, orderby=args.orderby, yield_albums=yield_albums, yield_photos=yield_photos, ) #################################################################################################### def add_remove_tag_argparse(args, action): photodb = find_photodb() tag = photodb.get_tag(name=args.tag_name) if args.any_id_args: photos = get_photos_from_args(args) elif args.globs: photos = get_photos_by_globs(args.globs) else: photos = search_in_cwd(yield_photos=True, yield_albums=False) for photo in photos: if action == 'add': photo.add_tag(tag) elif action == 'remove': photo.remove_tag(tag) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def digest_directory_argparse(args): directory = pathclass.Path(args.directory) photodb = find_photodb() digest = photodb.digest_directory( directory, make_albums=args.make_albums, recurse=args.recurse, new_photo_ratelimit=args.ratelimit, yield_albums=True, yield_photos=True, ) for result in digest: print(result) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def easybake_argparse(args): photodb = find_photodb() for eb_string in args.eb_strings: notes = photodb.easybake(eb_string) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def export_symlinks_argparse(args): photodb = find_photodb() destination = pathclass.Path(args.destination) destination.makedirs(exist_ok=True) total_paths = set() albums = [] if args.album_id_args: albums.extend(photodb.get_albums_by_id(args.album_id_args)) if args.album_search_args: albums.extend(search_by_argparse(args.album_search_args, yield_albums=True)) export = export_symlinks_albums( albums, destination, dry_run=args.dry_run, ) total_paths.update(export) photos = [] if args.photo_id_args: photos.extend(photodb.get_photos_by_id(args.photo_id_args)) if args.photo_search_args: photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True)) export = export_symlinks_photos( photos, destination, dry_run=args.dry_run, ) total_paths.update(export) if args.prune and not args.dry_run: symlinks = set(file for file in spinal.walk_generator(destination) if file.is_link) symlinks = symlinks.difference(total_paths) for old_symlink in symlinks: print(f'Pruning {old_symlink}.') os.remove(old_symlink.absolute_path) if not old_symlink.parent.listdir(): os.rmdir(old_symlink.parent.absolute_path) checkdirs = set(spinal.walk_generator(destination, yield_directories=True, yield_files=False)) while checkdirs: check = checkdirs.pop() if check not in destination: continue if len(check.listdir()) == 0: os.rmdir(check.absolute_path) checkdirs.add(check.parent) def init_argparse(args): photodb = etiquette.photodb.PhotoDB(create=True) photodb.commit() def purge_deleted_files_argparse(args): photodb = find_photodb() if args.photo_id_args or args.photo_search_args: photos = get_photos_from_args(args) else: photos = search_in_cwd(yield_photos=True, yield_albums=False) for deleted in photodb.purge_deleted_files(photos): print(deleted) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def purge_empty_albums_argparse(args): photodb = find_photodb() # We do not check args.album_search_args because currently it is not # possible for search results to find empty albums on account of the fact # that albums are only yielded when they contain some result photo. if args.album_id_args: albums = get_albums_from_args(args) else: albums = photodb.get_albums_within_directory(pathclass.cwd()) for deleted in photodb.purge_empty_albums(albums): print(deleted) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def relocate_argparse(args): photodb = find_photodb() photo = photodb.get_photo(args.photo_id) photo.relocate(args.filepath) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def search_argparse(args): photos = search_by_argparse(args, yield_photos=True) for photo in photos: print(photo.real_path.absolute_path) def show_associated_directories_argparse(args): if args.album_id_args or args.album_search_args: albums = get_albums_from_args(args) else: albums = search_in_cwd(yield_photos=False, yield_albums=True) for album in albums: directories = album.get_associated_directories() if not directories: continue directories = [f'"{d.absolute_path}"' for d in directories] directories = ' '.join(directories) print(f'{album} | {directories}') def set_unset_searchhidden_argparse(args, searchhidden): photodb = find_photodb() if args.photo_search_args: args.photo_search_args.is_searchhidden = not searchhidden if args.album_search_args: args.album_search_args.is_searchhidden = not searchhidden if args.any_id_args: photos = get_photos_from_args(args) albums = get_albums_from_args(args) photos.extend(photo for album in albums for photo in album.walk_photos()) else: photos = search_in_cwd(yield_photos=True, yield_albums=False) for photo in photos: print(photo) photo.set_searchhidden(searchhidden) if args.autoyes or interactive.getpermission('Commit?'): photodb.commit() def tag_breplace_argparse(args): photodb = find_photodb() renames = [] tag_names = photodb.get_all_tag_names() all_names = tag_names.union(photodb.get_all_synonyms()) for tag_name in tag_names: if args.regex: new_name = re.sub(args.replace_from, args.replace_to, tag_name) else: new_name = tag_name.replace(args.replace_from, args.replace_to) new_name = photodb.normalize_tagname(new_name) if new_name == tag_name: continue if new_name in all_names: raise etiquette.exceptions.TagExists(new_name) if args.set_synonym: printline = f'{tag_name} -> {new_name}+{tag_name}' else: printline = f'{tag_name} -> {new_name}' renames.append((tag_name, new_name, printline)) if not args.autoyes: for (tag_name, new_name, printline) in renames: print(printline) if not interactive.getpermission('Ok?', must_pick=True): return for (tag_name, new_name, printline) in renames: print(printline) tag = photodb.get_tag(tag_name) tag.rename(new_name) if args.set_synonym: tag.add_synonym(tag_name) photodb.commit() DOCSTRING = ''' Etiquette CLI ============= {add_tag} {remove_tag} {digest} {easybake} {export_symlinks} {init} {purge_deleted_files} {purge_empty_albums} {search} {show_associated_directories} {set_searchhidden} {unset_searchhidden} {tag_breplace} At any time, you may add --silent, --quiet, --debug, --loud to change logging. You can add --yes to avoid the "Commit?" prompt. TO SEE DETAILS ON EACH COMMAND, RUN > etiquette_cli.py --help ''' SUB_DOCSTRINGS = dict( add_tag=''' add_tag: Add a tag to files by a filename glob. > etiquette_cli.py add_tag tag_name glob_pattern '''.strip(), remove_tag=''' remove_tag: Remove a tag from files by a filename glob. > etiquette_cli.py remove_tag tag_name glob_pattern '''.strip(), digest=''' digest: Digest a directory, adding new files as Photos into the database. > etiquette_cli.py digest directory flags: --no_albums: Do not create any albums the directories. By default, albums are created and nested to match the directory structure. --ratelimit X: Limit the ingest of new Photos to only one per X seconds. This can be used to reduce system load or to make sure that two photos don't get the same `created` timestamp. --no_recurse: Do not recurse into subdirectories. Only create Photos from files in the current directory. '''.strip(), easybake=''' easybake: Create and manipulate tags by easybake strings. > etiquette_cli.py easybake eb_string '''.strip(), export_symlinks=''' export_symlinks: Search for photos or albums, then create symlinks pointing to the results. THIS IS STILL A BIT EXPERIMENTAL. This can be used to gather up search results for the purpose of further uploading, transfering, etc. with other applications. Symlinks point to files (if result is a photo) or directories (if result is an album with an associated directory). Albums are limited to only one associated directory since the output symlink can't point to two places at once. > etiquette_cli.py export_symlinks --destination directory --search searchargs > etiquette_cli.py export_symlinks --destination directory --album-search searchargs flags: --destination: A path to a directory into which the symlinks will be placed. --dry: Print the results without actually creating the symlinks. --prune: In the destination directory, any existing symlinks whose target no longer exists will be deleted. See search --help for more info about searchargs. '''.strip(), init=''' init: Create a new Etiquette database in the current directory. > etiquette_cli.py init '''.strip(), purge_deleted_files=''' purge_deleted_files: Delete any Photo objects whose file no longer exists on disk. > etiquette_cli.py purge_deleted_files '''.strip(), purge_empty_albums=''' purge_empty_albums: Delete any albums which have no child albums or photos. Consider running purge_deleted_files first, so that albums containing deleted files will get cleared out and then caught by this function. > etiquette_cli.py purge_empty_albums '''.strip(), relocate=''' relocate: Change a photo's filepath. Used for updating photos that have been changed by external tools. > etiquette_cli.py relocate photo_id filepath '''.strip(), search=''' search: Search for photos and albums with complex operators. > etiquette_cli.py search searchargs > etiquette_cli.py search --album-search searchargs Searchargs: --area X-Y: Photo/video width*height between X and Y. --width X-Y: Photo/video width between X and Y. --height X-Y: Photo/video height between X and Y. --ratio X-Y: Photo/video aspect ratio between X and Y. --bytes X-Y: File size in bytes between X and Y. --duration X-Y: Media duration between X and Y seconds. --author X: Photo authored by user with username X. --created X-Y: Photo creation date between X and Y unix timestamp. --extension A,B,C: Photo with any extension of A, B, C... --extension_not A,B,C: Photo without any extension of A, B, C... --filename X: Search terms for Photo's filename. --has_tags yes/no/null: If yes, Photo must have at least one tag. If no, Photo must have no tags. If null, doesn't matter. --has_thumbnail yes/no/null: --is_searchhidden yes/no/null: --mimetype A,B,C: Photo with any mimetype of A, B, C... --tag_musts A,B,C: Photo must have all tags A and B and C... --tag_mays A,B,C: Photo must have at least one tag of A, B, C... --tag_forbids A,B,C: Photo must not have any tags of A, B, C... --tag_expression X: Complex expression string to match tags. --limit X: Limit results to first X items. --offset X: Skip the first X items. --orderby X-Y: Order the results by property X in direction Y. E.g. created-desc or bytes-asc. '''.strip(), show_associated_directories=''' show_associated_directories: Show the associated directories for albums. > etiquette_cli.py show_associated_directories > etiquette_cli.py show_associated_directories --albums id id id > etiquette_cli.py show_associated_directories --album-search searchargs See search --help for more info about searchargs. '''.strip(), set_searchhidden=''' set_searchhidden: Mark photos as searchhidden. > etiquette_cli.py set_searchhidden --photos id id id > etiquette_cli.py set_searchhidden --search searchargs See search --help for more info about searchargs. '''.strip(), unset_searchhidden=''' unset_searchhidden: Unmark photos as searchhidden. > etiquette_cli.py unset_searchhidden --photos id id id > etiquette_cli.py unset_searchhidden --search searchargs See search --help for more info about searchargs. '''.strip(), tag_breplace=''' tag_breplace: For all tags in the database, use find-and-replace to rename the tags. > etiquette_cli.py tag_breplace replace_from replace_to '''.strip(), ) DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS) def main(argv): global LOG_LEVEL (LOG_LEVEL, argv) = vlogging.get_level_by_argv(argv) parser = argparse.ArgumentParser(description=__doc__) subparsers = parser.add_subparsers() primary_args = [] photo_id_args = [] photo_search_args = [] album_id_args = [] album_search_args = [] mode = primary_args for arg in argv: if 0: pass elif arg in {'--search', '--photo_search', '--photo-search'}: mode = photo_search_args elif arg in {'--album_search', '--album-search'}: mode = album_search_args elif arg == '--photos': mode = photo_id_args elif arg == '--albums': mode = album_id_args else: mode.append(arg) p_add_tag = subparsers.add_parser('add_tag', aliases=['add-tag']) p_add_tag.add_argument('tag_name') p_add_tag.add_argument('globs', nargs='*') p_add_tag.add_argument('--yes', dest='autoyes', action='store_true') p_add_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='add')) p_remove_tag = subparsers.add_parser('remove_tag', aliases=['remove-tag']) p_remove_tag.add_argument('tag_name') p_remove_tag.add_argument('globs', nargs='*') p_remove_tag.add_argument('--yes', dest='autoyes', action='store_true') p_remove_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='remove')) p_digest = subparsers.add_parser('digest', aliases=['digest_directory', 'digest-directory']) p_digest.add_argument('directory') p_digest.add_argument('--no_albums', '--no-albums', dest='make_albums', action='store_false', default=True) p_digest.add_argument('--ratelimit', dest='ratelimit', type=float, default=0.2) p_digest.add_argument('--no_recurse', '--no-recurse', dest='recurse', action='store_false', default=True) p_digest.add_argument('--yes', dest='autoyes', action='store_true') p_digest.set_defaults(func=digest_directory_argparse) p_easybake = subparsers.add_parser('easybake') p_easybake.add_argument('eb_strings', nargs='+') p_easybake.add_argument('--yes', dest='autoyes', action='store_true') p_easybake.set_defaults(func=easybake_argparse) p_export_symlinks = subparsers.add_parser('export_symlinks', aliases=['export-symlinks']) p_export_symlinks.add_argument('--destination', dest='destination', required=True) p_export_symlinks.add_argument('--dry', dest='dry_run', action='store_true') p_export_symlinks.add_argument('--prune', dest='prune', action='store_true') p_export_symlinks.set_defaults(func=export_symlinks_argparse) p_init = subparsers.add_parser('init', aliases=['create']) p_init.set_defaults(func=init_argparse) p_purge_deleted_files = subparsers.add_parser('purge_deleted_files', aliases=['purge-deleted-files']) p_purge_deleted_files.add_argument('--yes', dest='autoyes', action='store_true') p_purge_deleted_files.set_defaults(func=purge_deleted_files_argparse) p_purge_empty_albums = subparsers.add_parser('purge_empty_albums', aliases=['purge-empty-albums']) p_purge_empty_albums.add_argument('--yes', dest='autoyes', action='store_true') p_purge_empty_albums.set_defaults(func=purge_empty_albums_argparse) p_relocate = subparsers.add_parser('relocate') p_relocate.add_argument('photo_id') p_relocate.add_argument('filepath') p_relocate.add_argument('--yes', dest='autoyes', action='store_true') p_relocate.set_defaults(func=relocate_argparse) p_search = subparsers.add_parser('search') p_search.add_argument('--area', dest='area', default=None) p_search.add_argument('--width', dest='width', default=None) p_search.add_argument('--height', dest='height', default=None) p_search.add_argument('--ratio', dest='ratio', default=None) p_search.add_argument('--bytes', dest='bytes', default=None) p_search.add_argument('--duration', dest='duration', default=None) p_search.add_argument('--author', dest='author', default=None) p_search.add_argument('--created', dest='created', default=None) p_search.add_argument('--extension', dest='extension', default=None) p_search.add_argument('--extension_not', '--extension-not', dest='extension_not', default=None) p_search.add_argument('--filename', dest='filename', default=None) p_search.add_argument('--has_tags', '--has-tags', dest='has_tags', default=None) p_search.add_argument('--has_thumbnail', '--has-thumbnail', dest='has_thumbnail', default=None) p_search.add_argument('--is_searchhidden', '--is-searchhidden', dest='is_searchhidden', default=False) p_search.add_argument('--mimetype', dest='mimetype', default=None) p_search.add_argument('--tag_musts', '--tag-musts', dest='tag_musts', default=None) p_search.add_argument('--tag_mays', '--tag-mays', dest='tag_mays', default=None) p_search.add_argument('--tag_forbids', '--tag-forbids', dest='tag_forbids', default=None) p_search.add_argument('--tag_expression', '--tag-expression', dest='tag_expression', default=None) p_search.add_argument('--limit', dest='limit', default=None) p_search.add_argument('--offset', dest='offset', default=None) p_search.add_argument('--orderby', dest='orderby', default='basename-ASC') # p_search.add_argument('--yield_albums', '--yield-albums', dest='yield_albums', default=None) p_search.set_defaults(func=search_argparse) p_show_associated_directories = subparsers.add_parser('show_associated_directories', aliases=['show-associated-directories']) p_show_associated_directories.set_defaults(func=show_associated_directories_argparse) p_set_searchhidden = subparsers.add_parser('set_searchhidden', aliases=['set-searchhidden']) p_set_searchhidden.add_argument('--yes', dest='autoyes', action='store_true') p_set_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=True)) p_unset_searchhidden = subparsers.add_parser('unset_searchhidden', aliases=['unset-searchhidden']) p_unset_searchhidden.add_argument('--yes', dest='autoyes', action='store_true') p_unset_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=False)) p_tag_breplace = subparsers.add_parser('tag_breplace', aliases=['tag-breplace']) p_tag_breplace.add_argument('replace_from') p_tag_breplace.add_argument('replace_to') p_tag_breplace.add_argument('--set_synonym', '--set-synonym', dest='set_synonym', action='store_true') p_tag_breplace.add_argument('--regex', dest='regex', action='store_true') p_tag_breplace.add_argument('--yes', dest='autoyes', action='store_true') p_tag_breplace.set_defaults(func=tag_breplace_argparse) ## def pp(args): args.photo_search_args = p_search.parse_args(photo_search_args) if photo_search_args else None args.album_search_args = p_search.parse_args(album_search_args) if album_search_args else None args.photo_id_args = [id for arg in photo_id_args for id in stringtools.comma_space_split(arg)] args.album_id_args = [id for arg in album_id_args for id in stringtools.comma_space_split(arg)] args.any_id_args = bool( args.photo_search_args or args.album_search_args or args.photo_id_args or args.album_id_args ) return args return betterhelp.subparser_main( primary_args, parser, main_docstring=DOCSTRING, sub_docstrings=SUB_DOCSTRINGS, args_postprocessor=pp, ) if __name__ == '__main__': raise SystemExit(main(sys.argv[1:]))