From b288cca5192fc600daa522e7c306f803b724301f Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Tue, 30 Nov 2021 20:14:13 -0800 Subject: [PATCH] Rewrite a lot of pathclass, spinal.walk using tuple-based Path. I was inspired by the idea of "making impossible states impossible" and using a data model that accurately represents what we intend for it to represent. Instead of storing the path as a string where "it's a string but actually you're supposed to know that the parts between os.seps are different parts and the first one is special and...", we can use a data model that directly says that. Storing the path as a tuple of (Drive, Part, Part) helps me focus on the semantics of the Path as a collection of parts joined by the os.sep. Furthermore, storing the path as a string made some operations slow. Every time we call one of the os.path functions with a string, it has to do a lot of normalization and edge-case handling even when we know it wouldn't be needed. By storing the path as a tuple, we can instantly get the drive name, parent dir name, and basename without asking os.path to split it for us every single time. It also makes relative path / common ancestor checks a lot easier to understand. Fewer operations need to go into the slow functions. --- voussoirkit/pathclass.py | 288 +++++++++++++++++++++++++-------------- voussoirkit/spinal.py | 149 ++++++++++++-------- voussoirkit/winglob.py | 14 +- 3 files changed, 288 insertions(+), 163 deletions(-) diff --git a/voussoirkit/pathclass.py b/voussoirkit/pathclass.py index 67ab085..82933c4 100644 --- a/voussoirkit/pathclass.py +++ b/voussoirkit/pathclass.py @@ -1,11 +1,15 @@ import glob import os -import re _glob = glob from voussoirkit import winglob +if os.name == 'nt': + SEPS = {'\\', '/'} +else: + SEPS = {'/'} + WINDOWS_GLOBAL_BADCHARS = {'*', '?', '<', '>', '|', '"'} WINDOWS_BASENAME_BADCHARS = {'\\', '/', ':', '*', '?', '<', '>', '|', '"'} WINDOWS_RESERVED_NAMES = { @@ -44,6 +48,14 @@ class NotFile(PathclassException): class NotLink(PathclassException): pass +class Drive: + def __init__(self, name): + name = name.rstrip(os.sep) + self._name = name + + def __eq__(self, other): + return self._name == other._name + class Extension: def __init__(self, ext): if isinstance(ext, Extension): @@ -88,57 +100,85 @@ class Path: self, path, *, - force_sep=None, _case_correct=False, ): ''' - force_sep: - Normally, the pathclass will use the default separator for your - operating system: / on unix and \\ on windows. You can use this - argument to force a particular separator. - _case_correct: True or False. If True, this indicates that the path casing is known in advance to be correct, which means calls to correct_case can be skipped. This is helpful because correct_case can be a source of slowdown. ''' - self.force_sep = force_sep - self.sep = force_sep or os.sep - self._case_correct = _case_correct + self._absolute_path = None + self._extension = None if isinstance(path, Path): - self._absolute_path = path.absolute_path + self._parts = path._parts + self._absolute_path = path._absolute_path + self._extension = path._extension return - if not isinstance(path, str): - raise TypeError(f'path must be {Path} or {str}, not {type(path)}.') + if isinstance(path, (tuple, list)): + if len(path) == 0: + raise ValueError('Empty tuple') + drive = normalize_drive(path[0]) + parts = tuple(normalize_pathpart(part) for part in path[1:]) + self._parts = (drive, *parts) + return - path = path.strip() - if re.match(r'^[A-Za-z]:$', path): - # Bare Windows drive letter. - path += self.sep - path = normalize_sep(path) - path = os.path.normpath(path) - absolute_path = os.path.abspath(path) - self._absolute_path = normalize_sep(absolute_path, self.sep) + path = os.fspath(path) + + if isinstance(path, str): + path = os.path.abspath(path) + self._absolute_path = path + (drive, remainder) = os.path.splitdrive(path) + drive = normalize_drive(drive) + remainder = remainder.lstrip(os.sep) + # If remainder == '' then splitting it will yield [''] which we + # don't want in our parts. + if remainder: + parts = (normalize_pathpart(part) for part in remainder.split(os.sep)) + self._parts = (drive, *parts) + else: + self._parts = (drive,) + return + + raise TypeError(f'path must be {Path}, {tuple} or {str}, not {type(path)}.') def __contains__(self, other): - other = self.spawn(other) + if not isinstance(other, Path): + other = Path(other) - self_norm = self.normcase - if not self_norm.endswith(self.sep): - self_norm += self.sep - return other.normcase.startswith(self_norm) + # If other is a child of self, then other._parts must be at least as + # long as self._parts plus one. + if len(self._parts) >= len(other._parts): + return False + + # Compare by normcase so that Windows's case-insensitive filenames + # behave correctly. + # It would be fitting to do this check using ._parts, but we would + # have to normcase each part anyway so let's just do the whole string + # at once. + return other.normcase.startswith(self.normcase) def __eq__(self, other): - if not hasattr(other, 'absolute_path'): - return False + if not isinstance(other, (Path, str, tuple, list)): + try: + other = os.fspath(other) + except TypeError: + return False + + if not isinstance(other, Path): + other = Path(other) + # Compare by normcase so that Windows's case-insensitive filenames # behave correctly. return self.normcase == other.normcase + def __fspath__(self): + return self.absolute_path + def __hash__(self): return hash(self.normcase) @@ -148,10 +188,19 @@ class Path: return self.normcase < other.normcase def __repr__(self): - return '{c}({path})'.format(c=self.__class__.__name__, path=repr(self.absolute_path)) + return f'{self.__class__.__name__}({repr(self.absolute_path)})' @property def absolute_path(self): + if self._absolute_path is not None: + return self._absolute_path + + # This ensures that if this Path is just the drive, it will end with + # the sep, and all other paths do not end with the sep. + drive = self._parts[0] + parts = self._parts[1:] + absolute = drive._name + os.sep + os.sep.join(part._name for part in parts) + self._absolute_path = absolute return self._absolute_path def assert_exists(self): @@ -198,19 +247,18 @@ class Path: @property def basename(self): - return os.path.basename(self.absolute_path) + return self._parts[-1]._name def correct_case(self): if self._case_correct: return self - absolute_path = get_path_casing(self._absolute_path) - self._absolute_path = normalize_sep(absolute_path, self.sep) - self._case_correct = True + absolute_path = get_path_casing(self.absolute_path) + self.__init__(absolute_path, _case_correct=True) return self @property def depth(self): - return len(self.absolute_path.rstrip(self.sep).split(self.sep)) + return len(self._parts) @property def dot_extension(self): @@ -218,10 +266,7 @@ class Path: @property def drive(self): - drive = os.path.splitdrive(self.absolute_path)[0] - if not drive.endswith(self.sep): - drive += self.sep - return self.spawn(drive) + return Path([self._parts[0]]) @property def exists(self): @@ -229,17 +274,41 @@ class Path: @property def extension(self): - return Extension(os.path.splitext(self.absolute_path)[1]) + if self._extension is not None: + return self._extension + + # Let's consider bare drives to not have an extension. + if len(self._parts) == 1: + self._extension = '' + return self._extension + + self._extension = Extension(os.path.splitext(self.basename)[1]) + return self._extension def glob(self, pattern): - if '/' in pattern or '\\' in pattern: + ''' + Return Paths that match a glob pattern within this directory. + ''' + pattern = os.path.normpath(pattern) + + if os.sep in pattern: # If the user wants to glob names in a different path, they should # create a Pathclass for that directory first and do it normally. - raise TypeError('glob pattern should not have path separators') - pattern = os.path.join(self.absolute_path, pattern) - children = winglob.glob(pattern) - children = [self.with_child(child) for child in children] - return children + raise TypeError('glob pattern should not have path separators.') + + if not pattern: + raise ValueError('glob pattern should not be empty.') + + # I would like to rewrite this using listdir + fnmatch.filter so we can + # get straight to the basenames, but I need to learn what corner cases + # are handled by glob for us before I do so. + pattern_root = f'{self.absolute_path}{os.sep}' + cut_length = len(pattern_root) + pattern = f'{pattern_root}{pattern}' + items = winglob.glob(pattern) + basenames = (item[cut_length:] for item in items) + items = [self.with_child(item, _case_correct=self._case_correct) for item in basenames] + return items def glob_directories(self, pattern): return [p for p in self.glob(pattern) if p.is_dir] @@ -264,14 +333,17 @@ class Path: return os.path.islink(self.absolute_path) def join(self, subpath, **spawn_kwargs): + ''' + Use os.path.join to join this path with any other path string. + ''' if not isinstance(subpath, str): - raise TypeError('subpath must be a string') + raise TypeError(f'subpath must be a {str}, not {type(subpath)}.') path = os.path.join(self.absolute_path, subpath) - return self.spawn(path, **spawn_kwargs) + return Path(path, **spawn_kwargs) def listdir(self): children = os.listdir(self.absolute_path) - children = [self.join(child, _case_correct=self._case_correct) for child in children] + children = [self.with_child(child, _case_correct=self._case_correct) for child in children] return children def listdir_directories(self): @@ -285,17 +357,17 @@ class Path: @property def normcase(self): - norm = os.path.normcase(self.absolute_path) - norm = norm.replace('/', self.sep).replace('\\', self.sep) - return norm + return os.path.normcase(self.absolute_path) def open(self, *args, **kwargs): return open(self.absolute_path, *args, **kwargs) @property def parent(self): - parent = os.path.dirname(self.absolute_path) - return self.spawn(parent) + if len(self._parts) == 1: + return self + + return Path(self._parts[:-1], _case_correct=self._case_correct) def read(self, mode, **kwargs): ''' @@ -313,43 +385,40 @@ class Path: @property def relative_path(self): - return self.relative_to(os.getcwd()) + return self.relative_to(cwd()) def relative_to(self, other, simple=False): - if isinstance(other, str): + if not isinstance(other, Path): other = Path(other) if self == other: return '.' - self.correct_case() - other.correct_case() - if self in other: - relative = self.absolute_path - relative = relative.replace(other.absolute_path, '', 1) - relative = relative.lstrip(self.sep) - if not simple: - relative = '.' + self.sep + relative - return relative + sub_parts = self._parts[len(other._parts):] + relative = os.sep.join(part._name for part in sub_parts) + if simple: + return relative + else: + return f'.{os.sep}{relative}' - common = common_path([other.absolute_path, self.absolute_path], fallback=None) + common = common_path([self, other], fallback=None) if common is None: return self.absolute_path - common = self.spawn(common) backsteps = other.depth - common.depth - backsteps = self.sep.join('..' for x in range(backsteps)) - common = common.absolute_path - if not common.endswith(self.sep): - common += self.sep - unique = self.absolute_path.replace(common, '', 1) - relative_path = os.path.join(backsteps, unique) - relative_path = relative_path.replace('/', self.sep).replace('\\', self.sep) + backsteps = os.sep.join('..' for x in range(backsteps)) + unique = [part._name for part in self._parts[common.depth:]] + relative_path = os.path.join(backsteps, *unique) return relative_path def replace_extension(self, extension): + ''' + Return a new Path that has the same basename as this one, but with a + different extension. If this Path does not have any extension, it is + added. + ''' extension = Extension(extension) base = os.path.splitext(self.basename)[0] @@ -366,23 +435,29 @@ class Path: elif self.is_dir: return sum(file.size for file in self.walk() if file.is_file) - def spawn(self, path, **kwargs): - return self.__class__(path, force_sep=self.force_sep, **kwargs) - @property def stat(self): return os.stat(self.absolute_path) def touch(self): + ''' + Update the file's mtime if it exists, or create it. + ''' try: os.utime(self.absolute_path) except FileNotFoundError: self.open('a').close() def walk(self): + ''' + Yield files and directories from this directory and subdirectories. + ''' directories = [] - for child in self.listdir(): - if child.is_dir: + + entries = os.scandir(self.absolute_path) + for entry in entries: + child = self.with_child(entry.name, _case_correct=self._case_correct) + if entry.is_dir(): directories.append(child) else: yield child @@ -391,8 +466,9 @@ class Path: yield directory yield from directory.walk() - def with_child(self, basename): - return self.join(os.path.basename(basename)) + def with_child(self, basename, **spawn_kwargs): + parts = (*self._parts, basename) + return Path(parts, **spawn_kwargs) def write(self, mode, data, **kwargs): ''' @@ -401,10 +477,15 @@ class Path: with self.open(mode, **kwargs) as handle: return handle.write(data) +class PathPart: + def __init__(self, name): + if any(sep in name for sep in SEPS): + raise ValueError('A path part cannot contain path separators.') + self._name = name + def common_path(paths, fallback): ''' - Given a list of file paths, determine the deepest path which all - have in common. + Given a list of paths, determine the deepest path which all have in common. ''' if isinstance(paths, (str, Path)): raise TypeError('`paths` must be a collection') @@ -414,21 +495,21 @@ def common_path(paths, fallback): if len(paths) == 0: raise ValueError('Empty list') - if hasattr(paths, 'pop'): - model = paths.pop() - else: - model = paths[0] - paths = paths[1:] - + index = 0 while True: - if all(f in model for f in paths): - return model - parent = model.parent - if parent == model: - # We just processed the root, and now we're stuck at the root. - # Which means there was no common path. - return fallback - model = parent + try: + this_level = set(os.path.normcase(path._parts[index]._name) for path in paths) + except IndexError: + break + if len(this_level) > 1: + break + index += 1 + + if index == 0: + return fallback + + parts = paths[0]._parts[:index] + return Path(parts) def cwd(): return Path(os.getcwd()) @@ -541,12 +622,15 @@ def glob_patternize(piece): break return piece -def normalize_sep(path, sep=None): - sep = sep or os.sep - path = path.replace('/', sep) - path = path.replace('\\', sep) +def normalize_drive(name): + if type(name) is Drive: + return name + return Drive(name) - return path +def normalize_pathpart(name): + if type(name) is PathPart: + return name + return PathPart(name) def system_root(): - return os.path.abspath(os.sep) + return Path(os.sep) diff --git a/voussoirkit/spinal.py b/voussoirkit/spinal.py index bdd137b..8177e4f 100644 --- a/voussoirkit/spinal.py +++ b/voussoirkit/spinal.py @@ -2,6 +2,7 @@ This module provides functions related to walking the filesystem and copying files and folders. ''' +import collections import hashlib import os import shutil @@ -20,6 +21,8 @@ from voussoirkit import winglob log = vlogging.getLogger(__name__) BAIL = sentinel.Sentinel('BAIL') +YIELD_STYLE_FLAT = sentinel.Sentinel('yield style flat') +YIELD_STYLE_NESTED = sentinel.Sentinel('yield style nested') # Number of bytes to read and write at a time CHUNK_SIZE = 2 * bytestring.MIBIBYTE @@ -123,7 +126,7 @@ def copy_directory( Passed into each `copy_file` as `callback_progress`. callback_permission_denied: - Passed into each `copy_file` as `callback_permission_denied`. + Passed into `walk` and each `copy_file` as `callback_permission_denied`. callback_pre_directory: This function will be called before each directory and subdirectory @@ -237,9 +240,11 @@ def copy_directory( # Copy walker = walk( source, + callback_permission_denied=callback_permission_denied, exclude_directories=exclude_directories, exclude_filenames=exclude_filenames, - yield_style='nested', + sort=True, + yield_style=YIELD_STYLE_NESTED, ) def denester(walker): @@ -773,9 +778,10 @@ def walk( glob_directories=None, glob_filenames=None, recurse=True, + sort=False, yield_directories=False, yield_files=True, - yield_style='flat', + yield_style=YIELD_STYLE_FLAT, ): ''' Yield pathclass.Path objects for files in the tree, similar to os.walk. @@ -805,7 +811,16 @@ def walk( at least one of these patterns. recurse: - Yield from subdirectories. If False, only immediate files are returned. + If False, we will yield only the items from the starting path and then + stop. This might seem silly for a walk function, but it makes it easier + on the calling side to have a recurse/no-recurse option without having + to call a separate function with different arguments for each case, + while still taking advantage of the other filtering features here. + + sort: + If True, items are sorted before they are yielded. Otherwise, they + come in whatever order the filesystem returns them, which may not + be alphabetical. yield_directories: Should the generator produce directories? True or False. @@ -823,10 +838,15 @@ def walk( if not yield_directories and not yield_files: raise ValueError('yield_directories and yield_files cannot both be False.') - if yield_style not in ['flat', 'nested']: + yield_style = { + 'flat': YIELD_STYLE_FLAT, + 'nested': YIELD_STYLE_NESTED, + }.get(yield_style, yield_style) + + if yield_style not in [YIELD_STYLE_FLAT, YIELD_STYLE_NESTED]: raise ValueError(f'yield_style should be "flat" or "nested", not {yield_style}.') - callback_permission_denied = callback_permission_denied or do_nothing + callback_permission_denied = callback_permission_denied or None if exclude_filenames is not None: exclude_filenames = {normalize(f) for f in exclude_filenames} @@ -858,8 +878,8 @@ def walk( exclude = not any(winglob.fnmatch(basename, whitelisted) for whitelisted in whitelist) if blacklist is not None and not exclude: - n_basename = normalize(basename) - n_abspath = normalize(abspath) + n_basename = os.path.normcase(basename) + n_abspath = os.path.normcase(abspath) exclude = any( n_basename == blacklisted or @@ -875,68 +895,81 @@ def walk( if handle_exclusion(None, exclude_directories, path.basename, path.absolute_path): return - # In the following loops, I found joining the os.sep with fstrings to be + # In the following loop, I found joining the os.sep with fstrings to be # 10x faster than `os.path.join`, reducing a 6.75 second walk to 5.7. # Because we trust the values of current_location and the child names, # we don't run the risk of producing bad values this way. - def walkstep_nested(current_location, child_dirs, child_files): - directories = [] - new_child_dirs = [] - for child_dir in child_dirs: - child_dir_abspath = f'{current_location}{os.sep}{child_dir}' - if handle_exclusion(glob_directories, exclude_directories, child_dir, child_dir_abspath): + queue = collections.deque() + queue.append(path) + while queue: + current = queue.pop() + log.debug('Scanning %s.', current) + current_rstrip = current.absolute_path.rstrip(os.sep) + + if yield_style is YIELD_STYLE_NESTED: + child_dirs = [] + child_files = [] + + try: + entries = list(os.scandir(current)) + except (OSError, PermissionError) as exc: + if callback_permission_denied is not None: + callback_permission_denied(exc) continue + else: + raise - new_child_dirs.append(child_dir) - directories.append(pathclass.Path(child_dir_abspath, _case_correct=True)) + if sort: + entries = sorted(entries, key=lambda e: os.path.normcase(e.name)) - # This will actually affect the results of the os.walk going forward! - child_dirs[:] = new_child_dirs + # The problem with stack-based depth-first search is that the last item + # from the parent dir becomes the first to be walked, leading to + # reverse-alphabetical order directory traversal. But we also don't + # want to reverse the input entries because then the files come out + # backwards. So instead we keep a more_queue to which we appendleft so + # that it's backwards, and popping will make it forward again. + more_queue = collections.deque() + for entry in entries: + entry_abspath = f'{current_rstrip}{os.sep}{entry.name}' - files = [] - for child_file in child_files: - child_file_abspath = f'{current_location}{os.sep}{child_file}' - if handle_exclusion(glob_filenames, exclude_filenames, child_file, child_file_abspath): - continue - - files.append(pathclass.Path(child_file_abspath, _case_correct=True)) - - current_location = pathclass.Path(current_location, _case_correct=True) - yield (current_location, directories, files) - - def walkstep_flat(current_location, child_dirs, child_files): - new_child_dirs = [] - for child_dir in child_dirs: - child_dir_abspath = f'{current_location}{os.sep}{child_dir}' - if handle_exclusion(glob_directories, exclude_directories, child_dir, child_dir_abspath): - continue - - new_child_dirs.append(child_dir) - if yield_directories: - yield pathclass.Path(child_dir_abspath, _case_correct=True) - - # This will actually affect the results of the os.walk going forward! - child_dirs[:] = new_child_dirs - - if yield_files: - for child_file in child_files: - child_file_abspath = f'{current_location}{os.sep}{child_file}' - if handle_exclusion(glob_filenames, exclude_filenames, child_file, child_file_abspath): + if entry.is_dir(): + if handle_exclusion( + whitelist=glob_directories, + blacklist=exclude_directories, + basename=entry.name, + abspath=entry_abspath, + ): continue - yield pathclass.Path(child_file_abspath, _case_correct=True) + child = current.with_child(entry.name, _case_correct=True) + if yield_directories and yield_style is YIELD_STYLE_FLAT: + yield child + elif yield_style is YIELD_STYLE_NESTED: + child_dirs.append(child) - walker = os.walk(path.absolute_path, onerror=callback_permission_denied, followlinks=True) - if yield_style == 'flat': - my_stepper = walkstep_flat - if yield_style == 'nested': - my_stepper = walkstep_nested + if recurse: + more_queue.appendleft(child) - for step in walker: - yield from my_stepper(*step) - if not recurse: - break + elif entry.is_file(): + if handle_exclusion( + whitelist=glob_filenames, + blacklist=exclude_filenames, + basename=entry.name, + abspath=entry_abspath, + ): + continue + + child = current.with_child(entry.name, _case_correct=True) + if yield_files and yield_style is YIELD_STYLE_FLAT: + yield child + elif yield_style is YIELD_STYLE_NESTED: + child_files.append(child) + + queue.extend(more_queue) + + if yield_style is YIELD_STYLE_NESTED: + yield (current, child_dirs, child_files) # Backwards compatibility walk_generator = walk diff --git a/voussoirkit/winglob.py b/voussoirkit/winglob.py index 0b26252..c2f91af 100644 --- a/voussoirkit/winglob.py +++ b/voussoirkit/winglob.py @@ -4,7 +4,7 @@ However, python's glob module is written for unix-style globs in which brackets represent character classes / ranges. On Windows we should escape those brackets to get results that are consistent -with a Windows' user's expectations. But calling glob.escape would also escape +with a Windows user's expectations. But calling glob.escape would also escape asterisk which may not be desired. So this module just provides a modified version of glob.glob which will escape only square brackets when called on Windows, and behave normally on Linux. @@ -14,6 +14,11 @@ import glob as python_glob import os import re +if os.name == 'nt': + GLOB_SYMBOLS = {'*', '?'} +else: + GLOB_SYMBOLS = {'*', '?', '['} + def fix(pattern): if os.name == 'nt': pattern = re.sub(r'(\[|\])', r'[\1]', pattern) @@ -22,6 +27,9 @@ def fix(pattern): def fnmatch(name, pat): return python_fnmatch.fnmatch(name, fix(pat)) +def fnmatch_filter(names, pat): + return python_fnmatch.filter(names, fix(pat)) + def glob(pathname, *, recursive=False): return python_glob.glob(fix(pathname), recursive=recursive) @@ -35,8 +43,8 @@ def glob_many(patterns, *, recursive=False): def is_glob(pattern): ''' - Improvements can be made to consider [] ranges for unix, but properly + Improvements can be made to validate [] ranges for unix, but properly parsing the range syntax is not something I'm interested in doing right now and it would become the largest function in the whole module. ''' - return any(c in pattern for c in '*?') + return len(set(pattern).intersection(GLOB_SYMBOLS)) > 0