This commit is contained in:
Ethan Dalool 2016-08-09 01:33:36 -07:00
parent 4b7416874f
commit 53645b0123
16 changed files with 327 additions and 280 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 183 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 189 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 178 KiB

View file

@ -31,8 +31,6 @@ def bytestring(size, force_unit=None):
If None, an appropriate size unit is chosen automatically. If None, an appropriate size unit is chosen automatically.
Otherwise, you can provide one of the size constants to force that divisor. Otherwise, you can provide one of the size constants to force that divisor.
''' '''
# choose which magnitutde to use as the divisor
if force_unit is None: if force_unit is None:
divisor = get_appropriate_divisor(size) divisor = get_appropriate_divisor(size)
else: else:
@ -53,9 +51,12 @@ def get_appropriate_divisor(size):
return appropriate_unit return appropriate_unit
def parsebytes(string): def parsebytes(string):
string = string.lower().replace(' ', '') '''
Given a string like "100 kib", return the appropriate integer value.
'''
string = string.lower().strip().replace(' ', '')
matches = re.findall('((\\.|\\d)+)', string) matches = re.findall('((\\.|-|\\d)+)', string)
if len(matches) == 0: if len(matches) == 0:
raise ValueError('No numbers found') raise ValueError('No numbers found')
if len(matches) > 1: if len(matches) > 1:
@ -65,16 +66,19 @@ def parsebytes(string):
if not string.startswith(byte_value): if not string.startswith(byte_value):
raise ValueError('Number is not at start of string') raise ValueError('Number is not at start of string')
# if the string has no text besides the number, just return that int.
string = string.replace(byte_value, '') string = string.replace(byte_value, '')
byte_value = float(byte_value) byte_value = float(byte_value)
if string == '': if string == '':
return byte_value return int(byte_value)
reversed_units = {value.lower():key for (key, value) in UNIT_STRINGS.items()} reversed_units = {value.lower():key for (key, value) in UNIT_STRINGS.items()}
for (unit_string, multiplier) in reversed_units.items(): for (unit_string, multiplier) in reversed_units.items():
# accept kib, k, kb
if string in (unit_string, unit_string[0], unit_string.replace('i', '')): if string in (unit_string, unit_string[0], unit_string.replace('i', '')):
break break
else: else:
raise ValueError('Could not determine byte value of %s' % string) raise ValueError('Could not determine byte value of %s' % string)
return byte_value * multiplier return int(byte_value * multiplier)

View file

@ -0,0 +1,30 @@
import bytestring
import unittest
pairs = {
100: '100.000 b',
2 ** 10: '1.000 KiB',
2 ** 20: '1.000 MiB',
2 ** 30: '1.000 GiB',
-(2 ** 30): '-1.000 GiB',
(2 ** 30) + (512 * (2 ** 20)): '1.500 GiB',
}
class BytestringTest(unittest.TestCase):
def test_bytestring(self):
for (number, text) in pairs.items():
self.assertEqual(bytestring.bytestring(number), text)
def test_parsebytes(self):
for (number, text) in pairs.items():
self.assertEqual(bytestring.parsebytes(text), number)
self.assertEqual(bytestring.parsebytes('100k'), 102400)
self.assertEqual(bytestring.parsebytes('100 k'), 102400)
self.assertEqual(bytestring.parsebytes('100 kb'), 102400)
self.assertEqual(bytestring.parsebytes('100 kib'), 102400)
self.assertEqual(bytestring.parsebytes('100.00KB'), 102400)
self.assertEqual(bytestring.parsebytes('1.5 mb'), 1572864)
self.assertEqual(bytestring.parsebytes('-1.5 mb'), -1572864)
if __name__ == '__main__':
unittest.main()

View file

@ -147,21 +147,21 @@ def download_file(
if user_range_max != '': if user_range_max != '':
user_range_max = int(user_range_max) user_range_max = int(user_range_max)
else: else:
# Included to determine whether the server supports this
headers['range'] = 'bytes=0-'
user_range_min = None user_range_min = None
user_range_max = None user_range_max = None
# Always include a range on the first request to figure out whether the
# server supports it. Use 0- so we get the right `remote_total_bytes`.
temp_headers = headers
temp_headers.update({'range': 'bytes=0-'})
# I'm using a GET instead of an actual HEAD here because some servers respond # I'm using a GET instead of an actual HEAD here because some servers respond
# differently, even though they're not supposed to. # differently, even though they're not supposed to.
head = request('get', url, stream=True, headers=headers, auth=auth) head = request('get', url, stream=True, headers=temp_headers, auth=auth)
remote_total_bytes = int(head.headers.get('content-length', 1)) remote_total_bytes = int(head.headers.get('content-length', 1))
server_respects_range = (head.status_code == 206 and 'content-range' in head.headers) server_respects_range = (head.status_code == 206 and 'content-range' in head.headers)
head.connection.close() head.connection.close()
if not user_provided_range:
del headers['range']
touch(localname) touch(localname)
file_handle = open(localname, 'r+b') file_handle = open(localname, 'r+b')
file_handle.seek(0) file_handle.seek(0)

View file

@ -198,7 +198,6 @@ HTML_TREE_HEADER = '''
<script type="text/javascript"> <script type="text/javascript">
function collapse(div) function collapse(div)
{ {
//div = document.getElementById(id);
if (div.style.display != "none") if (div.style.display != "none")
{ {
div.style.display = "none"; div.style.display = "none";
@ -339,7 +338,7 @@ class Walker:
when `self.fullscan` is False but the url is not a SKIPPABLE_FILETYPE. when `self.fullscan` is False but the url is not a SKIPPABLE_FILETYPE.
when the url is an index page. when the url is an index page.
GET: GET:
when the url is a index page. when the url is an index page.
''' '''
if url is None: if url is None:
url = self.walkurl url = self.walkurl
@ -797,7 +796,7 @@ def write(line, file_handle=None, **kwargs):
## COMMANDLINE FUNCTIONS ########################################################################### ## COMMANDLINE FUNCTIONS ###########################################################################
## ## ## ##
def digest(databasename, walkurl, fullscan=False): def digest(walkurl, databasename=None, fullscan=False):
if walkurl in ('!clipboard', '!c'): if walkurl in ('!clipboard', '!c'):
walkurl = get_clipboard() walkurl = get_clipboard()
write('From clipboard: %s' % walkurl) write('From clipboard: %s' % walkurl)
@ -811,8 +810,8 @@ def digest(databasename, walkurl, fullscan=False):
def digest_argparse(args): def digest_argparse(args):
return digest( return digest(
databasename=args.databasename, databasename=args.databasename,
walkurl=args.walkurl,
fullscan=args.fullscan, fullscan=args.fullscan,
walkurl=args.walkurl,
) )
def download( def download(
@ -1015,29 +1014,26 @@ def measure(databasename, fullscan=False, new_only=False):
filecount = 0 filecount = 0
unmeasured_file_count = 0 unmeasured_file_count = 0
try:
for fetch in items: for fetch in items:
size = fetch[SQL_CONTENT_LENGTH]
if fullscan or new_only:
url = fetch[SQL_URL]
head = do_head(url, raise_for_status=False)
fetch = smart_insert(sql, cur, head=head, commit=True)
size = fetch[SQL_CONTENT_LENGTH] size = fetch[SQL_CONTENT_LENGTH]
if size is None:
if fullscan or new_only: write('"%s" is not revealing Content-Length' % url)
url = fetch[SQL_URL]
head = do_head(url, raise_for_status=False)
fetch = smart_insert(sql, cur, head=head, commit=True)
size = fetch[SQL_CONTENT_LENGTH]
if size is None:
write('"%s" is not revealing Content-Length' % url)
size = 0
elif fetch[SQL_CONTENT_LENGTH] is None:
unmeasured_file_count += 1
size = 0 size = 0
totalsize += size
filecount += 1 elif fetch[SQL_CONTENT_LENGTH] is None:
except: unmeasured_file_count += 1
sql.commit() size = 0
raise
totalsize += size
filecount += 1
sql.commit() sql.commit()
short_string = bytestring.bytestring(totalsize) short_string = bytestring.bytestring(totalsize)

105
Pathclass/pathclass.py Normal file
View file

@ -0,0 +1,105 @@
import glob
import os
class Path:
'''
I started to use pathlib.Path, but it was too much of a pain.
'''
def __init__(self, path):
path = os.path.normpath(path)
path = os.path.abspath(path)
path = get_path_casing(path)
self.absolute_path = path
def __contains__(self, other):
return other.absolute_path.startswith(self.absolute_path)
def __hash__(self):
return hash(self.absolute_path)
@property
def basename(self):
return os.path.basename(self.absolute_path)
@property
def exists(self):
return os.path.exists(self.absolute_path)
@property
def is_dir(self):
return os.path.isdir(self.absolute_path)
@property
def is_file(self):
return os.path.isfile(self.absolute_path)
@property
def is_link(self):
return os.path.islink(self.absolute_path)
@property
def parent(self):
parent = os.path.dirname(self.absolute_path)
parent = self.__class__(parent)
return parent
@property
def relative_path(self):
relative = self.absolute_path
relative = relative.replace(os.getcwd(), '')
relative = relative.lstrip(os.sep)
return relative
@property
def size(self):
if self.is_file:
return os.path.getsize(self.absolute_path)
else:
return None
@property
def stat(self):
return os.stat(self.absolute_path)
def get_path_casing(path):
'''
Take what is perhaps incorrectly cased input and get the path's actual
casing according to the filesystem.
Thank you:
Ethan Furman http://stackoverflow.com/a/7133137/5430534
xvorsx http://stackoverflow.com/a/14742779/5430534
'''
if isinstance(path, Path):
path = path.absolute_path
(drive, subpath) = os.path.splitdrive(path)
subpath = subpath.lstrip(os.sep)
def patternize(piece):
'''
Create a pattern like "[u]ser" from "user", forcing glob to look up the
correct path name, and guaranteeing that the only result will be the correct path.
Special cases are:
!, because in glob syntax, [!x] tells glob to look for paths that don't contain
"x". [!] is invalid syntax, so we pick the first non-! character to put
in the brackets.
[, because this starts a capture group
'''
piece = glob.escape(piece)
for character in piece:
if character not in '![]':
replacement = '[%s]' % character
#print(piece, character, replacement)
piece = piece.replace(character, replacement, 1)
break
return piece
pattern = [patternize(piece) for piece in subpath.split(os.sep)]
pattern = os.sep.join(pattern)
pattern = drive.upper() + os.sep + pattern
#print(pattern)
try:
return glob.glob(pattern)[0]
except IndexError:
return path

View file

@ -133,7 +133,22 @@ Suppose you're getting data from an imaginary website which sends you items in g
&nbsp; &nbsp;
#### Sqlite3 fetch generator
This is one that I almost always include in my program when I'm doing lots of sqlite work. Sqlite cursors don't allow you to simply do a for-loop over the results of a SELECT, so this generator is very handy:
def fetch_generator(cur):
while True:
item = cur.fetchone()
if item is None:
break
yield item
cur.execute('SELECT * FROM table')
for item in fetch_generator(cur):
print(item)
&nbsp;
# Further reading # Further reading

View file

@ -0,0 +1,71 @@
Custom file extensions on Windows
=================================
In this tutorial I will create a file extension, `.vtxt` that opens in Notepad.
Note: If certain things are not taking effect right away, you may need to restart explorer.exe through the task manager.
1. Open regedit.exe to HKEY_CLASSES_ROOT
2. Right click on HKEY_CLASSES_ROOT and create a new key. I'll refer to this as the "ProgID key".
![Screenshot](/../master/.GitImages/quicktips_extension_create.png?raw=true)
3. Name it according to the ProgID standards outlined here: [Programmatic Identifiers - MSDN](https://msdn.microsoft.com/en-us/library/windows/desktop/cc144152(v=vs.85).aspx)
>The proper format of a ProgID key name is [*Vendor or Application*].[*Component*].[*Version*], separated by periods and with no spaces, as in `Word.Document.6`
I will call mine `voussoir.vtxt`
4. Right click on HKCR and create another new key, and name it after your extension. For mine, it's `.vtxt`. I'll refer to this as the "Extension key".
5. On your extension key, double-click the `(Default)` value, and enter the name of your ProgID.
![Screenshot](/../master/.GitImages/quicktips_extension_extkey.png?raw=true)
6. On your ProgID key, set the `(Default)` value to a description of your file type. This is what you'll see when you hover over the file, or view the Properties dialog of your filetype. According to the MSDN ProgID article, you should also create a value `FriendlyTypeName` with the exact same text.
![Screenshot](/../master/.GitImages/quicktips_extension_description.png?raw=true)
7. On your ProgID key, create a subkey `DefaultIcon`, and set its `(Default)` value to the filepath of a .ico file, and specify the icon's index within that file. For example: `C:\mystuff\myextension.ico,0`. My file is `C:\vtxt.ico`. It only contains one image, so I'll use the index 0.
![Screenshot](/../master/.GitImages/quicktips_extension_icon.png?raw=true)
8. Lastly, it's time to associate the extension with a program. On your ProgID key, create subkeys `shell\open\command`.
9. On the `open` subkey, you can set the `(Default)` value to be a caption that appears on the context menu to open the file. If you don't, it will just say "Open".
![Screenshot](/../master/.GitImages/quicktips_extension_caption.png?raw=true)
10. On the `command` subkey, set the `(Default)` value to a command to launch your file. This can be complex, so for a basic solution, just use something like `notepad.exe "%L"`, where %L will become the filename of your file, so notepad knows what to open. Some more info can be found [here on superuser.com](http://superuser.com/a/473602).
11. Try opening your file!
![Screenshot](/../master/.GitImages/quicktips_extension_command.png?raw=true)
That should give you the basics. The MSDN articles go into more detail about the other values your ProgID can have.
You can save this as a `.reg` file if you want:
Windows Registry Editor Version 5.00
[HKEY_CLASSES_ROOT\.vtxt]
@="voussoir.vtxt"
[HKEY_CLASSES_ROOT\voussoir.vtxt]
@="voussoir's text type"
"FriendlyTypeName"="voussoir's text type"
[HKEY_CLASSES_ROOT\voussoir.vtxt\DefaultIcon]
@="C:\\vtxt.ico,0"
[HKEY_CLASSES_ROOT\voussoir.vtxt\shell]
[HKEY_CLASSES_ROOT\voussoir.vtxt\shell\open]
@="Let 'er rip"
[HKEY_CLASSES_ROOT\voussoir.vtxt\shell\open\command]
@="notepad.exe \"%L\""

View file

@ -8,4 +8,4 @@ Note: Many projects in this repository import other projects. If you see
import sys import sys
sys.path.append('C:\\git\\else\\ratelimiter'); import ratelimiter sys.path.append('C:\\git\\else\\ratelimiter'); import ratelimiter
Just come back to this page and download those files. Arranging them is up to you. Just come back to this page and download those files. You can put them in your Lib folder, or in the same folder as the program you're trying to use, or you can modify the sys path extension.

View file

@ -9,16 +9,12 @@ import sys
import types import types
sys.path.append('C:\\git\\else\\Bytestring'); import bytestring sys.path.append('C:\\git\\else\\Bytestring'); import bytestring
sys.path.append('C:\\git\\else\\Pathclass'); import pathclass
sys.path.append('C:\\git\\else\\Ratelimiter'); import ratelimiter sys.path.append('C:\\git\\else\\Ratelimiter'); import ratelimiter
sys.path.append('C:\\git\\else\\SpinalTap'); import spinal sys.path.append('C:\\git\\else\\SpinalTap'); import spinal
FILE_READ_CHUNK = bytestring.MIBIBYTE FILE_READ_CHUNK = bytestring.MIBIBYTE
#f = open('favicon.png', 'rb')
#FAVI = f.read()
#f.close()
CWD = os.getcwd()
# The paths which the user may access. # The paths which the user may access.
# Attempting to access anything outside will 403. # Attempting to access anything outside will 403.
# These are convered to Path objects after that class definition. # These are convered to Path objects after that class definition.
@ -37,22 +33,14 @@ OPENDIR_TEMPLATE = '''
</html> </html>
''' '''
class Path: class Path(pathclass.Path):
''' '''
I started to use pathlib.Path, but it was too much of a pain. Add some server-specific abilities to the Pathclass
''' '''
def __init__(self, path): def __init__(self, path):
path = urllib.parse.unquote(path) path = urllib.parse.unquote(path)
path = path.strip('/') path = path.strip('/')
path = os.path.normpath(path) pathclass.Path.__init__(self, path)
path = spinal.get_path_casing(path).path
self.absolute_path = path
def __contains__(self, other):
return other.absolute_path.startswith(self.absolute_path)
def __hash__(self):
return hash(self.absolute_path)
@property @property
def allowed(self): def allowed(self):
@ -77,38 +65,6 @@ class Path:
) )
return a return a
@property
def basename(self):
return os.path.basename(self.absolute_path)
@property
def is_dir(self):
return os.path.isdir(self.absolute_path)
@property
def is_file(self):
return os.path.isfile(self.absolute_path)
@property
def parent(self):
parent = os.path.dirname(self.absolute_path)
parent = Path(parent)
return parent
@property
def relative_path(self):
relative = self.absolute_path
relative = relative.replace(CWD, '')
relative = relative.lstrip(os.sep)
return relative
@property
def size(self):
if self.is_file:
return os.path.getsize(self.absolute_path)
else:
return None
def table_row(self, display_name=None, shaded=False): def table_row(self, display_name=None, shaded=False):
form = '<tr style="background-color:#{bg}"><td style="width:90%">{anchor}</td><td>{size}</td></tr>' form = '<tr style="background-color:#{bg}"><td style="width:90%">{anchor}</td><td>{size}</td></tr>'
size = self.size size = self.size

View file

@ -8,24 +8,12 @@ import string
import sys import sys
import time import time
sys.path.append('C:\\git\\else\\ratelimiter'); import ratelimiter sys.path.append('C:\\git\\else\\Bytestring'); import bytestring
sys.path.append('C:\\git\\else\\Pathclass'); import pathclass
sys.path.append('C:\\git\\else\\Ratelimiter'); import ratelimiter
BYTE = 1
KIBIBYTE = BYTE * 1024
MIBIBYTE = KIBIBYTE * 1024
GIBIBYTE = MIBIBYTE * 1024
TEBIBYTE = GIBIBYTE * 1024
SIZE_UNITS = (TEBIBYTE, GIBIBYTE, MIBIBYTE, KIBIBYTE, BYTE)
UNIT_STRINGS = { CHUNK_SIZE = 128 * bytestring.KIBIBYTE
BYTE: 'b',
KIBIBYTE: 'KiB',
MIBIBYTE: 'MiB',
GIBIBYTE: 'GiB',
TEBIBYTE: 'TiB',
}
CHUNK_SIZE = 128 * KIBIBYTE
# Number of bytes to read and write at a time # Number of bytes to read and write at a time
@ -47,84 +35,6 @@ class SourceNotFile(Exception):
class SpinalError(Exception): class SpinalError(Exception):
pass pass
class FilePath:
'''
Class for consolidating lots of `os.path` operations,
and caching `os.stat` results.
'''
def __init__(self, path):
self.path = os.path.abspath(path)
self._stat = None
self._isdir = None
self._isfile = None
self._islink = None
self._size = None
def __hash__(self):
return self.path.__hash__()
def __repr__(self):
return 'FilePath(%s)' % repr(self.path)
@property
def basename(self):
return os.path.basename(self.path)
@property
def isdir(self):
return self.type_getter('_isdir', stat.S_ISDIR)
@property
def isfile(self):
return self.type_getter('_isfile', stat.S_ISREG)
@property
def islink(self):
return self.type_getter('_islink', stat.S_ISLNK)
@property
def size(self):
if self._size is None:
if self.stat is False:
self._size = None
else:
self._size = self.stat.st_size
return self._size
@property
def stat(self):
if self._stat is None:
try:
self._stat = os.stat(self.path)
except FileNotFoundError:
self._stat = False
return self._stat
def type_getter(self, attr, resolution):
'''
Try to return the cached type. Call resolution(self.stat.st_mode) if
we don't have the stat data yet.
'''
value = getattr(self, attr)
if value is None:
if self.stat is False:
return False
else:
value = resolution(self.stat.st_mode)
setattr(self, attr, value)
return value
def bytes_to_unit_string(bytes):
size_unit = 1
for unit in SIZE_UNITS:
if bytes >= unit:
size_unit = unit
break
size_unit_string = UNIT_STRINGS[size_unit]
size_string = '%.3f %s' % ((bytes / size_unit), size_unit_string)
return size_string
def callback_exclusion(name, path_type): def callback_exclusion(name, path_type):
''' '''
Example of an exclusion callback function. Example of an exclusion callback function.
@ -137,7 +47,7 @@ def callback_v1(fpobj, written_bytes, total_bytes):
Prints "filename written/total (percent%)" Prints "filename written/total (percent%)"
''' '''
filename = fpobj.path.encode('ascii', 'replace').decode() filename = fpobj.absolute_path.encode('ascii', 'replace').decode()
if written_bytes >= total_bytes: if written_bytes >= total_bytes:
ends = '\n' ends = '\n'
else: else:
@ -157,32 +67,32 @@ def copy(source, file_args=None, file_kwargs=None, dir_args=None, dir_kwargs=Non
Perform copy_dir or copy_file as appropriate for the source path. Perform copy_dir or copy_file as appropriate for the source path.
''' '''
source = str_to_fp(source) source = str_to_fp(source)
if source.isfile: if source.is_file:
file_args = file_args or tuple() file_args = file_args or tuple()
file_kwargs = file_kwargs or dict() file_kwargs = file_kwargs or dict()
return copy_file(source, *file_args, **file_kwargs) return copy_file(source, *file_args, **file_kwargs)
elif source.isdir: elif source.is_dir:
dir_args = dir_args or tuple() dir_args = dir_args or tuple()
dir_kwargs = dir_kwargs or dict() dir_kwargs = dir_kwargs or dict()
return copy_dir(source, *dir_args, **dir_kwargs) return copy_dir(source, *dir_args, **dir_kwargs)
raise SpinalError('Neither file nor dir: %s' % source) raise SpinalError('Neither file nor dir: %s' % source)
def copy_dir( def copy_dir(
source, source,
destination=None, destination=None,
destination_new_root=None, destination_new_root=None,
bytes_per_second=None, bytes_per_second=None,
callback_directory=None, callback_directory=None,
callback_exclusion=None, callback_exclusion=None,
callback_file=None, callback_file=None,
callback_permission_denied=None, callback_permission_denied=None,
callback_verbose=None, callback_verbose=None,
dry_run=False, dry_run=False,
exclude_directories=None, exclude_directories=None,
exclude_filenames=None, exclude_filenames=None,
files_per_second=None, files_per_second=None,
overwrite_old=True, overwrite_old=True,
precalcsize=False, precalcsize=False,
): ):
''' '''
Copy all of the contents from source to destination, Copy all of the contents from source to destination,
@ -205,7 +115,7 @@ def copy_dir(
bytes_per_second: bytes_per_second:
Restrict file copying to this many bytes per second. Can be an integer Restrict file copying to this many bytes per second. Can be an integer
or an existing Ratelimiter object. or an existing Ratelimiter object.
The provided BYTE, KIBIBYTE, etc constants may help. The BYTE, KIBIBYTE, etc constants from module 'bytestring' may help.
Default = None Default = None
@ -286,8 +196,8 @@ def copy_dir(
m += '`destination_new_root` can be passed.' m += '`destination_new_root` can be passed.'
raise ValueError(m) raise ValueError(m)
source = pathclass.get_path_casing(source)
source = str_to_fp(source) source = str_to_fp(source)
source = get_path_casing(source)
if destination_new_root is not None: if destination_new_root is not None:
destination = new_root(source, destination_new_root) destination = new_root(source, destination_new_root)
@ -299,10 +209,10 @@ def copy_dir(
if is_subfolder(source, destination): if is_subfolder(source, destination):
raise RecursiveDirectory(source, destination) raise RecursiveDirectory(source, destination)
if not source.isdir: if not source.is_dir:
raise SourceNotDirectory(source) raise SourceNotDirectory(source)
if destination.isfile: if destination.is_file:
raise DestinationIsFile(destination) raise DestinationIsFile(destination)
if precalcsize is True: if precalcsize is True:
@ -321,7 +231,7 @@ def copy_dir(
callback_verbose=callback_verbose, callback_verbose=callback_verbose,
exclude_directories=exclude_directories, exclude_directories=exclude_directories,
exclude_filenames=exclude_filenames, exclude_filenames=exclude_filenames,
) )
for (source_abspath) in walker: for (source_abspath) in walker:
# Terminology: # Terminology:
# abspath: C:\folder\subfolder\filename.txt # abspath: C:\folder\subfolder\filename.txt
@ -329,13 +239,16 @@ def copy_dir(
# base_name: filename.txt # base_name: filename.txt
# folder: subfolder # folder: subfolder
destination_abspath = source_abspath.path.replace(source.path, destination.path) destination_abspath = source_abspath.absolute_path.replace(
source.absolute_path,
destination.absolute_path
)
destination_abspath = str_to_fp(destination_abspath) destination_abspath = str_to_fp(destination_abspath)
if destination_abspath.isdir: if destination_abspath.is_dir:
raise DestinationIsDirectory(destination_abspath) raise DestinationIsDirectory(destination_abspath)
destination_location = os.path.split(destination_abspath.path)[0] destination_location = os.path.split(destination_abspath.absolute_path)[0]
os.makedirs(destination_location, exist_ok=True) os.makedirs(destination_location, exist_ok=True)
copied = copy_file( copied = copy_file(
@ -363,15 +276,15 @@ def copy_dir(
return [destination, written_bytes] return [destination, written_bytes]
def copy_file( def copy_file(
source, source,
destination=None, destination=None,
destination_new_root=None, destination_new_root=None,
bytes_per_second=None, bytes_per_second=None,
callback=None, callback=None,
callback_verbose=None, callback_verbose=None,
dry_run=False, dry_run=False,
overwrite_old=True, overwrite_old=True,
callback_permission_denied=None, callback_permission_denied=None,
): ):
''' '''
Copy a file from one place to another. Copy a file from one place to another.
@ -398,7 +311,7 @@ def copy_file(
callback: callback:
If provided, this function will be called after writing If provided, this function will be called after writing
each CHUNK_SIZE bytes to destination with three parameters: each CHUNK_SIZE bytes to destination with three parameters:
the FilePath object being copied, number of bytes written so far, the Path object being copied, number of bytes written so far,
total number of bytes needed. total number of bytes needed.
Default = None Default = None
@ -437,8 +350,8 @@ def copy_file(
m += '`destination_new_root` can be passed' m += '`destination_new_root` can be passed'
raise ValueError(m) raise ValueError(m)
source = pathclass.get_path_casing(source)
source = str_to_fp(source) source = str_to_fp(source)
source = get_path_casing(source)
if destination_new_root is not None: if destination_new_root is not None:
destination = new_root(source, destination_new_root) destination = new_root(source, destination_new_root)
@ -447,16 +360,16 @@ def copy_file(
callback = callback or do_nothing callback = callback or do_nothing
callback_verbose = callback_verbose or do_nothing callback_verbose = callback_verbose or do_nothing
if not source.isfile: if not source.is_file:
raise SourceNotFile(source) raise SourceNotFile(source)
if destination.isdir: if destination.is_dir:
raise DestinationIsDirectory(destination) raise DestinationIsDirectory(destination)
bytes_per_second = limiter_or_none(bytes_per_second) bytes_per_second = limiter_or_none(bytes_per_second)
# Determine overwrite # Determine overwrite
if destination.stat is not False: if destination.exists:
destination_modtime = destination.stat.st_mtime destination_modtime = destination.stat.st_mtime
if overwrite_old is False: if overwrite_old is False:
@ -473,14 +386,14 @@ def copy_file(
return [destination, 0] return [destination, 0]
source_bytes = source.size source_bytes = source.size
destination_location = os.path.split(destination.path)[0] destination_location = os.path.split(destination.absolute_path)[0]
os.makedirs(destination_location, exist_ok=True) os.makedirs(destination_location, exist_ok=True)
written_bytes = 0 written_bytes = 0
try: try:
callback_verbose('Opening handles.') callback_verbose('Opening handles.')
source_file = open(source.path, 'rb') source_file = open(source.absolute_path, 'rb')
destination_file = open(destination.path, 'wb') destination_file = open(destination.absolute_path, 'wb')
except PermissionError as exception: except PermissionError as exception:
if callback_permission_denied is not None: if callback_permission_denied is not None:
callback_permission_denied(source, exception) callback_permission_denied(source, exception)
@ -507,7 +420,7 @@ def copy_file(
source_file.close() source_file.close()
destination_file.close() destination_file.close()
callback_verbose('Copying metadata') callback_verbose('Copying metadata')
shutil.copystat(source.path, destination.path) shutil.copystat(source.absolute_path, destination.absolute_path)
return [destination, written_bytes] return [destination, written_bytes]
def do_nothing(*args): def do_nothing(*args):
@ -516,49 +429,6 @@ def do_nothing(*args):
''' '''
return return
def get_path_casing(path):
'''
Take what is perhaps incorrectly cased input and get the path's actual
casing according to the filesystem.
Thank you:
Ethan Furman http://stackoverflow.com/a/7133137/5430534
xvorsx http://stackoverflow.com/a/14742779/5430534
'''
p = str_to_fp(path)
path = p.path
(drive, subpath) = os.path.splitdrive(path)
subpath = subpath.lstrip(os.sep)
def patternize(piece):
'''
Create a pattern like "[u]ser" from "user", forcing glob to look up the
correct path name, and guaranteeing that the only result will be the correct path.
Special cases are:
!, because in glob syntax, [!x] tells glob to look for paths that don't contain
"x". [!] is invalid syntax, so we pick the first non-! character to put
in the brackets.
[, because this starts a capture group
'''
piece = glob.escape(piece)
for character in piece:
if character not in '![]':
replacement = '[%s]' % character
#print(piece, character, replacement)
piece = piece.replace(character, replacement, 1)
break
return piece
pattern = [patternize(piece) for piece in subpath.split(os.sep)]
pattern = os.sep.join(pattern)
pattern = drive.upper() + os.sep + pattern
#print(pattern)
try:
return str_to_fp(glob.glob(pattern)[0])
except IndexError:
return p
def get_dir_size(path): def get_dir_size(path):
''' '''
Calculate the total number of bytes across all files in this directory Calculate the total number of bytes across all files in this directory
@ -566,7 +436,7 @@ def get_dir_size(path):
''' '''
path = str_to_fp(path) path = str_to_fp(path)
if not path.isdir: if not path.is_dir:
raise SourceNotDirectory(path) raise SourceNotDirectory(path)
total_bytes = 0 total_bytes = 0
@ -579,8 +449,8 @@ def is_subfolder(parent, child):
''' '''
Determine whether parent contains child. Determine whether parent contains child.
''' '''
parent = normalize(str_to_fp(parent).path) + os.sep parent = normalize(str_to_fp(parent).absolute_path) + os.sep
child = normalize(str_to_fp(child).path) + os.sep child = normalize(str_to_fp(child).absolute_path) + os.sep
return child.startswith(parent) return child.startswith(parent)
def is_xor(*args): def is_xor(*args):
@ -607,8 +477,8 @@ def new_root(filepath, root):
I use this so that my G: drive can have backups from my C: and D: drives I use this so that my G: drive can have backups from my C: and D: drives
while preserving directory structure in G:\\D and G:\\C. while preserving directory structure in G:\\D and G:\\C.
''' '''
filepath = str_to_fp(filepath).path filepath = str_to_fp(filepath).absolute_path
root = str_to_fp(root).path root = str_to_fp(root).absolute_path
filepath = filepath.replace(':', os.sep) filepath = filepath.replace(':', os.sep)
filepath = os.path.normpath(filepath) filepath = os.path.normpath(filepath)
filepath = os.path.join(root, filepath) filepath = os.path.join(root, filepath)
@ -622,21 +492,21 @@ def normalize(text):
def str_to_fp(path): def str_to_fp(path):
''' '''
If `path` is a string, create a FilePath object, otherwise just return it. If `path` is a string, create a Path object, otherwise just return it.
''' '''
if isinstance(path, str): if isinstance(path, str):
path = FilePath(path) path = pathclass.Path(path)
return path return path
def walk_generator( def walk_generator(
path='.', path='.',
callback_exclusion=None, callback_exclusion=None,
callback_verbose=None, callback_verbose=None,
exclude_directories=None, exclude_directories=None,
exclude_filenames=None, exclude_filenames=None,
): ):
''' '''
Yield FilePath objects from the file tree similar to os.walk. Yield Path objects from the file tree similar to os.walk.
callback_exclusion: callback_exclusion:
This function will be called when a file or directory is excluded with This function will be called when a file or directory is excluded with
@ -677,7 +547,7 @@ def walk_generator(
exclude_filenames = {normalize(f) for f in exclude_filenames} exclude_filenames = {normalize(f) for f in exclude_filenames}
exclude_directories = {normalize(f) for f in exclude_directories} exclude_directories = {normalize(f) for f in exclude_directories}
path = str_to_fp(path).path path = str_to_fp(path).absolute_path
if normalize(path) in exclude_directories: if normalize(path) in exclude_directories:
callback_exclusion(path, 'directory') callback_exclusion(path, 'directory')