Rewrite sole_subdir_lift to be more recursively useful.

This commit is contained in:
voussoir 2021-03-31 19:40:47 -07:00
parent 4e10633155
commit 04b53dae8a
No known key found for this signature in database
GPG key ID: 5F7554F8C26DACCB

View file

@ -1,9 +1,5 @@
'''
This program takes a directory and does the following: If that directory
contains nothing but a single child directory, then the contents of that child
will be moved to the parent, and the empty child will be deleted.
'''
import argparse import argparse
import collections
import os import os
import shutil import shutil
import sys import sys
@ -11,40 +7,63 @@ import sys
from voussoirkit import passwordy from voussoirkit import passwordy
from voussoirkit import pathclass from voussoirkit import pathclass
from voussoirkit import pipeable from voussoirkit import pipeable
from voussoirkit import spinal
from voussoirkit import vlogging
from voussoirkit import winglob from voussoirkit import winglob
def sole_lift(starting): log = vlogging.getLogger(__name__, 'sole_subdir_lift')
starting = pathclass.Path(starting)
children = starting.listdir()
if len(children) != 1:
return
child = children[0]
if not child.is_dir:
return
temp_dir = starting.with_child(passwordy.urandom_hex(32))
os.rename(child.absolute_path, temp_dir.absolute_path)
for grandchild in temp_dir.listdir():
shutil.move(grandchild.absolute_path, starting.absolute_path)
if temp_dir.listdir():
raise Exception()
os.rmdir(temp_dir.absolute_path)
def sole_lift_argparse(args):
patterns = pipeable.input_many(args.patterns, skip_blank=True, strip=True)
directories = (pathclass.Path(d) for pattern in patterns for d in winglob.glob(pattern))
directories = (d for d in directories if d.is_dir)
for directory in directories:
sole_lift(directory)
def main(argv): def main(argv):
argv = vlogging.set_level_by_argv(log, argv)
def sole_lift_argparse(args):
starting = pathclass.Path(args.starting)
queue = collections.deque()
queue.extend(spinal.walk(starting, yield_files=False, yield_directories=True))
while len(queue) > 0:
directory = queue.popleft()
if not directory.exists:
log.debug('%s no longer exists.', directory)
continue
if directory not in starting:
log.debug('%s is outside of starting.', directory)
continue
children = directory.listdir()
child_count = len(children)
if child_count != 1:
log.debug('%s has %d children.', directory, child_count)
continue
child = children[0]
if not child.is_dir:
log.debug('%s contains a file, not a dir.', directory)
continue
log.info('Lifting contents of %s.', child.absolute_path)
# child is renamed to random hex so that the grandchildren we are about
# to lift don't have name conflicts with the child dir itself.
# Consider .\abc\abc where the grandchild can't be moved.
temp_dir = directory.with_child(passwordy.urandom_hex(32))
os.rename(child.absolute_path, temp_dir.absolute_path)
for grandchild in temp_dir.listdir():
shutil.move(grandchild.absolute_path, directory.absolute_path)
if temp_dir.listdir():
raise Exception()
os.rmdir(temp_dir.absolute_path)
queue.append(directory.parent)
def main(argv):
argv = vlogging.set_level_by_argv(log, argv)
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('patterns', nargs='+', default='.') parser.add_argument('starting', nargs='?', default='.')
parser.set_defaults(func=sole_lift_argparse) parser.set_defaults(func=sole_lift_argparse)
args = parser.parse_args(argv) args = parser.parse_args(argv)