Add dynamic chunk sizing to keep consistent progress bar pace.

master
voussoir 2021-05-18 17:48:35 -07:00
parent e7798574b3
commit 375b00bfd5
1 changed file with 65 additions and 3 deletions

@@ -1,8 +1,13 @@
+'''
+This module provides functions related to walking the filesystem and
+copying files and folders.
+'''
 import hashlib
 import logging
 import os
 import shutil
 import sys
+import time
 
 from voussoirkit import bytestring
 from voussoirkit import dotdict
@@ -20,6 +25,10 @@ BAIL = sentinel.Sentinel('BAIL')
 # Number of bytes to read and write at a time
 CHUNK_SIZE = 2 * bytestring.MIBIBYTE
+
+# When using dynamic chunk sizing, this is the ideal time to process a
+# single chunk, in seconds.
+IDEAL_CHUNK_TIME = 0.2
 
 HASH_CLASS = hashlib.md5
 
 class SpinalException(Exception):
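
A 0.2 second target means progress callbacks land roughly five times per second regardless of device speed, and the chunk size the loop settles on is approximately throughput times IDEAL_CHUNK_TIME. A back-of-envelope sketch (the device speeds are invented for illustration):

    IDEAL_CHUNK_TIME = 0.2  # value from this commit

    # Hypothetical device throughputs, in bytes per second.
    for name, speed in (('slow USB stick', 20 * 2**20), ('SATA SSD', 400 * 2**20)):
        steady = speed * IDEAL_CHUNK_TIME
        print(f'{name}: chunks settle near {steady / 2**20:.0f} MiB')
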
@@ -91,7 +100,7 @@ def copy_dir(
     callback_pre_directory=None,
     callback_pre_file=None,
     callback_post_file=None,
-    chunk_size=CHUNK_SIZE,
+    chunk_size='dynamic',
     destination_new_root=None,
     dry_run=False,
     exclude_directories=None,
@@ -150,6 +159,9 @@ def copy_dir(
         If you think copy_dir should be rewritten as a generator instead,
         I agree!
 
+    chunk_size:
+        Passed into each `copy_file` as `chunk_size`.
+
     destination_new_root:
         Determine the destination path by calling
         `new_root(source, destination_new_root)`.
@@ -334,7 +346,7 @@ def copy_file(
     callback_permission_denied=None,
     callback_pre_copy=None,
     callback_validate_hash=None,
-    chunk_size=CHUNK_SIZE,
+    chunk_size='dynamic',
    dry_run=False,
     hash_class=None,
     overwrite_old=True,
@@ -383,6 +395,11 @@ def copy_file(
     callback_hash_progress:
         Passed into `hash_file` as callback_progress when validating the hash.
 
+    chunk_size:
+        An integer number of bytes to read and write at a time.
+        Or, the string 'dynamic' to enable dynamic chunk sizing that aims to
+        keep a consistent pace of progress bar updates.
+
     dry_run:
         Do everything except the actual file copying.
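
As a usage sketch of the two modes this docstring describes (hedged: copy_file's source and destination parameters are not shown in this hunk, so the positional call style and file names here are assumptions):

    from voussoirkit import bytestring, spinal

    # Fixed chunks: always read and write 4 MiB at a time.
    spinal.copy_file('movie.mkv', 'backup/movie.mkv', chunk_size=4 * bytestring.MIBIBYTE)

    # The new default: chunk sizes adapt toward IDEAL_CHUNK_TIME.
    spinal.copy_file('movie.mkv', 'backup/movie.mkv', chunk_size='dynamic')
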
@@ -492,7 +509,13 @@ def copy_file(
             hash_class = HASH_CLASS
         results.hash = HASH_CLASS()
 
+    dynamic_chunk_size = chunk_size == 'dynamic'
+    if dynamic_chunk_size:
+        chunk_size = bytestring.MIBIBYTE
+
     while True:
+        chunk_start = time.perf_counter()
         try:
             data_chunk = source_handle.read(chunk_size)
         except PermissionError as exception:
@@ -517,6 +540,10 @@ def copy_file(
         if bytes_per_second is not None:
             bytes_per_second.limit(data_bytes)
 
+        if dynamic_chunk_size:
+            chunk_time = time.perf_counter() - chunk_start
+            chunk_size = dynamic_chunk_sizer(chunk_size, chunk_time, IDEAL_CHUNK_TIME)
+
     if results.written_bytes == 0:
         # For zero-length files, we want to get at least one call in there.
         callback_progress(destination, results.written_bytes, source_bytes)
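
Note the timing window: chunk_start is taken before the read, and the elapsed time is measured after the write and any rate limiting, so the sizer sees end-to-end pace rather than raw disk speed. Stripped of the file handling, the loop has this shape (read_chunk, handle_chunk, and sizer are placeholders, not the real handles):

    import time

    def timed_chunks(read_chunk, handle_chunk, sizer, ideal_time=0.2):
        # Placeholder sketch: read_chunk and handle_chunk stand in for the
        # file I/O, hashing, callbacks, and throttling done per iteration.
        chunk_size = 2**20  # 1 MiB starting point, as in the diff
        while True:
            start = time.perf_counter()
            data = read_chunk(chunk_size)
            if not data:
                break
            handle_chunk(data)
            chunk_size = sizer(chunk_size, time.perf_counter() - start, ideal_time)
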
@@ -547,6 +574,25 @@ def do_nothing(*args, **kwargs):
     '''
     return
 
+def dynamic_chunk_sizer(chunk_size, chunk_time, ideal_chunk_time):
+    '''
+    Calculates a new chunk size based on the time it took to do the previous
+    chunk versus the ideal chunk time.
+    '''
+    # If chunk_time = scale * ideal_chunk_time,
+    # Then ideal_chunk_size = chunk_size / scale
+    scale = chunk_time / ideal_chunk_time
+    scale = min(scale, 2)
+    scale = max(scale, 0.5)
+    suggestion = chunk_size / scale
+    # Give the current size double weight so small fluctuations don't send
+    # the needle bouncing all over.
+    new_size = int((chunk_size + chunk_size + suggestion) / 3)
+    # I doubt any real-world scenario will dynamically suggest a chunk_size of
+    # zero, but let's enforce a one-byte minimum anyway.
+    new_size = max(new_size, 1)
+    return new_size
+
 def get_dir_size(path):
     '''
     Calculate the total number of bytes across all files in this directory
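
To see the damping in a single step (numbers invented for illustration): a 12 MiB chunk that took 0.4 s is twice the 0.2 s ideal, so the raw suggestion is 6 MiB, but the double weighting of the current size only moves it to 10 MiB:

    chunk_size = 12 * 2**20   # previous chunk was 12 MiB (hypothetical)
    chunk_time = 0.4          # it took twice IDEAL_CHUNK_TIME

    scale = max(min(chunk_time / 0.2, 2), 0.5)    # -> 2.0
    suggestion = chunk_size / scale               # -> 6 MiB
    new_size = int((chunk_size + chunk_size + suggestion) / 3)
    assert new_size == 10 * 2**20                 # 12 MiB nudged toward 6 MiB
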
@@ -569,7 +615,7 @@ def hash_file(
     *,
     bytes_per_second=None,
     callback_progress=None,
-    chunk_size=CHUNK_SIZE,
+    chunk_size='dynamic',
 ):
     '''
     hash_class:
@@ -578,6 +624,11 @@ def hash_file(
     callback_progress:
         A function that takes three parameters:
         path object, bytes ingested so far, bytes total
+
+    chunk_size:
+        An integer number of bytes to read at a time.
+        Or, the string 'dynamic' to enable dynamic chunk sizing that aims to
+        keep a consistent pace of progress bar updates.
     '''
     path = pathclass.Path(path)
     path.assert_is_file()
@@ -590,8 +641,15 @@ def hash_file(
     file_size = path.size
     handle = path.open('rb')
 
+    dynamic_chunk_size = chunk_size == 'dynamic'
+    if dynamic_chunk_size:
+        chunk_size = bytestring.MIBIBYTE
+
     with handle:
         while True:
+            chunk_start = time.perf_counter()
             chunk = handle.read(chunk_size)
             if not chunk:
                 break
@@ -605,6 +663,10 @@ def hash_file(
             if bytes_per_second is not None:
                 bytes_per_second.limit(this_size)
 
+            if dynamic_chunk_size:
+                chunk_time = time.perf_counter() - chunk_start
+                chunk_size = dynamic_chunk_sizer(chunk_size, chunk_time, IDEAL_CHUNK_TIME)
+
     return hasher
 
 def is_xor(*args):
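
A quick simulation shows the sizer converging on the steady state, assuming the module is importable as voussoirkit.spinal and modeling a hypothetical device that moves a constant 100 MiB/s; chunks should settle near 100 MiB/s * 0.2 s = 20 MiB:

    from voussoirkit import spinal

    speed = 100 * 2**20  # simulated throughput, bytes per second
    chunk_size = 2**20   # 1 MiB starting point, as in the diff
    for step in range(15):
        chunk_time = chunk_size / speed  # simulated time for read + write
        chunk_size = spinal.dynamic_chunk_sizer(chunk_size, chunk_time, spinal.IDEAL_CHUNK_TIME)
        print(f'step {step}: {chunk_size / 2**20:.2f} MiB')

Because the scale factor is clamped to [0.5, 2] and averaged two-to-one with the current size, each step moves the size by at most a factor of 4/3 up or 5/6 down, so a momentary stall or burst cannot whipsaw the chunk size.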