etiquette/frontends/etiquette_cli.py

786 lines
25 KiB
Python

import argparse
import os
import re
import sys
from voussoirkit import betterhelp
from voussoirkit import interactive
from voussoirkit import pathclass
from voussoirkit import pipeable
from voussoirkit import spinal
from voussoirkit import stringtools
from voussoirkit import vlogging
import etiquette
LOG_LEVEL = vlogging.NOTSET
photodbs = {}
def find_photodb():
cwd = pathclass.cwd()
try:
return photodbs[cwd]
except KeyError:
pass
# If this raises, main will catch it.
photodb = etiquette.photodb.PhotoDB.closest_photodb()
photodbs[cwd] = photodb
return photodb
# HELPERS ##########################################################################################
def export_symlinks_albums(albums, destination, dry_run):
album_directory_names = etiquette.helpers.decollide_names(albums, lambda a: a.display_name)
for (album, directory_name) in album_directory_names.items():
associated_directories = album.get_associated_directories()
if len(associated_directories) == 1:
album_dir = associated_directories.pop()
directory_name = etiquette.helpers.remove_path_badchars(directory_name)
symlink_dir = destination.with_child(directory_name)
if dry_run:
yield symlink_dir
continue
if not album_dir.exists:
continue
if symlink_dir.exists:
yield symlink_dir
continue
print(album, symlink_dir)
os.symlink(src=album_dir.absolute_path, dst=symlink_dir.absolute_path)
yield symlink_dir
def export_symlinks_photos(photos, destination, dry_run):
photo_filenames = etiquette.helpers.decollide_names(photos, lambda p: p.basename)
for (photo, filename) in photo_filenames.items():
symlink_path = destination.with_child(filename)
if dry_run:
yield symlink_path
continue
if not photo.real_path.exists:
continue
if symlink_path.exists:
yield symlink_path
continue
print(symlink_path.absolute_path)
os.symlink(src=photo.real_path.absolute_path, dst=symlink_path.absolute_path)
yield symlink_path
def get_photos_by_glob(pattern):
photodb = find_photodb()
pattern = pathclass.normalize_sep(pattern)
if pattern == '**':
return search_in_cwd(yield_photos=True, yield_albums=False)
cwd = pathclass.cwd()
(folder, pattern) = os.path.split(pattern)
if folder:
folder = cwd.join(folder)
else:
folder = cwd
files = [f for f in folder.glob(pattern) if f.is_file]
for file in files:
try:
photo = photodb.get_photo_by_path(file)
yield photo
except etiquette.exceptions.NoSuchPhoto:
pass
def get_photos_by_globs(patterns):
for pattern in patterns:
yield from get_photos_by_glob(pattern)
def get_photos_from_args(args):
photodb = find_photodb()
photos = []
if args.photo_id_args:
photos.extend(photodb.get_photos_by_id(args.photo_id_args))
if args.photo_search_args:
photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True))
return photos
def get_albums_from_args(args):
photodb = find_photodb()
albums = []
if args.album_id_args:
albums.extend(photodb.get_albums_by_id(args.album_id_args))
if args.album_search_args:
albums.extend(search_by_argparse(args.album_search_args, yield_albums=True))
return albums
def search_in_cwd(**kwargs):
photodb = find_photodb()
cwd = pathclass.cwd()
return photodb.search(
within_directory=cwd,
**kwargs,
)
def search_by_argparse(args, yield_albums=False, yield_photos=False):
return search_in_cwd(
area=args.area,
width=args.width,
height=args.height,
ratio=args.ratio,
bytes=args.bytes,
duration=args.duration,
author=args.author,
created=args.created,
extension=args.extension,
extension_not=args.extension_not,
filename=args.filename,
has_tags=args.has_tags,
has_thumbnail=args.has_thumbnail,
is_searchhidden=args.is_searchhidden,
mimetype=args.mimetype,
tag_musts=args.tag_musts,
tag_mays=args.tag_mays,
tag_forbids=args.tag_forbids,
tag_expression=args.tag_expression,
limit=args.limit,
offset=args.offset,
orderby=args.orderby,
yield_albums=yield_albums,
yield_photos=yield_photos,
)
####################################################################################################
def add_remove_tag_argparse(args, action):
photodb = find_photodb()
tag = photodb.get_tag(name=args.tag_name)
if args.any_id_args:
photos = get_photos_from_args(args)
elif args.globs:
photos = get_photos_by_globs(args.globs)
else:
photos = search_in_cwd(yield_photos=True, yield_albums=False)
for photo in photos:
if action == 'add':
photo.add_tag(tag)
elif action == 'remove':
photo.remove_tag(tag)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def digest_directory_argparse(args):
directory = pathclass.Path(args.directory)
photodb = find_photodb()
digest = photodb.digest_directory(
directory,
make_albums=args.make_albums,
recurse=args.recurse,
new_photo_ratelimit=args.ratelimit,
yield_albums=True,
yield_photos=True,
)
for result in digest:
print(result)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def easybake_argparse(args):
photodb = find_photodb()
for eb_string in args.eb_strings:
notes = photodb.easybake(eb_string)
for (action, tagname) in notes:
print(action, tagname)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def export_symlinks_argparse(args):
photodb = find_photodb()
destination = pathclass.Path(args.destination)
destination.makedirs(exist_ok=True)
total_paths = set()
albums = []
if args.album_id_args:
albums.extend(photodb.get_albums_by_id(args.album_id_args))
if args.album_search_args:
albums.extend(search_by_argparse(args.album_search_args, yield_albums=True))
export = export_symlinks_albums(
albums,
destination,
dry_run=args.dry_run,
)
total_paths.update(export)
photos = []
if args.photo_id_args:
photos.extend(photodb.get_photos_by_id(args.photo_id_args))
if args.photo_search_args:
photos.extend(search_by_argparse(args.photo_search_args, yield_photos=True))
export = export_symlinks_photos(
photos,
destination,
dry_run=args.dry_run,
)
total_paths.update(export)
if args.prune and not args.dry_run:
symlinks = set(file for file in spinal.walk_generator(destination) if file.is_link)
symlinks = symlinks.difference(total_paths)
for old_symlink in symlinks:
print(f'Pruning {old_symlink}.')
os.remove(old_symlink.absolute_path)
if not old_symlink.parent.listdir():
os.rmdir(old_symlink.parent.absolute_path)
checkdirs = set(spinal.walk_generator(destination, yield_directories=True, yield_files=False))
while checkdirs:
check = checkdirs.pop()
if check not in destination:
continue
if len(check.listdir()) == 0:
os.rmdir(check.absolute_path)
checkdirs.add(check.parent)
def init_argparse(args):
photodb = etiquette.photodb.PhotoDB(create=True)
photodb.commit()
def purge_deleted_files_argparse(args):
photodb = find_photodb()
if args.photo_id_args or args.photo_search_args:
photos = get_photos_from_args(args)
else:
photos = search_in_cwd(yield_photos=True, yield_albums=False)
for deleted in photodb.purge_deleted_files(photos):
print(deleted)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def purge_empty_albums_argparse(args):
photodb = find_photodb()
# We do not check args.album_search_args because currently it is not
# possible for search results to find empty albums on account of the fact
# that albums are only yielded when they contain some result photo.
if args.album_id_args:
albums = get_albums_from_args(args)
else:
albums = photodb.get_albums_within_directory(pathclass.cwd())
for deleted in photodb.purge_empty_albums(albums):
print(deleted)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def relocate_argparse(args):
photodb = find_photodb()
photo = photodb.get_photo(args.photo_id)
photo.relocate(args.filepath)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def search_argparse(args):
photos = search_by_argparse(args, yield_photos=True)
for photo in photos:
print(photo.real_path.absolute_path)
def show_associated_directories_argparse(args):
if args.album_id_args or args.album_search_args:
albums = get_albums_from_args(args)
else:
albums = search_in_cwd(yield_photos=False, yield_albums=True)
for album in albums:
directories = album.get_associated_directories()
if not directories:
continue
directories = [f'"{d.absolute_path}"' for d in directories]
directories = ' '.join(directories)
print(f'{album} | {directories}')
def set_unset_searchhidden_argparse(args, searchhidden):
photodb = find_photodb()
if args.photo_search_args:
args.photo_search_args.is_searchhidden = not searchhidden
if args.album_search_args:
args.album_search_args.is_searchhidden = not searchhidden
if args.any_id_args:
photos = get_photos_from_args(args)
albums = get_albums_from_args(args)
photos.extend(photo for album in albums for photo in album.walk_photos())
else:
photos = search_in_cwd(yield_photos=True, yield_albums=False)
for photo in photos:
print(photo)
photo.set_searchhidden(searchhidden)
if args.autoyes or interactive.getpermission('Commit?'):
photodb.commit()
def tag_breplace_argparse(args):
photodb = find_photodb()
renames = []
tag_names = photodb.get_all_tag_names()
all_names = tag_names.union(photodb.get_all_synonyms())
for tag_name in tag_names:
if args.regex:
new_name = re.sub(args.replace_from, args.replace_to, tag_name)
else:
new_name = tag_name.replace(args.replace_from, args.replace_to)
new_name = photodb.normalize_tagname(new_name)
if new_name == tag_name:
continue
if new_name in all_names:
raise etiquette.exceptions.TagExists(new_name)
if args.set_synonym:
printline = f'{tag_name} -> {new_name}+{tag_name}'
else:
printline = f'{tag_name} -> {new_name}'
renames.append((tag_name, new_name, printline))
if not args.autoyes:
for (tag_name, new_name, printline) in renames:
print(printline)
if not interactive.getpermission('Ok?', must_pick=True):
return
for (tag_name, new_name, printline) in renames:
print(printline)
tag = photodb.get_tag(tag_name)
tag.rename(new_name)
if args.set_synonym:
tag.add_synonym(tag_name)
photodb.commit()
DOCSTRING = '''
Etiquette CLI
=============
{add_tag}
{remove_tag}
{digest}
{easybake}
{export_symlinks}
{init}
{purge_deleted_files}
{purge_empty_albums}
{search}
{show_associated_directories}
{set_searchhidden}
{unset_searchhidden}
{tag_breplace}
At any time, you may add --silent, --quiet, --debug, --loud to change logging.
You can add --yes to avoid the "Commit?" prompt.
TO SEE DETAILS ON EACH COMMAND, RUN
> etiquette_cli.py <command> --help
'''
SUB_DOCSTRINGS = dict(
add_tag='''
add_tag:
Add a tag to files by a filename glob.
> etiquette_cli.py add_tag tag_name glob_pattern
'''.strip(),
remove_tag='''
remove_tag:
Remove a tag from files by a filename glob.
> etiquette_cli.py remove_tag tag_name glob_pattern
'''.strip(),
digest='''
digest:
Digest a directory, adding new files as Photos into the database.
> etiquette_cli.py digest directory <flags>
flags:
--no_albums:
Do not create any albums the directories. By default, albums are created
and nested to match the directory structure.
--ratelimit X:
Limit the ingest of new Photos to only one per X seconds. This can be
used to reduce system load or to make sure that two photos don't get the
same `created` timestamp.
--no_recurse:
Do not recurse into subdirectories. Only create Photos from files in
the current directory.
'''.strip(),
easybake='''
easybake:
Create and manipulate tags by easybake strings.
> etiquette_cli.py easybake eb_string
'''.strip(),
export_symlinks='''
export_symlinks:
Search for photos or albums, then create symlinks pointing to the results.
THIS IS STILL A BIT EXPERIMENTAL.
This can be used to gather up search results for the purpose of further
uploading, transfering, etc. with other applications.
Symlinks point to files (if result is a photo) or directories (if result is
an album with an associated directory).
Albums are limited to only one associated directory since the output
symlink can't point to two places at once.
> etiquette_cli.py export_symlinks --destination directory --search searchargs
> etiquette_cli.py export_symlinks --destination directory --album-search searchargs
flags:
--destination:
A path to a directory into which the symlinks will be placed.
--dry:
Print the results without actually creating the symlinks.
--prune:
In the destination directory, any existing symlinks whose target no
longer exists will be deleted.
See search --help for more info about searchargs.
'''.strip(),
init='''
init:
Create a new Etiquette database in the current directory.
> etiquette_cli.py init
'''.strip(),
purge_deleted_files='''
purge_deleted_files:
Delete any Photo objects whose file no longer exists on disk.
> etiquette_cli.py purge_deleted_files
'''.strip(),
purge_empty_albums='''
purge_empty_albums:
Delete any albums which have no child albums or photos.
Consider running purge_deleted_files first, so that albums containing
deleted files will get cleared out and then caught by this function.
> etiquette_cli.py purge_empty_albums
'''.strip(),
relocate='''
relocate:
Change a photo's filepath. Used for updating photos that have been changed
by external tools.
> etiquette_cli.py relocate photo_id filepath
'''.strip(),
search='''
search:
Search for photos and albums with complex operators.
> etiquette_cli.py search searchargs
> etiquette_cli.py search --album-search searchargs
Searchargs:
--area X-Y:
Photo/video width*height between X and Y.
--width X-Y:
Photo/video width between X and Y.
--height X-Y:
Photo/video height between X and Y.
--ratio X-Y:
Photo/video aspect ratio between X and Y.
--bytes X-Y:
File size in bytes between X and Y.
--duration X-Y:
Media duration between X and Y seconds.
--author X:
Photo authored by user with username X.
--created X-Y:
Photo creation date between X and Y unix timestamp.
--extension A,B,C:
Photo with any extension of A, B, C...
--extension_not A,B,C:
Photo without any extension of A, B, C...
--filename X:
Search terms for Photo's filename.
--has_tags yes/no/null:
If yes, Photo must have at least one tag.
If no, Photo must have no tags.
If null, doesn't matter.
--has_thumbnail yes/no/null:
--is_searchhidden yes/no/null:
--mimetype A,B,C:
Photo with any mimetype of A, B, C...
--tag_musts A,B,C:
Photo must have all tags A and B and C...
--tag_mays A,B,C:
Photo must have at least one tag of A, B, C...
--tag_forbids A,B,C:
Photo must not have any tags of A, B, C...
--tag_expression X:
Complex expression string to match tags.
--limit X:
Limit results to first X items.
--offset X:
Skip the first X items.
--orderby X-Y:
Order the results by property X in direction Y. E.g. created-desc or
bytes-asc.
'''.strip(),
show_associated_directories='''
show_associated_directories:
Show the associated directories for albums.
> etiquette_cli.py show_associated_directories
> etiquette_cli.py show_associated_directories --albums id id id
> etiquette_cli.py show_associated_directories --album-search searchargs
See search --help for more info about searchargs.
'''.strip(),
set_searchhidden='''
set_searchhidden:
Mark photos as searchhidden.
> etiquette_cli.py set_searchhidden --photos id id id
> etiquette_cli.py set_searchhidden --search searchargs
See search --help for more info about searchargs.
'''.strip(),
unset_searchhidden='''
unset_searchhidden:
Unmark photos as searchhidden.
> etiquette_cli.py unset_searchhidden --photos id id id
> etiquette_cli.py unset_searchhidden --search searchargs
See search --help for more info about searchargs.
'''.strip(),
tag_breplace='''
tag_breplace:
For all tags in the database, use find-and-replace to rename the tags.
> etiquette_cli.py tag_breplace replace_from replace_to
'''.strip(),
)
DOCSTRING = betterhelp.add_previews(DOCSTRING, SUB_DOCSTRINGS)
def main(argv):
global LOG_LEVEL
(LOG_LEVEL, argv) = vlogging.get_level_by_argv(argv)
parser = argparse.ArgumentParser(description=__doc__)
subparsers = parser.add_subparsers()
primary_args = []
photo_id_args = []
photo_search_args = []
album_id_args = []
album_search_args = []
mode = primary_args
for arg in argv:
if 0:
pass
elif arg in {'--search', '--photo_search', '--photo-search'}:
mode = photo_search_args
elif arg in {'--album_search', '--album-search'}:
mode = album_search_args
elif arg == '--photos':
mode = photo_id_args
elif arg == '--albums':
mode = album_id_args
else:
mode.append(arg)
p_add_tag = subparsers.add_parser('add_tag', aliases=['add-tag'])
p_add_tag.add_argument('tag_name')
p_add_tag.add_argument('globs', nargs='*')
p_add_tag.add_argument('--yes', dest='autoyes', action='store_true')
p_add_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='add'))
p_remove_tag = subparsers.add_parser('remove_tag', aliases=['remove-tag'])
p_remove_tag.add_argument('tag_name')
p_remove_tag.add_argument('globs', nargs='*')
p_remove_tag.add_argument('--yes', dest='autoyes', action='store_true')
p_remove_tag.set_defaults(func=lambda args: add_remove_tag_argparse(args, action='remove'))
p_digest = subparsers.add_parser('digest', aliases=['digest_directory', 'digest-directory'])
p_digest.add_argument('directory')
p_digest.add_argument('--no_albums', '--no-albums', dest='make_albums', action='store_false', default=True)
p_digest.add_argument('--ratelimit', dest='ratelimit', type=float, default=0.2)
p_digest.add_argument('--no_recurse', '--no-recurse', dest='recurse', action='store_false', default=True)
p_digest.add_argument('--yes', dest='autoyes', action='store_true')
p_digest.set_defaults(func=digest_directory_argparse)
p_easybake = subparsers.add_parser('easybake')
p_easybake.add_argument('eb_strings', nargs='+')
p_easybake.add_argument('--yes', dest='autoyes', action='store_true')
p_easybake.set_defaults(func=easybake_argparse)
p_export_symlinks = subparsers.add_parser('export_symlinks', aliases=['export-symlinks'])
p_export_symlinks.add_argument('--destination', dest='destination', required=True)
p_export_symlinks.add_argument('--dry', dest='dry_run', action='store_true')
p_export_symlinks.add_argument('--prune', dest='prune', action='store_true')
p_export_symlinks.set_defaults(func=export_symlinks_argparse)
p_init = subparsers.add_parser('init', aliases=['create'])
p_init.set_defaults(func=init_argparse)
p_purge_deleted_files = subparsers.add_parser('purge_deleted_files', aliases=['purge-deleted-files'])
p_purge_deleted_files.add_argument('--yes', dest='autoyes', action='store_true')
p_purge_deleted_files.set_defaults(func=purge_deleted_files_argparse)
p_purge_empty_albums = subparsers.add_parser('purge_empty_albums', aliases=['purge-empty-albums'])
p_purge_empty_albums.add_argument('--yes', dest='autoyes', action='store_true')
p_purge_empty_albums.set_defaults(func=purge_empty_albums_argparse)
p_relocate = subparsers.add_parser('relocate')
p_relocate.add_argument('photo_id')
p_relocate.add_argument('filepath')
p_relocate.add_argument('--yes', dest='autoyes', action='store_true')
p_relocate.set_defaults(func=relocate_argparse)
p_search = subparsers.add_parser('search')
p_search.add_argument('--area', dest='area', default=None)
p_search.add_argument('--width', dest='width', default=None)
p_search.add_argument('--height', dest='height', default=None)
p_search.add_argument('--ratio', dest='ratio', default=None)
p_search.add_argument('--bytes', dest='bytes', default=None)
p_search.add_argument('--duration', dest='duration', default=None)
p_search.add_argument('--author', dest='author', default=None)
p_search.add_argument('--created', dest='created', default=None)
p_search.add_argument('--extension', dest='extension', default=None)
p_search.add_argument('--extension_not', '--extension-not', dest='extension_not', default=None)
p_search.add_argument('--filename', dest='filename', default=None)
p_search.add_argument('--has_tags', '--has-tags', dest='has_tags', default=None)
p_search.add_argument('--has_thumbnail', '--has-thumbnail', dest='has_thumbnail', default=None)
p_search.add_argument('--is_searchhidden', '--is-searchhidden', dest='is_searchhidden', default=False)
p_search.add_argument('--mimetype', dest='mimetype', default=None)
p_search.add_argument('--tag_musts', '--tag-musts', dest='tag_musts', default=None)
p_search.add_argument('--tag_mays', '--tag-mays', dest='tag_mays', default=None)
p_search.add_argument('--tag_forbids', '--tag-forbids', dest='tag_forbids', default=None)
p_search.add_argument('--tag_expression', '--tag-expression', dest='tag_expression', default=None)
p_search.add_argument('--limit', dest='limit', default=None)
p_search.add_argument('--offset', dest='offset', default=None)
p_search.add_argument('--orderby', dest='orderby', default='basename-ASC')
# p_search.add_argument('--yield_albums', '--yield-albums', dest='yield_albums', default=None)
p_search.set_defaults(func=search_argparse)
p_show_associated_directories = subparsers.add_parser('show_associated_directories', aliases=['show-associated-directories'])
p_show_associated_directories.set_defaults(func=show_associated_directories_argparse)
p_set_searchhidden = subparsers.add_parser('set_searchhidden', aliases=['set-searchhidden'])
p_set_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
p_set_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=True))
p_unset_searchhidden = subparsers.add_parser('unset_searchhidden', aliases=['unset-searchhidden'])
p_unset_searchhidden.add_argument('--yes', dest='autoyes', action='store_true')
p_unset_searchhidden.set_defaults(func=lambda args: set_unset_searchhidden_argparse(args, searchhidden=False))
p_tag_breplace = subparsers.add_parser('tag_breplace', aliases=['tag-breplace'])
p_tag_breplace.add_argument('replace_from')
p_tag_breplace.add_argument('replace_to')
p_tag_breplace.add_argument('--set_synonym', '--set-synonym', dest='set_synonym', action='store_true')
p_tag_breplace.add_argument('--regex', dest='regex', action='store_true')
p_tag_breplace.add_argument('--yes', dest='autoyes', action='store_true')
p_tag_breplace.set_defaults(func=tag_breplace_argparse)
##
def pp(args):
args.photo_search_args = p_search.parse_args(photo_search_args) if photo_search_args else None
args.album_search_args = p_search.parse_args(album_search_args) if album_search_args else None
args.photo_id_args = [id for arg in photo_id_args for id in stringtools.comma_space_split(arg)]
args.album_id_args = [id for arg in album_id_args for id in stringtools.comma_space_split(arg)]
args.any_id_args = bool(
args.photo_search_args or
args.album_search_args or
args.photo_id_args or
args.album_id_args
)
return args
try:
return betterhelp.subparser_main(
primary_args,
parser,
main_docstring=DOCSTRING,
sub_docstrings=SUB_DOCSTRINGS,
args_postprocessor=pp,
)
except etiquette.exceptions.NoClosestPhotoDB as exc:
pipeable.stderr(exc.error_message)
pipeable.stderr('Try etiquette_cli init')
return 1
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))