etiquette/frontends/etiquette_cli.py

513 lines
19 KiB
Python
Raw Normal View History

import argparse
2020-12-30 23:32:34 +00:00
import os
import re
import sys
from voussoirkit import interactive
from voussoirkit import pathclass
2020-12-30 23:32:34 +00:00
from voussoirkit import spinal
from voussoirkit import stringtools
2020-11-09 04:20:04 +00:00
from voussoirkit import vlogging
import etiquette
2020-11-09 04:20:04 +00:00
LOG_LEVEL = vlogging.NOTSET
class CantFindPhotoDB(Exception):
pass
2020-09-28 21:22:28 +00:00
photodbs = {}
def find_photodb():
path = pathclass.cwd()
2020-09-28 21:22:28 +00:00
while True:
2020-09-28 21:22:28 +00:00
try:
return photodbs[path]
except KeyError:
pass
if path.with_child('_etiquette').is_dir:
break
if path == path.parent:
raise CantFindPhotoDB()
path = path.parent
2020-11-09 04:20:04 +00:00
photodb = etiquette.photodb.PhotoDB(
path.with_child('_etiquette'),
create=False,
log_level=LOG_LEVEL,
)
2020-09-28 21:22:28 +00:00
photodbs[path] = photodb
return photodb
# HELPERS ##########################################################################################
2020-12-30 23:32:34 +00:00
def export_symlinks_albums(albums, destination, dry_run):
album_directory_names = etiquette.helpers.decollide_names(albums, lambda a: a.display_name)
for album in albums:
associated_directories = album.get_associated_directories()
if len(associated_directories) == 1:
album_dir = associated_directories.pop()
symlink_dir = destination.with_child(etiquette.helpers.remove_path_badchars(album.display_name))
if dry_run:
yield symlink_dir
continue
if symlink_dir.exists:
yield symlink_dir
continue
print(album, symlink_dir)
os.symlink(src=album_dir.absolute_path, dst=symlink_dir.absolute_path)
yield symlink_dir
# photo_filenames = etiquette.helpers.album_photos_as_filename_map(
# album,
# once_each=False,
# naming='simplified',
# root_name=album_directory_names[album],
# )
# for (photo, filepaths) in photo_filenames.items():
# if not include_searchhidden and photo.searchhidden:
# continue
# if not photo.real_path.exists:
# continue
# for filepath in filepaths:
# filepath = destination.join(filepath)
# print(filepath.absolute_path)
# if dry_run:
# yield filepath
# continue
# if filepath.exists:
# yield filepath
# continue
# print(filepath, filepath.exists)
# filepath.parent.makedirs(exist_ok=True)
# os.symlink(src=photo.real_path.absolute_path, dst=filepath.absolute_path)
# yield filepath
def export_symlinks_photos(photos, destination, dry_run):
photo_filenames = etiquette.helpers.decollide_names(photos, lambda p: p.basename)
for (photo, filename) in photo_filenames.items():
filepath = destination.with_child(filename)
print(filepath.absolute_path)
if dry_run:
yield filepath
continue
if filepath.exists:
yield filepath
continue
os.symlink(src=photo.real_path.absolute_path, dst=filepath.absolute_path)
yield filepath
def get_photos_by_glob(pattern):
photodb = find_photodb()
pattern = pathclass.normalize_sep(pattern)
if pattern == '**':
return search_in_cwd(yield_photos=True, yield_albums=False)
cwd = pathclass.cwd()
(folder, pattern) = os.path.split(pattern)
if folder:
folder = cwd.join(folder)
else:
folder = cwd
files = [f for f in folder.glob(pattern) if f.is_file]
for file in files:
try:
photo = photodb.get_photo_by_path(file)
yield photo
except etiquette.exceptions.NoSuchPhoto:
pass
def get_photos_by_globs(patterns):
for pattern in patterns:
yield from get_photos_by_glob(pattern)
def get_photos_from_args(args):
photodb = find_photodb()
photos = []
if args.photo_id_args:
photos.extend(photodb.get_photos_by_id(args.photo_id_args))
if args.photo_search_args:
photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True))
return photos
def get_albums_from_args(args):
photodb = find_photodb()
albums = []
if args.album_id_args:
albums.extend(photodb.get_albums_by_id(args.album_id_args))
if args.album_search_args:
albums.extend(search_by_argparse(args.album_search_args, yield_albums=True))
return albums
def search_in_cwd(**kwargs):
photodb = find_photodb()
cwd = pathclass.cwd()
return photodb.search(
within_directory=cwd,
**kwargs,
)
def search_by_argparse(args, yield_albums=False, yield_photos=False):
return search_in_cwd(
area=args.area,
width=args.width,
height=args.height,
ratio=args.ratio,
bytes=args.bytes,
duration=args.duration,
author=args.author,
created=args.created,
extension=args.extension,
extension_not=args.extension_not,
filename=args.filename,
has_tags=args.has_tags,
has_thumbnail=args.has_thumbnail,
is_searchhidden=args.is_searchhidden,
mimetype=args.mimetype,
tag_musts=args.tag_musts,
tag_mays=args.tag_mays,
tag_forbids=args.tag_forbids,
tag_expression=args.tag_expression,
limit=args.limit,
offset=args.offset,
orderby=args.orderby,
yield_albums=yield_albums,
yield_photos=yield_photos,
)
####################################################################################################
2020-11-27 23:39:58 +00:00
def add_remove_tag_argparse(args, action):
photodb = find_photodb()
tag = photodb.get_tag(name=args.tag_name)
if args.any_id_args:
photos = get_photos_from_args(args)
elif args.globs:
photos = get_photos_by_globs(args.globs)
else:
photos = search_in_cwd(yield_photos=True, yield_albums=False)
for photo in photos:
2020-11-27 23:39:58 +00:00
if action == 'add':
photo.add_tag(tag)
elif action == 'remove':
photo.remove_tag(tag)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
2020-09-28 21:28:15 +00:00
def digest_directory_argparse(args):
directory = pathclass.Path(args.directory)
photodb = find_photodb()
digest = photodb.digest_directory(
directory,
make_albums=args.make_albums,
recurse=args.recurse,
new_photo_ratelimit=args.ratelimit,
yield_albums=True,
yield_photos=True,
)
for result in digest:
print(result)
if args.autoyes or interactive.getpermission('Commit?'):
2020-09-28 21:28:15 +00:00
photodb.commit()
2020-10-18 02:18:25 +00:00
def easybake_argparse(args):
photodb = find_photodb()
for eb_string in args.eb_strings:
notes = photodb.easybake(eb_string)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
2020-12-30 23:32:34 +00:00
def export_symlinks_argparse(args):
photodb = find_photodb()
destination = pathclass.Path(args.destination)
destination.makedirs(exist_ok=True)
total_paths = set()
albums = []
if args.album_id_args:
albums.extend(photodb.get_albums_by_id(args.album_id_args))
if args.album_search_args:
albums.extend(search_by_argparse(args.album_search_args, yield_albums=True))
export = export_symlinks_albums(
albums,
destination,
dry_run=args.dry_run,
)
total_paths.update(export)
photos = []
if args.photo_id_args:
photos.extend(photodb.get_photos_by_id(args.photo_id_args))
if args.photo_search_args:
photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True))
export = export_symlinks_photos(
photos,
destination,
dry_run=args.dry_run,
)
total_paths.update(export)
if args.prune and not args.dry_run:
symlinks = set(file for file in spinal.walk_generator(destination) if file.is_link)
symlinks = symlinks.difference(total_paths)
for old_symlink in symlinks:
print(f'Pruning {old_symlink}.')
os.remove(old_symlink.absolute_path)
if not old_symlink.parent.listdir():
os.rmdir(old_symlink.parent.absolute_path)
checkdirs = set(spinal.walk_generator(destination, yield_directories=True, yield_files=False))
while checkdirs:
check = checkdirs.pop()
if check not in destination:
continue
if len(check.listdir()) == 0:
os.rmdir(check.absolute_path)
checkdirs.add(check.parent)
def init_argparse(args):
photodb = etiquette.photodb.PhotoDB(create=True)
2020-10-18 02:18:25 +00:00
photodb.commit()
def purge_deleted_files_argparse(args):
photodb = find_photodb()
for deleted in photodb.purge_deleted_files():
print(deleted)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def purge_empty_albums_argparse(args):
photodb = find_photodb()
for deleted in photodb.purge_empty_albums():
print(deleted)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def search_argparse(args):
photos = search_by_argparse(args, yield_photos=True)
for photo in photos:
print(photo.real_path.absolute_path)
def show_associated_directories_argparse(args):
if args.album_id_args or args.album_search_args:
albums = get_albums_from_args(args)
else:
albums = search_in_cwd(yield_photos=False, yield_albums=True)
for album in albums:
directories = album.get_associated_directories()
if not directories:
continue
directories = [f'"{d.absolute_path}"' for d in directories]
directories = ' '.join(directories)
print(f'{album} | {directories}')
def set_unset_searchhidden_argparse(args, searchhidden):
photodb = find_photodb()
if args.photo_search_args:
args.photo_search_args.is_searchhidden = not searchhidden
if args.album_search_args:
args.album_search_args.is_searchhidden = not searchhidden
if args.any_id_args:
photos = get_photos_from_args(args)
albums = get_albums_from_args(args)
photos.extend(photo for album in albums for photo in album.walk_photos())
else:
photos = search_in_cwd(yield_photos=True, yield_albums=False)
for photo in photos:
print(photo)
photo.set_searchhidden(searchhidden)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def tag_breplace_argparse(args):
photodb = find_photodb()
renames = []
tag_names = photodb.get_all_tag_names()
all_names = tag_names.union(photodb.get_all_synonyms())
for tag_name in tag_names:
if args.regex:
new_name = re.sub(args.replace_from, args.replace_to, tag_name)
else:
new_name = tag_name.replace(args.replace_from, args.replace_to)
new_name = photodb.normalize_tagname(new_name)
if new_name == tag_name:
continue
if new_name in all_names:
raise etiquette.exceptions.TagExists(new_name)
if args.set_synonym:
printline = f'{tag_name} -> {new_name}+{tag_name}'
else:
printline = f'{tag_name} -> {new_name}'
renames.append((tag_name, new_name, printline))
if not args.autoyes:
for (tag_name, new_name, printline) in renames:
print(printline)
if not interactive.getpermission('Ok?', must_pick=True):
return
for (tag_name, new_name, printline) in renames:
print(printline)
tag = photodb.get_tag(tag_name)
tag.rename(new_name)
if args.set_synonym:
tag.add_synonym(tag_name)
photodb.commit()
def main(argv):
2020-11-09 04:20:04 +00:00
global LOG_LEVEL
(LOG_LEVEL, argv) = vlogging.get_level_by_argv(argv)
parser = argparse.ArgumentParser(description=__doc__)
subparsers = parser.add_subparsers()
primary_args = []
photo_id_args = []
photo_search_args = []
album_id_args = []
album_search_args = []
mode = primary_args
for arg in argv:
if 0:
pass
elif arg in {'--search', '--photo_search', '--photo-search'}:
mode = photo_search_args
elif arg in {'--album_search', '--album-search'}:
mode = album_search_args
elif arg == '--photos':
mode = photo_id_args
elif arg == '--albums':
mode = album_id_args
else:
mode.append(arg)
p_add_tag = subparsers.add_parser('add_tag', aliases=['add-tag'])
p_add_tag.add_argument('tag_name')
p_add_tag.add_argument('globs', nargs='*')
p_add_tag.add_argument('--yes', dest='autoyes', action='store_true')
2020-11-27 23:39:58 +00:00
p_add_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='add'))
p_remove_tag = subparsers.add_parser('remove_tag', aliases=['remove-tag'])
p_remove_tag.add_argument('tag_name')
p_remove_tag.add_argument('globs', nargs='*')
p_remove_tag.add_argument('--yes', dest='autoyes', action='store_true')
p_remove_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='remove'))
2020-10-18 02:18:25 +00:00
p_easybake = subparsers.add_parser('easybake')
p_easybake.add_argument('eb_strings', nargs='+')
p_easybake.add_argument('--yes', dest='autoyes', action='store_true')
2020-10-18 02:18:25 +00:00
p_easybake.set_defaults(func=easybake_argparse)
2020-09-28 21:28:15 +00:00
p_digest = subparsers.add_parser('digest', aliases=['digest_directory', 'digest-directory'])
p_digest.add_argument('directory')
p_digest.add_argument('--no_albums', '--no-albums', dest='make_albums', action='store_false', default=True)
p_digest.add_argument('--ratelimit', dest='ratelimit', type=float, default=0.2)
p_digest.add_argument('--no_recurse', '--no-recurse', dest='recurse', action='store_false', default=True)
p_digest.add_argument('--yes', dest='autoyes', action='store_true')
p_digest.set_defaults(func=digest_directory_argparse)
2020-12-30 23:32:34 +00:00
p_export_symlinks = subparsers.add_parser('export_symlinks', aliases=['export-symlinks'])
p_export_symlinks.add_argument('--destination', dest='destination', required=True)
p_export_symlinks.add_argument('--dry', dest='dry_run', action='store_true')
p_export_symlinks.add_argument('--prune', dest='prune', action='store_true')
p_export_symlinks.set_defaults(func=export_symlinks_argparse)
p_init = subparsers.add_parser('init', aliases=['create'])
p_init.set_defaults(func=init_argparse)
p_purge_deleted_files = subparsers.add_parser('purge_deleted_files', aliases=['purge-deleted-files'])
p_purge_deleted_files.add_argument('--yes', dest='autoyes', action='store_true')
p_purge_deleted_files.set_defaults(func=purge_deleted_files_argparse)
p_purge_empty_albums = subparsers.add_parser('purge_empty_albums', aliases=['purge-empty-albums'])
p_purge_empty_albums.add_argument('--yes', dest='autoyes', action='store_true')
p_purge_empty_albums.set_defaults(func=purge_empty_albums_argparse)
p_search = subparsers.add_parser('search')
p_search.add_argument('--area', dest='area', default=None)
p_search.add_argument('--width', dest='width', default=None)
p_search.add_argument('--height', dest='height', default=None)
p_search.add_argument('--ratio', dest='ratio', default=None)
p_search.add_argument('--bytes', dest='bytes', default=None)
p_search.add_argument('--duration', dest='duration', default=None)
p_search.add_argument('--author', dest='author', default=None)
p_search.add_argument('--created', dest='created', default=None)
p_search.add_argument('--extension', dest='extension', default=None)
p_search.add_argument('--extension_not', '--extension-not', dest='extension_not', default=None)
p_search.add_argument('--filename', dest='filename', default=None)
p_search.add_argument('--has_tags', '--has-tags', dest='has_tags', default=None)
p_search.add_argument('--has_thumbnail', '--has-thumbnail', dest='has_thumbnail', default=None)
p_search.add_argument('--is_searchhidden', '--is-searchhidden', dest='is_searchhidden', default=False)
p_search.add_argument('--mimetype', dest='mimetype', default=None)
p_search.add_argument('--tag_musts', '--tag-musts', dest='tag_musts', default=None)
p_search.add_argument('--tag_mays', '--tag-mays', dest='tag_mays', default=None)
p_search.add_argument('--tag_forbids', '--tag-forbids', dest='tag_forbids', default=None)
p_search.add_argument('--tag_expression', '--tag-expression', dest='tag_expression', default=None)
p_search.add_argument('--limit', dest='limit', default=None)
p_search.add_argument('--offset', dest='offset', default=None)
p_search.add_argument('--orderby', dest='orderby', default='basename-ASC')
# p_search.add_argument('--yield_albums', '--yield-albums', dest='yield_albums', default=None)
p_search.set_defaults(func=search_argparse)
p_show_associated_directories = subparsers.add_parser('show_associated_directories', aliases=['show-associated-directories'])
p_show_associated_directories.set_defaults(func=show_associated_directories_argparse)
p_set_searchhidden = subparsers.add_parser('set_searchhidden', aliases=['set-searchhidden'])
p_set_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
p_set_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=True))
p_unset_searchhidden = subparsers.add_parser('unset_searchhidden', aliases=['unset-searchhidden'])
p_unset_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
p_unset_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=False))
2020-11-27 23:40:13 +00:00
p_tag_breplace = subparsers.add_parser('tag_breplace', aliases=['tag-breplace'])
2020-10-26 03:21:27 +00:00
p_tag_breplace.add_argument('replace_from')
p_tag_breplace.add_argument('replace_to')
p_tag_breplace.add_argument('--set_synonym', '--set-synonym', dest='set_synonym', action='store_true')
p_tag_breplace.add_argument('--regex', dest='regex', action='store_true')
p_tag_breplace.add_argument('--yes', dest='autoyes', action='store_true')
p_tag_breplace.set_defaults(func=tag_breplace_argparse)
##
args = parser.parse_args(primary_args)
photo_search_args = p_search.parse_args(photo_search_args) if photo_search_args else None
album_search_args = p_search.parse_args(album_search_args) if album_search_args else None
photo_id_args = [id for arg in photo_id_args for id in stringtools.comma_space_split(arg)]
album_id_args = [id for arg in album_id_args for id in stringtools.comma_space_split(arg)]
args.photo_search_args = photo_search_args
args.album_search_args = album_search_args
args.photo_id_args = photo_id_args
args.album_id_args = album_id_args
args.any_id_args = bool(photo_search_args or album_search_args or photo_id_args or album_id_args)
return args.func(args)
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))