Rewrite a lot of pathclass, spinal.walk using tuple-based Path.
I was inspired by the idea of "making impossible states impossible" and using a data model that accurately represents what we intend for it to represent. Instead of storing the path as a string where "it's a string but actually you're supposed to know that the parts between os.seps are different parts and the first one is special and...", we can use a data model that directly says that. Storing the path as a tuple of (Drive, Part, Part) helps me focus on the semantics of the Path as a collection of parts joined by the os.sep. Furthermore, storing the path as a string made some operations slow. Every time we call one of the os.path functions with a string, it has to do a lot of normalization and edge-case handling even when we know it wouldn't be needed. By storing the path as a tuple, we can instantly get the drive name, parent dir name, and basename without asking os.path to split it for us every single time. It also makes relative path / common ancestor checks a lot easier to understand. Fewer operations need to go into the slow functions.
This commit is contained in:
3 changed files with 288 additions and 163 deletions
@ -1,11 +1,15 @@
import glob
import os
import re
_glob = glob
from voussoirkit import winglob
if == 'nt':
SEPS = {'\\', '/'}
SEPS = {'/'}
WINDOWS_GLOBAL_BADCHARS = {'*', '?', '<', '>', '|', '"'}
WINDOWS_BASENAME_BADCHARS = {'\\', '/', ':', '*', '?', '<', '>', '|', '"'}
@ -44,6 +48,14 @@ class NotFile(PathclassException):
class NotLink(PathclassException):
class Drive:
def __init__(self, name):
name = name.rstrip(os.sep)
self._name = name
def __eq__(self, other):
return self._name == other._name
class Extension:
def __init__(self, ext):
if isinstance(ext, Extension):
@ -88,57 +100,85 @@ class Path:
Normally, the pathclass will use the default separator for your
operating system: / on unix and \\ on windows. You can use this
argument to force a particular separator.
True or False. If True, this indicates that the path casing is
known in advance to be correct, which means calls to correct_case
can be skipped. This is helpful because correct_case can be a
source of slowdown.
self.force_sep = force_sep
self.sep = force_sep or os.sep
self._case_correct = _case_correct
self._absolute_path = None
self._extension = None
if isinstance(path, Path):
self._absolute_path = path.absolute_path
self._parts = path._parts
self._absolute_path = path._absolute_path
self._extension = path._extension
if not isinstance(path, str):
raise TypeError(f'path must be {Path} or {str}, not {type(path)}.')
if isinstance(path, (tuple, list)):
if len(path) == 0:
raise ValueError('Empty tuple')
drive = normalize_drive(path[0])
parts = tuple(normalize_pathpart(part) for part in path[1:])
self._parts = (drive, *parts)
path = path.strip()
if re.match(r'^[A-Za-z]:$', path):
# Bare Windows drive letter.
path += self.sep
path = normalize_sep(path)
path = os.path.normpath(path)
absolute_path = os.path.abspath(path)
self._absolute_path = normalize_sep(absolute_path, self.sep)
path = os.fspath(path)
if isinstance(path, str):
path = os.path.abspath(path)
self._absolute_path = path
(drive, remainder) = os.path.splitdrive(path)
drive = normalize_drive(drive)
remainder = remainder.lstrip(os.sep)
# If remainder == '' then splitting it will yield [''] which we
# don't want in our parts.
if remainder:
parts = (normalize_pathpart(part) for part in remainder.split(os.sep))
self._parts = (drive, *parts)
self._parts = (drive,)
raise TypeError(f'path must be {Path}, {tuple} or {str}, not {type(path)}.')
def __contains__(self, other):
other = self.spawn(other)
if not isinstance(other, Path):
other = Path(other)
self_norm = self.normcase
if not self_norm.endswith(self.sep):
self_norm += self.sep
return other.normcase.startswith(self_norm)
# If other is a child of self, then other._parts must be at least as
# long as self._parts plus one.
if len(self._parts) >= len(other._parts):
return False
# Compare by normcase so that Windows's case-insensitive filenames
# behave correctly.
# It would be fitting to do this check using ._parts, but we would
# have to normcase each part anyway so let's just do the whole string
# at once.
return other.normcase.startswith(self.normcase)
def __eq__(self, other):
if not hasattr(other, 'absolute_path'):
if not isinstance(other, (Path, str, tuple, list)):
other = os.fspath(other)
except TypeError:
return False
if not isinstance(other, Path):
other = Path(other)
# Compare by normcase so that Windows's case-insensitive filenames
# behave correctly.
return self.normcase == other.normcase
def __fspath__(self):
return self.absolute_path
def __hash__(self):
return hash(self.normcase)
@ -148,10 +188,19 @@ class Path:
return self.normcase < other.normcase
def __repr__(self):
return '{c}({path})'.format(c=self.__class__.__name__, path=repr(self.absolute_path))
return f'{self.__class__.__name__}({repr(self.absolute_path)})'
def absolute_path(self):
if self._absolute_path is not None:
return self._absolute_path
# This ensures that if this Path is just the drive, it will end with
# the sep, and all other paths do not end with the sep.
drive = self._parts[0]
parts = self._parts[1:]
absolute = drive._name + os.sep + os.sep.join(part._name for part in parts)
self._absolute_path = absolute
return self._absolute_path
def assert_exists(self):
@ -198,19 +247,18 @@ class Path:
def basename(self):
return os.path.basename(self.absolute_path)
return self._parts[-1]._name
def correct_case(self):
if self._case_correct:
return self
absolute_path = get_path_casing(self._absolute_path)
self._absolute_path = normalize_sep(absolute_path, self.sep)
self._case_correct = True
absolute_path = get_path_casing(self.absolute_path)
self.__init__(absolute_path, _case_correct=True)
return self
def depth(self):
return len(self.absolute_path.rstrip(self.sep).split(self.sep))
return len(self._parts)
def dot_extension(self):
@ -218,10 +266,7 @@ class Path:
def drive(self):
drive = os.path.splitdrive(self.absolute_path)[0]
if not drive.endswith(self.sep):
drive += self.sep
return self.spawn(drive)
return Path([self._parts[0]])
def exists(self):
@ -229,17 +274,41 @@ class Path:
def extension(self):
return Extension(os.path.splitext(self.absolute_path)[1])
if self._extension is not None:
return self._extension
# Let's consider bare drives to not have an extension.
if len(self._parts) == 1:
self._extension = ''
return self._extension
self._extension = Extension(os.path.splitext(self.basename)[1])
return self._extension
def glob(self, pattern):
if '/' in pattern or '\\' in pattern:
Return Paths that match a glob pattern within this directory.
pattern = os.path.normpath(pattern)
if os.sep in pattern:
# If the user wants to glob names in a different path, they should
# create a Pathclass for that directory first and do it normally.
raise TypeError('glob pattern should not have path separators')
pattern = os.path.join(self.absolute_path, pattern)
children = winglob.glob(pattern)
children = [self.with_child(child) for child in children]
return children
raise TypeError('glob pattern should not have path separators.')
if not pattern:
raise ValueError('glob pattern should not be empty.')
# I would like to rewrite this using listdir + fnmatch.filter so we can
# get straight to the basenames, but I need to learn what corner cases
# are handled by glob for us before I do so.
pattern_root = f'{self.absolute_path}{os.sep}'
cut_length = len(pattern_root)
pattern = f'{pattern_root}{pattern}'
items = winglob.glob(pattern)
basenames = (item[cut_length:] for item in items)
items = [self.with_child(item, _case_correct=self._case_correct) for item in basenames]
return items
def glob_directories(self, pattern):
return [p for p in self.glob(pattern) if p.is_dir]
@ -264,14 +333,17 @@ class Path:
return os.path.islink(self.absolute_path)
def join(self, subpath, **spawn_kwargs):
Use os.path.join to join this path with any other path string.
if not isinstance(subpath, str):
raise TypeError('subpath must be a string')
raise TypeError(f'subpath must be a {str}, not {type(subpath)}.')
path = os.path.join(self.absolute_path, subpath)
return self.spawn(path, **spawn_kwargs)
return Path(path, **spawn_kwargs)
def listdir(self):
children = os.listdir(self.absolute_path)
children = [self.join(child, _case_correct=self._case_correct) for child in children]
children = [self.with_child(child, _case_correct=self._case_correct) for child in children]
return children
def listdir_directories(self):
@ -285,17 +357,17 @@ class Path:
def normcase(self):
norm = os.path.normcase(self.absolute_path)
norm = norm.replace('/', self.sep).replace('\\', self.sep)
return norm
return os.path.normcase(self.absolute_path)
def open(self, *args, **kwargs):
return open(self.absolute_path, *args, **kwargs)
def parent(self):
parent = os.path.dirname(self.absolute_path)
return self.spawn(parent)
if len(self._parts) == 1:
return self
return Path(self._parts[:-1], _case_correct=self._case_correct)
def read(self, mode, **kwargs):
@ -313,43 +385,40 @@ class Path:
def relative_path(self):
return self.relative_to(os.getcwd())
return self.relative_to(cwd())
def relative_to(self, other, simple=False):
if isinstance(other, str):
if not isinstance(other, Path):
other = Path(other)
if self == other:
return '.'
if self in other:
relative = self.absolute_path
relative = relative.replace(other.absolute_path, '', 1)
relative = relative.lstrip(self.sep)
if not simple:
relative = '.' + self.sep + relative
sub_parts = self._parts[len(other._parts):]
relative = os.sep.join(part._name for part in sub_parts)
if simple:
return relative
return f'.{os.sep}{relative}'
common = common_path([other.absolute_path, self.absolute_path], fallback=None)
common = common_path([self, other], fallback=None)
if common is None:
return self.absolute_path
common = self.spawn(common)
backsteps = other.depth - common.depth
backsteps = self.sep.join('..' for x in range(backsteps))
common = common.absolute_path
if not common.endswith(self.sep):
common += self.sep
unique = self.absolute_path.replace(common, '', 1)
relative_path = os.path.join(backsteps, unique)
relative_path = relative_path.replace('/', self.sep).replace('\\', self.sep)
backsteps = os.sep.join('..' for x in range(backsteps))
unique = [part._name for part in self._parts[common.depth:]]
relative_path = os.path.join(backsteps, *unique)
return relative_path
def replace_extension(self, extension):
Return a new Path that has the same basename as this one, but with a
different extension. If this Path does not have any extension, it is
extension = Extension(extension)
base = os.path.splitext(self.basename)[0]
@ -366,23 +435,29 @@ class Path:
elif self.is_dir:
return sum(file.size for file in self.walk() if file.is_file)
def spawn(self, path, **kwargs):
return self.__class__(path, force_sep=self.force_sep, **kwargs)
def stat(self):
return os.stat(self.absolute_path)
def touch(self):
Update the file's mtime if it exists, or create it.
except FileNotFoundError:
def walk(self):
Yield files and directories from this directory and subdirectories.
directories = []
for child in self.listdir():
if child.is_dir:
entries = os.scandir(self.absolute_path)
for entry in entries:
child = self.with_child(, _case_correct=self._case_correct)
if entry.is_dir():
yield child
@ -391,8 +466,9 @@ class Path:
yield directory
yield from directory.walk()
def with_child(self, basename):
return self.join(os.path.basename(basename))
def with_child(self, basename, **spawn_kwargs):
parts = (*self._parts, basename)
return Path(parts, **spawn_kwargs)
def write(self, mode, data, **kwargs):
@ -401,10 +477,15 @@ class Path:
with, **kwargs) as handle:
return handle.write(data)
class PathPart:
def __init__(self, name):
if any(sep in name for sep in SEPS):
raise ValueError('A path part cannot contain path separators.')
self._name = name
def common_path(paths, fallback):
Given a list of file paths, determine the deepest path which all
have in common.
Given a list of paths, determine the deepest path which all have in common.
if isinstance(paths, (str, Path)):
raise TypeError('`paths` must be a collection')
@ -414,21 +495,21 @@ def common_path(paths, fallback):
if len(paths) == 0:
raise ValueError('Empty list')
if hasattr(paths, 'pop'):
model = paths.pop()
model = paths[0]
paths = paths[1:]
index = 0
while True:
if all(f in model for f in paths):
return model
parent = model.parent
if parent == model:
# We just processed the root, and now we're stuck at the root.
# Which means there was no common path.
this_level = set(os.path.normcase(path._parts[index]._name) for path in paths)
except IndexError:
if len(this_level) > 1:
index += 1
if index == 0:
return fallback
model = parent
parts = paths[0]._parts[:index]
return Path(parts)
def cwd():
return Path(os.getcwd())
@ -541,12 +622,15 @@ def glob_patternize(piece):
return piece
def normalize_sep(path, sep=None):
sep = sep or os.sep
path = path.replace('/', sep)
path = path.replace('\\', sep)
def normalize_drive(name):
if type(name) is Drive:
return name
return Drive(name)
return path
def normalize_pathpart(name):
if type(name) is PathPart:
return name
return PathPart(name)
def system_root():
return os.path.abspath(os.sep)
return Path(os.sep)
@ -2,6 +2,7 @@
This module provides functions related to walking the filesystem and
copying files and folders.
import collections
import hashlib
import os
import shutil
@ -20,6 +21,8 @@ from voussoirkit import winglob
log = vlogging.getLogger(__name__)
BAIL = sentinel.Sentinel('BAIL')
YIELD_STYLE_FLAT = sentinel.Sentinel('yield style flat')
YIELD_STYLE_NESTED = sentinel.Sentinel('yield style nested')
# Number of bytes to read and write at a time
CHUNK_SIZE = 2 * bytestring.MIBIBYTE
@ -123,7 +126,7 @@ def copy_directory(
Passed into each `copy_file` as `callback_progress`.
Passed into each `copy_file` as `callback_permission_denied`.
Passed into `walk` and each `copy_file` as `callback_permission_denied`.
This function will be called before each directory and subdirectory
@ -237,9 +240,11 @@ def copy_directory(
# Copy
walker = walk(
def denester(walker):
@ -773,9 +778,10 @@ def walk(
Yield pathclass.Path objects for files in the tree, similar to os.walk.
@ -805,7 +811,16 @@ def walk(
at least one of these patterns.
Yield from subdirectories. If False, only immediate files are returned.
If False, we will yield only the items from the starting path and then
stop. This might seem silly for a walk function, but it makes it easier
on the calling side to have a recurse/no-recurse option without having
to call a separate function with different arguments for each case,
while still taking advantage of the other filtering features here.
If True, items are sorted before they are yielded. Otherwise, they
come in whatever order the filesystem returns them, which may not
be alphabetical.
Should the generator produce directories? True or False.
@ -823,10 +838,15 @@ def walk(
if not yield_directories and not yield_files:
raise ValueError('yield_directories and yield_files cannot both be False.')
if yield_style not in ['flat', 'nested']:
yield_style = {
}.get(yield_style, yield_style)
if yield_style not in [YIELD_STYLE_FLAT, YIELD_STYLE_NESTED]:
raise ValueError(f'yield_style should be "flat" or "nested", not {yield_style}.')
callback_permission_denied = callback_permission_denied or do_nothing
callback_permission_denied = callback_permission_denied or None
if exclude_filenames is not None:
exclude_filenames = {normalize(f) for f in exclude_filenames}
@ -858,8 +878,8 @@ def walk(
exclude = not any(winglob.fnmatch(basename, whitelisted) for whitelisted in whitelist)
if blacklist is not None and not exclude:
n_basename = normalize(basename)
n_abspath = normalize(abspath)
n_basename = os.path.normcase(basename)
n_abspath = os.path.normcase(abspath)
exclude = any(
n_basename == blacklisted or
@ -875,68 +895,81 @@ def walk(
if handle_exclusion(None, exclude_directories, path.basename, path.absolute_path):
# In the following loops, I found joining the os.sep with fstrings to be
# In the following loop, I found joining the os.sep with fstrings to be
# 10x faster than `os.path.join`, reducing a 6.75 second walk to 5.7.
# Because we trust the values of current_location and the child names,
# we don't run the risk of producing bad values this way.
def walkstep_nested(current_location, child_dirs, child_files):
directories = []
new_child_dirs = []
for child_dir in child_dirs:
child_dir_abspath = f'{current_location}{os.sep}{child_dir}'
if handle_exclusion(glob_directories, exclude_directories, child_dir, child_dir_abspath):
queue = collections.deque()
while queue:
current = queue.pop()
log.debug('Scanning %s.', current)
current_rstrip = current.absolute_path.rstrip(os.sep)
if yield_style is YIELD_STYLE_NESTED:
child_dirs = []
child_files = []
entries = list(os.scandir(current))
except (OSError, PermissionError) as exc:
if callback_permission_denied is not None:
if sort:
entries = sorted(entries, key=lambda e: os.path.normcase(
# The problem with stack-based depth-first search is that the last item
# from the parent dir becomes the first to be walked, leading to
# reverse-alphabetical order directory traversal. But we also don't
# want to reverse the input entries because then the files come out
# backwards. So instead we keep a more_queue to which we appendleft so
# that it's backwards, and popping will make it forward again.
more_queue = collections.deque()
for entry in entries:
entry_abspath = f'{current_rstrip}{os.sep}{}'
if entry.is_dir():
if handle_exclusion(
directories.append(pathclass.Path(child_dir_abspath, _case_correct=True))
child = current.with_child(, _case_correct=True)
if yield_directories and yield_style is YIELD_STYLE_FLAT:
yield child
elif yield_style is YIELD_STYLE_NESTED:
# This will actually affect the results of the os.walk going forward!
child_dirs[:] = new_child_dirs
if recurse:
files = []
for child_file in child_files:
child_file_abspath = f'{current_location}{os.sep}{child_file}'
if handle_exclusion(glob_filenames, exclude_filenames, child_file, child_file_abspath):
elif entry.is_file():
if handle_exclusion(
files.append(pathclass.Path(child_file_abspath, _case_correct=True))
child = current.with_child(, _case_correct=True)
if yield_files and yield_style is YIELD_STYLE_FLAT:
yield child
elif yield_style is YIELD_STYLE_NESTED:
current_location = pathclass.Path(current_location, _case_correct=True)
yield (current_location, directories, files)
def walkstep_flat(current_location, child_dirs, child_files):
new_child_dirs = []
for child_dir in child_dirs:
child_dir_abspath = f'{current_location}{os.sep}{child_dir}'
if handle_exclusion(glob_directories, exclude_directories, child_dir, child_dir_abspath):
if yield_directories:
yield pathclass.Path(child_dir_abspath, _case_correct=True)
# This will actually affect the results of the os.walk going forward!
child_dirs[:] = new_child_dirs
if yield_files:
for child_file in child_files:
child_file_abspath = f'{current_location}{os.sep}{child_file}'
if handle_exclusion(glob_filenames, exclude_filenames, child_file, child_file_abspath):
yield pathclass.Path(child_file_abspath, _case_correct=True)
walker = os.walk(path.absolute_path, onerror=callback_permission_denied, followlinks=True)
if yield_style == 'flat':
my_stepper = walkstep_flat
if yield_style == 'nested':
my_stepper = walkstep_nested
for step in walker:
yield from my_stepper(*step)
if not recurse:
if yield_style is YIELD_STYLE_NESTED:
yield (current, child_dirs, child_files)
# Backwards compatibility
walk_generator = walk
@ -4,7 +4,7 @@ However, python's glob module is written for unix-style globs in which brackets
represent character classes / ranges.
On Windows we should escape those brackets to get results that are consistent
with a Windows' user's expectations. But calling glob.escape would also escape
with a Windows user's expectations. But calling glob.escape would also escape
asterisk which may not be desired. So this module just provides a modified
version of glob.glob which will escape only square brackets when called on
Windows, and behave normally on Linux.
@ -14,6 +14,11 @@ import glob as python_glob
import os
import re
if == 'nt':
GLOB_SYMBOLS = {'*', '?'}
GLOB_SYMBOLS = {'*', '?', '['}
def fix(pattern):
if == 'nt':
pattern = re.sub(r'(\[|\])', r'[\1]', pattern)
@ -22,6 +27,9 @@ def fix(pattern):
def fnmatch(name, pat):
return python_fnmatch.fnmatch(name, fix(pat))
def fnmatch_filter(names, pat):
return python_fnmatch.filter(names, fix(pat))
def glob(pathname, *, recursive=False):
return python_glob.glob(fix(pathname), recursive=recursive)
@ -35,8 +43,8 @@ def glob_many(patterns, *, recursive=False):
def is_glob(pattern):
Improvements can be made to consider [] ranges for unix, but properly
Improvements can be made to validate [] ranges for unix, but properly
parsing the range syntax is not something I'm interested in doing right now
and it would become the largest function in the whole module.
return any(c in pattern for c in '*?')
return len(set(pattern).intersection(GLOB_SYMBOLS)) > 0
Reference in a new issue