From c33d16bff0ab0df024225bcceecbe1261ea4645e Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Sun, 5 Jan 2020 21:29:27 -0800 Subject: [PATCH] Replace calls to str_to_fp with pathclass.Path. --- voussoirkit/spinal.py | 69 +++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 39 deletions(-) diff --git a/voussoirkit/spinal.py b/voussoirkit/spinal.py index 897cc0d..c271c77 100644 --- a/voussoirkit/spinal.py +++ b/voussoirkit/spinal.py @@ -70,7 +70,7 @@ def copy(source, file_args=None, file_kwargs=None, dir_args=None, dir_kwargs=Non ''' Perform copy_dir or copy_file as appropriate for the source path. ''' - source = str_to_fp(source) + source = pathclass.Path(source) if source.is_file: file_args = file_args or tuple() file_kwargs = file_kwargs or dict() @@ -198,12 +198,14 @@ def copy_dir( message += '`destination_new_root` can be passed.' raise ValueError(message) - source = str_to_fp(source) + source = pathclass.Path(source) + source.correct_case() if destination_new_root is not None: - source.correct_case() destination = new_root(source, destination_new_root) - destination = str_to_fp(destination) + + destination = pathclass.Path(destination) + destination.correct_case() if destination in source: raise RecursiveDirectory(source, destination) @@ -231,29 +233,25 @@ def copy_dir( exclude_directories=exclude_directories, exclude_filenames=exclude_filenames, ) - for source_abspath in walker: - # Terminology: - # abspath: C:\folder\subfolder\filename.txt - # location: C:\folder\subfolder - # base_name: filename.txt - # folder: subfolder + for source_file in walker: + if source_file.is_link: + continue - destination_abspath = source_abspath.absolute_path.replace( + destination_abspath = source_file.absolute_path.replace( source.absolute_path, destination.absolute_path ) - destination_abspath = str_to_fp(destination_abspath) + destination_file = pathclass.Path(destination_abspath) - if destination_abspath.is_dir: - raise DestinationIsDirectory(destination_abspath) + if destination_file.is_dir: + raise DestinationIsDirectory(destination_file) - destination_location = os.path.split(destination_abspath.absolute_path)[0] if not dry_run: - os.makedirs(destination_location, exist_ok=True) + os.makedirs(destination_file.parent.absolute_path, exist_ok=True) copied = copy_file( - source_abspath, - destination_abspath, + source_file, + destination_file, bytes_per_second=bytes_per_second, callback_progress=callback_file, callback_permission_denied=callback_permission_denied, @@ -358,7 +356,7 @@ def copy_file( message += '`destination_new_root` can be passed' raise ValueError(message) - source = str_to_fp(source) + source = pathclass.Path(source) if not source.is_file: raise SourceNotFile(source) @@ -366,7 +364,7 @@ def copy_file( if destination_new_root is not None: source.correct_case() destination = new_root(source, destination_new_root) - destination = str_to_fp(destination) + destination = pathclass.Path(destination) callback_progress = callback_progress or do_nothing @@ -381,7 +379,8 @@ def copy_file( return [destination, 0] source_modtime = source.stat.st_mtime - if source_modtime == destination.stat.st_mtime: + destination_modtime = destination.stat.st_mtime + if source_modtime == destination_modtime: return [destination, 0] # Copy @@ -469,7 +468,7 @@ def get_dir_size(path): Calculate the total number of bytes across all files in this directory and its subdirectories. ''' - path = str_to_fp(path) + path = pathclass.Path(path) if not path.is_dir: raise SourceNotDirectory(path) @@ -484,8 +483,8 @@ def is_subfolder(parent, child): ''' Determine whether parent contains child. ''' - parent = normalize(str_to_fp(parent).absolute_path) + os.sep - child = normalize(str_to_fp(child).absolute_path) + os.sep + parent = normalize(pathclass.Path(parent).absolute_path) + os.sep + child = normalize(pathclass.Path(child).absolute_path) + os.sep return child.startswith(parent) def is_xor(*args): @@ -514,12 +513,12 @@ def new_root(filepath, root): I use this so that my G: drive can have backups from my C: and D: drives while preserving directory structure in G:\\D and G:\\C. ''' - filepath = str_to_fp(filepath).absolute_path - root = str_to_fp(root).absolute_path + filepath = pathclass.Path(filepath).absolute_path + root = pathclass.Path(root).absolute_path filepath = filepath.replace(':', os.sep) filepath = os.path.normpath(filepath) filepath = os.path.join(root, filepath) - return str_to_fp(filepath) + return pathclass.Path(filepath) def normalize(text): ''' @@ -527,21 +526,13 @@ def normalize(text): ''' return os.path.normpath(os.path.normcase(text)) -def str_to_fp(path): - ''' - If `path` is a string, create a Path object, otherwise just return it. - ''' - if isinstance(path, str): - path = pathclass.Path(path) - return path - def verify_hash(path, known_size, known_hash, callback=None): ''' callback: A function that takes three parameters: path object, bytes ingested so far, bytes total ''' - path = str_to_fp(path) + path = pathclass.Path(path) log.debug('Validating hash for "%s" against %s', path.absolute_path, known_hash) file_size = os.path.getsize(path.absolute_path) if file_size != known_size: @@ -635,7 +626,7 @@ def walk_generator( exclude_filenames = {normalize(f) for f in exclude_filenames} exclude_directories = {normalize(f) for f in exclude_directories} - path = str_to_fp(path) + path = pathclass.Path(path) path.correct_case() # Considering full paths @@ -680,7 +671,7 @@ def walk_generator( callback_exclusion(absolute_name, 'directory') continue - directory = str_to_fp(absolute_name) + directory = pathclass.Path(absolute_name) directories.append(directory) elif yield_style == 'flat' and not yield_files: @@ -693,7 +684,7 @@ def walk_generator( callback_exclusion(absolute_name, 'file') continue - fp = str_to_fp(absolute_name) + fp = pathclass.Path(absolute_name) if yield_style == 'flat': yield fp else: