# etiquette/frontends/etiquette_cli.py
#
# 528 lines
# 19 KiB
# Python

import argparse
import os
import re
import sys
from voussoirkit import interactive
from voussoirkit import pathclass
from voussoirkit import spinal
from voussoirkit import stringtools
from voussoirkit import vlogging
import etiquette
# Logging level passed to every PhotoDB opened by find_photodb. main() rebinds
# this global to the level extracted from the command line.
LOG_LEVEL = vlogging.NOTSET
class CantFindPhotoDB(Exception):
    '''
    Raised by find_photodb when it reaches the filesystem root without
    finding an `_etiquette` directory in the cwd or any of its ancestors.
    '''
# Cache of PhotoDB objects that find_photodb has already opened, keyed by the
# directory which contains the `_etiquette` folder.
photodbs = {}
def find_photodb():
    '''
    Return the PhotoDB that governs the current working directory.

    Starting at the cwd and climbing one parent at a time, the first directory
    that is either already in the `photodbs` cache or contains an `_etiquette`
    subdirectory wins. Newly opened databases are stored in the cache under
    the directory that holds `_etiquette`.

    Raises CantFindPhotoDB if the climb reaches a directory that is its own
    parent without finding anything.
    '''
    search_dir = pathclass.cwd()

    while search_dir not in photodbs:
        data_dir = search_dir.with_child('_etiquette')
        if data_dir.is_dir:
            # create=False: we only ever open an existing database here.
            photodb = etiquette.photodb.PhotoDB(
                data_dir,
                create=False,
                log_level=LOG_LEVEL,
            )
            photodbs[search_dir] = photodb
            return photodb

        parent = search_dir.parent
        if search_dir == parent:
            # A directory that equals its own parent is the top of the tree.
            raise CantFindPhotoDB()
        search_dir = parent

    return photodbs[search_dir]
# HELPERS ##########################################################################################
def export_symlinks_albums(albums, destination, dry_run):
    '''
    Generator. For each album that has exactly one associated directory,
    symlink that directory into `destination` under the album's sanitized
    display name, and yield the symlink's path.

    The path is yielded whether the link was created, already existed, or was
    skipped because of `dry_run`. Albums with zero or several associated
    directories are currently skipped; the disabled code at the bottom of the
    loop appears to be the unfinished handling for them.
    '''
    # Only referenced by the disabled code below, but the call is kept so the
    # function does exactly what it did before.
    album_directory_names = etiquette.helpers.decollide_names(albums, lambda a: a.display_name)

    for album in albums:
        directories = album.get_associated_directories()
        if len(directories) == 1:
            source_dir = directories.pop()
            link_name = etiquette.helpers.remove_path_badchars(album.display_name)
            link_path = destination.with_child(link_name)

            # Only touch the filesystem on a real run, and never overwrite
            # something that is already at the link location.
            if not dry_run and not link_path.exists:
                print(album, link_path)
                os.symlink(src=source_dir.absolute_path, dst=link_path.absolute_path)

            yield link_path

        # photo_filenames = etiquette.helpers.album_photos_as_filename_map(
        #     album,
        #     once_each=False,
        #     naming='simplified',
        #     root_name=album_directory_names[album],
        # )
        # for (photo, filepaths) in photo_filenames.items():
        #     if not include_searchhidden and photo.searchhidden:
        #         continue
        #     if not photo.real_path.exists:
        #         continue
        #     for filepath in filepaths:
        #         filepath = destination.join(filepath)
        #         print(filepath.absolute_path)
        #         if dry_run:
        #             yield filepath
        #             continue
        #         if filepath.exists:
        #             yield filepath
        #             continue
        #         print(filepath, filepath.exists)
        #         filepath.parent.makedirs(exist_ok=True)
        #         os.symlink(src=photo.real_path.absolute_path, dst=filepath.absolute_path)
        #         yield filepath
def export_symlinks_photos(photos, destination, dry_run):
    '''
    Generator. Symlink each photo's real file into `destination` and yield the
    symlink's path.

    Link names come from etiquette.helpers.decollide_names applied to the
    photos' basenames. Every link path is printed and yielded, but the link
    itself is only created when this is not a dry run and nothing exists at
    that path yet.
    '''
    names_by_photo = etiquette.helpers.decollide_names(photos, lambda p: p.basename)

    for (photo, link_name) in names_by_photo.items():
        link_path = destination.with_child(link_name)
        print(link_path.absolute_path)

        if not (dry_run or link_path.exists):
            os.symlink(src=photo.real_path.absolute_path, dst=link_path.absolute_path)

        yield link_path
def get_photos_by_glob(pattern):
    '''
    Generator. Yield the photos in the database whose files match the glob
    `pattern`, which is interpreted relative to the current working directory.

    The special pattern `**` yields every photo that search_in_cwd finds.
    For any other pattern, matching files that are not known to the database
    are silently skipped.
    '''
    photodb = find_photodb()
    pattern = pathclass.normalize_sep(pattern)

    if pattern == '**':
        # Because this function is a generator, `return <iterable>` would hand
        # the results to StopIteration and the caller would receive nothing.
        # They have to be re-yielded.
        yield from search_in_cwd(yield_photos=True, yield_albums=False)
        return

    cwd = pathclass.cwd()

    (folder, pattern) = os.path.split(pattern)
    folder = cwd.join(folder) if folder else cwd

    files = [f for f in folder.glob(pattern) if f.is_file]
    for file in files:
        # Keep the try body down to the one call expected to raise.
        try:
            photo = photodb.get_photo_by_path(file)
        except etiquette.exceptions.NoSuchPhoto:
            continue
        yield photo
def get_photos_by_globs(patterns):
    '''
    Generator. Yield the photos matched by each glob pattern in turn, in the
    order the patterns are given. See get_photos_by_glob for how one pattern
    is resolved.
    '''
    for pattern in patterns:
        for photo in get_photos_by_glob(pattern):
            yield photo
def get_photos_from_args(args):
    '''
    Return a list of the photos selected by `args.photo_id_args` (looked up by
    ID) followed by those selected by `args.photo_search_args` (found by
    searching within the cwd). Either attribute may be empty / None, in which
    case it contributes nothing.
    '''
    photodb = find_photodb()
    selected = []

    if args.photo_id_args:
        selected += photodb.get_photos_by_id(args.photo_id_args)

    if args.photo_search_args:
        selected += search_by_argparse(args.photo_search_args, yield_photos=True)

    return selected
def get_albums_from_args(args):
    '''
    Return a list of the albums selected by `args.album_id_args` (looked up by
    ID) followed by those selected by `args.album_search_args` (found by
    searching within the cwd). Either attribute may be empty / None, in which
    case it contributes nothing.
    '''
    photodb = find_photodb()
    selected = []

    if args.album_id_args:
        selected += photodb.get_albums_by_id(args.album_id_args)

    if args.album_search_args:
        selected += search_by_argparse(args.album_search_args, yield_albums=True)

    return selected
def search_in_cwd(**kwargs):
    '''
    Run photodb.search restricted to the current working directory, forwarding
    all keyword arguments unchanged, and return whatever the search returns.
    '''
    photodb = find_photodb()
    return photodb.search(within_directory=pathclass.cwd(), **kwargs)
def search_by_argparse(args, yield_albums=False, yield_photos=False):
    '''
    Run a search within the cwd using the options stored on `args`, which is
    expected to be a namespace produced by the `search` subparser.

    Each search option is read from the attribute of the same name and passed
    to search_in_cwd as a keyword argument, together with `yield_albums` and
    `yield_photos`.
    '''
    option_names = (
        'area',
        'width',
        'height',
        'ratio',
        'bytes',
        'duration',
        'author',
        'created',
        'extension',
        'extension_not',
        'filename',
        'has_tags',
        'has_thumbnail',
        'is_searchhidden',
        'mimetype',
        'tag_musts',
        'tag_mays',
        'tag_forbids',
        'tag_expression',
        'limit',
        'offset',
        'orderby',
    )
    search_kwargs = {name: getattr(args, name) for name in option_names}
    search_kwargs['yield_albums'] = yield_albums
    search_kwargs['yield_photos'] = yield_photos
    return search_in_cwd(**search_kwargs)
####################################################################################################
def add_remove_tag_argparse(args, action):
    '''
    Add the tag named `args.tag_name` to a set of photos, or remove it from
    them, then commit after confirmation.

    action: 'add' or 'remove'. Any other value leaves the photos untouched.

    The photos are chosen by the first of these that applies:
    1. the --photos / --search style arguments, when any were given;
    2. the positional glob patterns;
    3. every photo found by searching within the cwd.
    '''
    photodb = find_photodb()
    tag = photodb.get_tag(name=args.tag_name)

    if args.any_id_args:
        photos = get_photos_from_args(args)
    elif args.globs:
        photos = get_photos_by_globs(args.globs)
    else:
        photos = search_in_cwd(yield_photos=True, yield_albums=False)

    adding = (action == 'add')
    removing = (action == 'remove')
    for photo in photos:
        if adding:
            photo.add_tag(tag)
        elif removing:
            photo.remove_tag(tag)

    # --yes skips the interactive prompt.
    confirmed = args.autoyes or interactive.getpermission('Commit?')
    if confirmed:
        photodb.commit()
def digest_directory_argparse(args):
    '''
    Digest `args.directory` into the database, printing every album and photo
    that the digest yields, then commit after confirmation.
    '''
    directory = pathclass.Path(args.directory)
    photodb = find_photodb()

    digest_options = {
        'make_albums': args.make_albums,
        'recurse': args.recurse,
        'new_photo_ratelimit': args.ratelimit,
        'yield_albums': True,
        'yield_photos': True,
    }
    for result in photodb.digest_directory(directory, **digest_options):
        print(result)

    # --yes skips the interactive prompt.
    confirmed = args.autoyes or interactive.getpermission('Commit?')
    if confirmed:
        photodb.commit()
def easybake_argparse(args):
    '''
    Pass each string in `args.eb_strings` to photodb.easybake, in order, then
    commit after confirmation.
    '''
    photodb = find_photodb()

    for eb_string in args.eb_strings:
        # The return value of easybake is not used by this command.
        photodb.easybake(eb_string)

    # --yes skips the interactive prompt.
    if args.autoyes or interactive.getpermission('Commit?'):
        photodb.commit()
def export_symlinks_argparse(args):
    '''
    Export the selected albums and photos as symlinks inside
    `args.destination`, which is created if it does not exist.

    Albums come from --albums / --album-search and photos from --photos /
    --search. With `args.dry_run`, the export generators are still run but no
    links are created by them.

    With `args.prune` (and not a dry run), every symlink under the destination
    that was not part of this export is removed, followed by any directories
    under the destination that are left empty.
    '''
    photodb = find_photodb()
    destination = pathclass.Path(args.destination)
    destination.makedirs(exist_ok=True)

    albums = []
    if args.album_id_args:
        albums += photodb.get_albums_by_id(args.album_id_args)
    if args.album_search_args:
        albums += search_by_argparse(args.album_search_args, yield_albums=True)

    # The export functions are generators; collecting them into the set is
    # what actually drives the symlink creation.
    exported = set(export_symlinks_albums(albums, destination, dry_run=args.dry_run))

    photos = []
    if args.photo_id_args:
        photos += photodb.get_photos_by_id(args.photo_id_args)
    if args.photo_search_args:
        photos += search_by_argparse(args.photo_search_args, yield_photos=True)

    exported.update(export_symlinks_photos(photos, destination, dry_run=args.dry_run))

    if not args.prune or args.dry_run:
        return

    existing_links = {file for file in spinal.walk_generator(destination) if file.is_link}
    for old_symlink in existing_links - exported:
        print(f'Pruning {old_symlink}.')
        os.remove(old_symlink.absolute_path)
        if not old_symlink.parent.listdir():
            os.rmdir(old_symlink.parent.absolute_path)

    # Sweep up empty directories. Removing a directory may leave its parent
    # empty too, so the parent is queued for another look.
    pending = set(spinal.walk_generator(destination, yield_directories=True, yield_files=False))
    while pending:
        directory = pending.pop()
        # Queued parents can climb above the destination; leave those alone.
        if directory in destination and not directory.listdir():
            os.rmdir(directory.absolute_path)
            pending.add(directory.parent)
def init_argparse(args):
    '''
    Create a new PhotoDB using PhotoDB's default location and commit it.
    `args` is accepted for the subcommand calling convention but not used.
    '''
    etiquette.photodb.PhotoDB(create=True).commit()
def purge_deleted_files_argparse(args):
    '''
    Run photodb.purge_deleted_files over the selected photos, printing each
    item it yields, then commit after confirmation.

    The photos are those chosen with --photos / --search, or every photo found
    by searching within the cwd when neither was given.
    '''
    photodb = find_photodb()

    explicit_selection = args.photo_id_args or args.photo_search_args
    if explicit_selection:
        photos = get_photos_from_args(args)
    else:
        photos = search_in_cwd(yield_photos=True, yield_albums=False)

    for deleted in photodb.purge_deleted_files(photos):
        print(deleted)

    # --yes skips the interactive prompt.
    confirmed = args.autoyes or interactive.getpermission('Commit?')
    if confirmed:
        photodb.commit()
def purge_empty_albums_argparse(args):
    '''
    Run photodb.purge_empty_albums over the selected albums, printing each
    item it yields, then commit after confirmation.

    The albums are those chosen with --albums, or the albums within the cwd
    when no album IDs were given.
    '''
    photodb = find_photodb()

    # args.album_search_args is deliberately not consulted: a search only
    # yields albums that contain some matching photo, so it can never turn up
    # an empty album.
    if args.album_id_args:
        albums = get_albums_from_args(args)
    else:
        albums = photodb.get_albums_within_directory(pathclass.cwd())

    for deleted in photodb.purge_empty_albums(albums):
        print(deleted)

    # --yes skips the interactive prompt.
    confirmed = args.autoyes or interactive.getpermission('Commit?')
    if confirmed:
        photodb.commit()
def search_argparse(args):
    '''
    Search for photos within the cwd using the options on `args` and print the
    absolute path of each result, one per line.
    '''
    for photo in search_by_argparse(args, yield_photos=True):
        print(photo.real_path.absolute_path)
def show_associated_directories_argparse(args):
    '''
    Print one line per album in the form `album | "dir1" "dir2" ...`, listing
    the album's associated directories as quoted absolute paths. Albums with
    no associated directories are not printed.

    The albums are those chosen with --albums / --album-search, or the albums
    found by searching within the cwd when neither was given.
    '''
    explicit_selection = args.album_id_args or args.album_search_args
    if explicit_selection:
        albums = get_albums_from_args(args)
    else:
        albums = search_in_cwd(yield_photos=False, yield_albums=True)

    for album in albums:
        directories = album.get_associated_directories()
        if directories:
            directories = ' '.join(f'"{d.absolute_path}"' for d in directories)
            print(f'{album} | {directories}')
def set_unset_searchhidden_argparse(args, searchhidden):
    '''
    Set the searchhidden flag of the selected photos to `searchhidden`,
    printing each photo as it is updated, then commit after confirmation.

    When any --photos / --albums / --search style arguments were given, the
    photos are those selected directly plus every photo yielded by
    walk_photos on the selected albums. Otherwise, every photo found by
    searching within the cwd is used.
    '''
    photodb = find_photodb()

    # Make any supplied searches look for photos in the opposite state, since
    # those are the ones this command would actually change.
    for search_args in (args.photo_search_args, args.album_search_args):
        if search_args:
            search_args.is_searchhidden = not searchhidden

    if args.any_id_args:
        photos = get_photos_from_args(args)
        for album in get_albums_from_args(args):
            photos.extend(album.walk_photos())
    else:
        photos = search_in_cwd(yield_photos=True, yield_albums=False)

    for photo in photos:
        print(photo)
        photo.set_searchhidden(searchhidden)

    # --yes skips the interactive prompt.
    confirmed = args.autoyes or interactive.getpermission('Commit?')
    if confirmed:
        photodb.commit()
def tag_breplace_argparse(args):
    '''
    Bulk-rename tags by replacing `args.replace_from` with `args.replace_to`
    in every tag name.

    With `args.regex` the replacement is done by re.sub, otherwise by plain
    str.replace. The result is passed through photodb.normalize_tagname, and
    tags whose name comes out unchanged are skipped. With `args.set_synonym`
    each renamed tag also gets its old name added as a synonym.

    Unless `args.autoyes` is set, the planned renames are printed and must be
    confirmed before anything is changed. The renames are committed at the
    end.

    Raises etiquette.exceptions.TagExists, before anything is renamed, if a
    new name is already in use as a tag name or synonym.
    '''
    photodb = find_photodb()

    tag_names = photodb.get_all_tag_names()
    all_names = tag_names.union(photodb.get_all_synonyms())

    # First pass: plan every rename so a collision aborts before any change.
    renames = []
    for tag_name in tag_names:
        if args.regex:
            replaced = re.sub(args.replace_from, args.replace_to, tag_name)
        else:
            replaced = tag_name.replace(args.replace_from, args.replace_to)
        new_name = photodb.normalize_tagname(replaced)

        if new_name == tag_name:
            continue

        if new_name in all_names:
            raise etiquette.exceptions.TagExists(new_name)

        if args.set_synonym:
            printline = f'{tag_name} -> {new_name}+{tag_name}'
        else:
            printline = f'{tag_name} -> {new_name}'

        renames.append((tag_name, new_name, printline))

    if not args.autoyes:
        for (_old_name, _new_name, printline) in renames:
            print(printline)
        approved = interactive.getpermission('Ok?', must_pick=True)
        if not approved:
            return

    # Second pass: apply the plan.
    for (tag_name, new_name, printline) in renames:
        print(printline)
        tag = photodb.get_tag(tag_name)
        tag.rename(new_name)
        if args.set_synonym:
            tag.add_synonym(tag_name)

    photodb.commit()
def main(argv):
    '''
    Parse `argv` (the command line without the program name), run the chosen
    subcommand, and return its result for use as the exit status.

    After the log level has been extracted, the arguments are split into
    groups by these switches:

    --search, --photo_search, --photo-search: what follows is parsed with the
        `search` subparser and stored as args.photo_search_args.
    --album_search, --album-search: likewise, stored as
        args.album_search_args.
    --photos: what follows is a list of photo IDs (args.photo_id_args).
    --albums: what follows is a list of album IDs (args.album_id_args).

    Everything before the first switch is the subcommand and its own options.
    If no subcommand was given, the help text is printed and 1 is returned.
    '''
    global LOG_LEVEL
    (LOG_LEVEL, argv) = vlogging.get_level_by_argv(argv)

    parser = argparse.ArgumentParser(description=__doc__)
    subparsers = parser.add_subparsers()

    primary_args = []
    photo_id_args = []
    photo_search_args = []
    album_id_args = []
    album_search_args = []

    # Each switch redirects the arguments after it into a different list.
    mode_switches = {
        '--search': photo_search_args,
        '--photo_search': photo_search_args,
        '--photo-search': photo_search_args,
        '--album_search': album_search_args,
        '--album-search': album_search_args,
        '--photos': photo_id_args,
        '--albums': album_id_args,
    }
    mode = primary_args
    for arg in argv:
        if arg in mode_switches:
            mode = mode_switches[arg]
        else:
            mode.append(arg)

    p_add_tag = subparsers.add_parser('add_tag', aliases=['add-tag'])
    p_add_tag.add_argument('tag_name')
    p_add_tag.add_argument('globs', nargs='*')
    p_add_tag.add_argument('--yes', dest='autoyes', action='store_true')
    p_add_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='add'))

    p_remove_tag = subparsers.add_parser('remove_tag', aliases=['remove-tag'])
    p_remove_tag.add_argument('tag_name')
    p_remove_tag.add_argument('globs', nargs='*')
    p_remove_tag.add_argument('--yes', dest='autoyes', action='store_true')
    p_remove_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='remove'))

    p_easybake = subparsers.add_parser('easybake')
    p_easybake.add_argument('eb_strings', nargs='+')
    p_easybake.add_argument('--yes', dest='autoyes', action='store_true')
    p_easybake.set_defaults(func=easybake_argparse)

    p_digest = subparsers.add_parser('digest', aliases=['digest_directory', 'digest-directory'])
    p_digest.add_argument('directory')
    p_digest.add_argument('--no_albums', '--no-albums', dest='make_albums', action='store_false', default=True)
    p_digest.add_argument('--ratelimit', dest='ratelimit', type=float, default=0.2)
    p_digest.add_argument('--no_recurse', '--no-recurse', dest='recurse', action='store_false', default=True)
    p_digest.add_argument('--yes', dest='autoyes', action='store_true')
    p_digest.set_defaults(func=digest_directory_argparse)

    p_export_symlinks = subparsers.add_parser('export_symlinks', aliases=['export-symlinks'])
    p_export_symlinks.add_argument('--destination', dest='destination', required=True)
    p_export_symlinks.add_argument('--dry', dest='dry_run', action='store_true')
    p_export_symlinks.add_argument('--prune', dest='prune', action='store_true')
    p_export_symlinks.set_defaults(func=export_symlinks_argparse)

    p_init = subparsers.add_parser('init', aliases=['create'])
    p_init.set_defaults(func=init_argparse)

    p_purge_deleted_files = subparsers.add_parser('purge_deleted_files', aliases=['purge-deleted-files'])
    p_purge_deleted_files.add_argument('--yes', dest='autoyes', action='store_true')
    p_purge_deleted_files.set_defaults(func=purge_deleted_files_argparse)

    p_purge_empty_albums = subparsers.add_parser('purge_empty_albums', aliases=['purge-empty-albums'])
    p_purge_empty_albums.add_argument('--yes', dest='autoyes', action='store_true')
    p_purge_empty_albums.set_defaults(func=purge_empty_albums_argparse)

    # This subparser doubles as the parser for the --search / --album-search
    # argument groups further down.
    p_search = subparsers.add_parser('search')
    p_search.add_argument('--area', dest='area', default=None)
    p_search.add_argument('--width', dest='width', default=None)
    p_search.add_argument('--height', dest='height', default=None)
    p_search.add_argument('--ratio', dest='ratio', default=None)
    p_search.add_argument('--bytes', dest='bytes', default=None)
    p_search.add_argument('--duration', dest='duration', default=None)
    p_search.add_argument('--author', dest='author', default=None)
    p_search.add_argument('--created', dest='created', default=None)
    p_search.add_argument('--extension', dest='extension', default=None)
    p_search.add_argument('--extension_not', '--extension-not', dest='extension_not', default=None)
    p_search.add_argument('--filename', dest='filename', default=None)
    p_search.add_argument('--has_tags', '--has-tags', dest='has_tags', default=None)
    p_search.add_argument('--has_thumbnail', '--has-thumbnail', dest='has_thumbnail', default=None)
    p_search.add_argument('--is_searchhidden', '--is-searchhidden', dest='is_searchhidden', default=False)
    p_search.add_argument('--mimetype', dest='mimetype', default=None)
    p_search.add_argument('--tag_musts', '--tag-musts', dest='tag_musts', default=None)
    p_search.add_argument('--tag_mays', '--tag-mays', dest='tag_mays', default=None)
    p_search.add_argument('--tag_forbids', '--tag-forbids', dest='tag_forbids', default=None)
    p_search.add_argument('--tag_expression', '--tag-expression', dest='tag_expression', default=None)
    p_search.add_argument('--limit', dest='limit', default=None)
    p_search.add_argument('--offset', dest='offset', default=None)
    p_search.add_argument('--orderby', dest='orderby', default='basename-ASC')
    # p_search.add_argument('--yield_albums', '--yield-albums', dest='yield_albums', default=None)
    p_search.set_defaults(func=search_argparse)

    p_show_associated_directories = subparsers.add_parser('show_associated_directories', aliases=['show-associated-directories'])
    p_show_associated_directories.set_defaults(func=show_associated_directories_argparse)

    p_set_searchhidden = subparsers.add_parser('set_searchhidden', aliases=['set-searchhidden'])
    p_set_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
    p_set_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=True))

    p_unset_searchhidden = subparsers.add_parser('unset_searchhidden', aliases=['unset-searchhidden'])
    p_unset_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
    p_unset_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=False))

    p_tag_breplace = subparsers.add_parser('tag_breplace', aliases=['tag-breplace'])
    p_tag_breplace.add_argument('replace_from')
    p_tag_breplace.add_argument('replace_to')
    p_tag_breplace.add_argument('--set_synonym', '--set-synonym', dest='set_synonym', action='store_true')
    p_tag_breplace.add_argument('--regex', dest='regex', action='store_true')
    p_tag_breplace.add_argument('--yes', dest='autoyes', action='store_true')
    p_tag_breplace.set_defaults(func=tag_breplace_argparse)

    ##

    args = parser.parse_args(primary_args)
    if not hasattr(args, 'func'):
        # Subparsers are optional in argparse, so when no subcommand is named
        # the namespace has no `func`. Show the usage instead of crashing.
        parser.print_help()
        return 1

    photo_search_args = p_search.parse_args(photo_search_args) if photo_search_args else None
    album_search_args = p_search.parse_args(album_search_args) if album_search_args else None

    # IDs may be given as separate arguments or joined within one argument;
    # comma_space_split breaks each argument into its individual IDs.
    photo_id_args = [photo_id for arg in photo_id_args for photo_id in stringtools.comma_space_split(arg)]
    album_id_args = [album_id for arg in album_id_args for album_id in stringtools.comma_space_split(arg)]

    args.photo_search_args = photo_search_args
    args.album_search_args = album_search_args
    args.photo_id_args = photo_id_args
    args.album_id_args = album_id_args
    args.any_id_args = bool(photo_search_args or album_search_args or photo_id_args or album_id_args)

    return args.func(args)
# Script entry point: the value returned by main becomes the exit status.
if __name__ == '__main__':
    exit_status = main(sys.argv[1:])
    raise SystemExit(exit_status)