Replace calls to str_to_fp with pathclass.Path.

This commit is contained in:
Ethan Dalool 2020-01-05 21:29:27 -08:00
parent 76abee4607
commit c33d16bff0

View file

@ -70,7 +70,7 @@ def copy(source, file_args=None, file_kwargs=None, dir_args=None, dir_kwargs=Non
'''
Perform copy_dir or copy_file as appropriate for the source path.
'''
source = str_to_fp(source)
source = pathclass.Path(source)
if source.is_file:
file_args = file_args or tuple()
file_kwargs = file_kwargs or dict()
@ -198,12 +198,14 @@ def copy_dir(
message += '`destination_new_root` can be passed.'
raise ValueError(message)
source = str_to_fp(source)
source = pathclass.Path(source)
source.correct_case()
if destination_new_root is not None:
source.correct_case()
destination = new_root(source, destination_new_root)
destination = str_to_fp(destination)
destination = pathclass.Path(destination)
destination.correct_case()
if destination in source:
raise RecursiveDirectory(source, destination)
@ -231,29 +233,25 @@ def copy_dir(
exclude_directories=exclude_directories,
exclude_filenames=exclude_filenames,
)
for source_abspath in walker:
# Terminology:
# abspath: C:\folder\subfolder\filename.txt
# location: C:\folder\subfolder
# base_name: filename.txt
# folder: subfolder
for source_file in walker:
if source_file.is_link:
continue
destination_abspath = source_abspath.absolute_path.replace(
destination_abspath = source_file.absolute_path.replace(
source.absolute_path,
destination.absolute_path
)
destination_abspath = str_to_fp(destination_abspath)
destination_file = pathclass.Path(destination_abspath)
if destination_abspath.is_dir:
raise DestinationIsDirectory(destination_abspath)
if destination_file.is_dir:
raise DestinationIsDirectory(destination_file)
destination_location = os.path.split(destination_abspath.absolute_path)[0]
if not dry_run:
os.makedirs(destination_location, exist_ok=True)
os.makedirs(destination_file.parent.absolute_path, exist_ok=True)
copied = copy_file(
source_abspath,
destination_abspath,
source_file,
destination_file,
bytes_per_second=bytes_per_second,
callback_progress=callback_file,
callback_permission_denied=callback_permission_denied,
@ -358,7 +356,7 @@ def copy_file(
message += '`destination_new_root` can be passed'
raise ValueError(message)
source = str_to_fp(source)
source = pathclass.Path(source)
if not source.is_file:
raise SourceNotFile(source)
@ -366,7 +364,7 @@ def copy_file(
if destination_new_root is not None:
source.correct_case()
destination = new_root(source, destination_new_root)
destination = str_to_fp(destination)
destination = pathclass.Path(destination)
callback_progress = callback_progress or do_nothing
@ -381,7 +379,8 @@ def copy_file(
return [destination, 0]
source_modtime = source.stat.st_mtime
if source_modtime == destination.stat.st_mtime:
destination_modtime = destination.stat.st_mtime
if source_modtime == destination_modtime:
return [destination, 0]
# Copy
@ -469,7 +468,7 @@ def get_dir_size(path):
Calculate the total number of bytes across all files in this directory
and its subdirectories.
'''
path = str_to_fp(path)
path = pathclass.Path(path)
if not path.is_dir:
raise SourceNotDirectory(path)
@ -484,8 +483,8 @@ def is_subfolder(parent, child):
'''
Determine whether parent contains child.
'''
parent = normalize(str_to_fp(parent).absolute_path) + os.sep
child = normalize(str_to_fp(child).absolute_path) + os.sep
parent = normalize(pathclass.Path(parent).absolute_path) + os.sep
child = normalize(pathclass.Path(child).absolute_path) + os.sep
return child.startswith(parent)
def is_xor(*args):
@ -514,12 +513,12 @@ def new_root(filepath, root):
I use this so that my G: drive can have backups from my C: and D: drives
while preserving directory structure in G:\\D and G:\\C.
'''
filepath = str_to_fp(filepath).absolute_path
root = str_to_fp(root).absolute_path
filepath = pathclass.Path(filepath).absolute_path
root = pathclass.Path(root).absolute_path
filepath = filepath.replace(':', os.sep)
filepath = os.path.normpath(filepath)
filepath = os.path.join(root, filepath)
return str_to_fp(filepath)
return pathclass.Path(filepath)
def normalize(text):
'''
@ -527,21 +526,13 @@ def normalize(text):
'''
return os.path.normpath(os.path.normcase(text))
def str_to_fp(path):
'''
If `path` is a string, create a Path object, otherwise just return it.
'''
if isinstance(path, str):
path = pathclass.Path(path)
return path
def verify_hash(path, known_size, known_hash, callback=None):
'''
callback:
A function that takes three parameters:
path object, bytes ingested so far, bytes total
'''
path = str_to_fp(path)
path = pathclass.Path(path)
log.debug('Validating hash for "%s" against %s', path.absolute_path, known_hash)
file_size = os.path.getsize(path.absolute_path)
if file_size != known_size:
@ -635,7 +626,7 @@ def walk_generator(
exclude_filenames = {normalize(f) for f in exclude_filenames}
exclude_directories = {normalize(f) for f in exclude_directories}
path = str_to_fp(path)
path = pathclass.Path(path)
path.correct_case()
# Considering full paths
@ -680,7 +671,7 @@ def walk_generator(
callback_exclusion(absolute_name, 'directory')
continue
directory = str_to_fp(absolute_name)
directory = pathclass.Path(absolute_name)
directories.append(directory)
elif yield_style == 'flat' and not yield_files:
@ -693,7 +684,7 @@ def walk_generator(
callback_exclusion(absolute_name, 'file')
continue
fp = str_to_fp(absolute_name)
fp = pathclass.Path(absolute_name)
if yield_style == 'flat':
yield fp
else: