From 8584dc5e7f4690f2fe8447761eb8fbf8850d3dee Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Sun, 31 Mar 2024 19:03:37 -0700 Subject: [PATCH] Improve handling of compressed streams. --- voussoirkit/downloady.py | 68 ++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/voussoirkit/downloady.py b/voussoirkit/downloady.py index 813e340..a4dcef1 100644 --- a/voussoirkit/downloady.py +++ b/voussoirkit/downloady.py @@ -5,6 +5,7 @@ import requests import sys import time import urllib +import zlib from voussoirkit import bytestring from voussoirkit import dotdict @@ -18,10 +19,7 @@ from voussoirkit import vlogging log = vlogging.getLogger(__name__, 'downloady') -USERAGENT = ''' -'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) -Chrome/42.0.2311.152 Safari/537.36' -'''.strip().replace('\n', ' ') +USERAGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36' HEADERS = { 'User-Agent': USERAGENT, @@ -71,6 +69,7 @@ def download_file( localname=None, auth=None, bytespersecond=None, + custom_validator=None, progressbar=None, do_head=True, headers=None, @@ -86,6 +85,7 @@ def download_file( localname, auth=auth, bytespersecond=bytespersecond, + custom_validator=custom_validator, progressbar=progressbar, do_head=do_head, headers=headers, @@ -153,28 +153,25 @@ def download_plan(plan): else: chunk_size = 128 * bytestring.KIBIBYTE - while True: - chunk_start = time.perf_counter() - chunk = download_stream.raw.read(chunk_size) - chunk_bytes = len(chunk) - if chunk_bytes == 0: - break + chunks = stream_to_chunks(download_stream, chunk_size, limiter=plan.limiter) + if download_stream.headers.get('Content-Encoding') == 'gzip': + log.debug('Content is gzipped. Decompressor active.') + chunks = decompress_gzip(chunks) + for chunk in chunks: + httperrors.raise_for_status(download_stream) + chunk_bytes = len(chunk) + + # log.loud(f'Writing {chunk_bytes} to disk.') file_handle.write(chunk) bytes_downloaded += chunk_bytes if progressbar is not None: progressbar.step(bytes_downloaded) - if plan.limiter is not None: - plan.limiter.limit(chunk_bytes) - if plan.ratemeter is not None: plan.ratemeter.digest(chunk_bytes) - chunk_time = time.perf_counter() - chunk_start - chunk_size = dynamic_chunk_sizer(chunk_size, chunk_time, IDEAL_CHUNK_TIME) - if progressbar is not None: progressbar.done() @@ -195,15 +192,26 @@ def download_plan(plan): temp_localsize < plan.remote_total_bytes ) if undersized and plan.raise_for_undersized: - message = 'File does not contain expected number of bytes. Received {size} / {total}' + message = 'File does not contain expected number of bytes. Received {size} / {total}.' message = message.format(size=temp_localsize, total=plan.remote_total_bytes) raise NotEnoughBytes(message) + if plan.custom_validator: + plan.custom_validator(plan.download_into) + if plan.download_into != plan.real_localname: os.rename(plan.download_into, plan.real_localname) return plan.real_localname +def decompress_gzip(stream): + decompressor = zlib.decompressobj(16 | zlib.MAX_WBITS) + for chunk in stream: + chunk = decompressor.decompress(chunk) + yield chunk + + yield decompressor.flush() + def dynamic_chunk_sizer(chunk_size, chunk_time, ideal_chunk_time): ''' Calculates a new chunk size based on the time it took to do the previous @@ -228,6 +236,7 @@ def prepare_plan( localname, auth=None, bytespersecond=None, + custom_validator=None, progressbar=None, do_head=True, headers=None, @@ -313,7 +322,9 @@ def prepare_plan( remote_total_bytes = head.headers.get('content-length', None) remote_total_bytes = None if remote_total_bytes is None else int(remote_total_bytes) server_respects_range = (head.status_code == 206 and 'content-range' in head.headers) + log.debug(f'Server respects range: {server_respects_range}') head.connection.close() + del head else: remote_total_bytes = None server_respects_range = False @@ -325,6 +336,7 @@ def prepare_plan( plan_base = { 'url': url, 'auth': auth, + 'custom_validator': custom_validator, 'progressbar': progressbar, 'limiter': limiter, 'headers': headers, @@ -381,6 +393,8 @@ def prepare_plan( log.info('Resume from byte %d' % plan_resume.seek_to) return plan_resume + return plan_fulldownload + else: if user_provided_range: return plan_partial @@ -429,7 +443,9 @@ def request(method, url, headers=None, timeout=TIMEOUT, verify_ssl=True, **kwarg 'post': session.post, }[method] + log.debug(f'Request headers: {headers}') response = method(url, headers=headers, timeout=timeout, verify=verify_ssl, **kwargs) + log.debug(f'Response headers: {response.status_code} {response.headers}') httperrors.raise_for_status(response) return response @@ -451,6 +467,24 @@ def sanitize_url(url): url = url.replace('%3A//', '://') return url +def stream_to_chunks(download_stream, chunk_size, limiter): + while True: + chunk_start = time.perf_counter() + # log.loud(f'Reading {chunk_size} from stream.') + chunk = download_stream.raw.read(chunk_size) + chunk_bytes = len(chunk) + # log.loud(f'Got {chunk_bytes} from stream.') + if chunk_bytes == 0: + break + + if limiter is not None: + limiter.limit(chunk_bytes) + + chunk_time = time.perf_counter() - chunk_start + chunk_size = dynamic_chunk_sizer(chunk_size, chunk_time, IDEAL_CHUNK_TIME) + + yield chunk + def download_argparse(args): url = pipeable.input(args.url, split_lines=False)