cmd/threaded_dl.py

300 lines
8.8 KiB
Python
Raw Normal View History

2020-09-29 21:27:48 +00:00
import argparse
2020-09-30 20:20:07 +00:00
import ast
2020-09-29 21:27:48 +00:00
import os
2022-11-03 01:42:14 +00:00
import random
import shutil
2020-09-29 21:27:48 +00:00
import sys
import threading
import time
import traceback
2020-09-29 21:27:48 +00:00
from voussoirkit import betterhelp
from voussoirkit import bytestring
2020-09-29 21:27:48 +00:00
from voussoirkit import downloady
2022-08-14 17:42:34 +00:00
from voussoirkit import pathclass
from voussoirkit import pipeable
from voussoirkit import ratelimiter
from voussoirkit import ratemeter
from voussoirkit import threadpool
from voussoirkit import vlogging
log = vlogging.getLogger(__name__, 'threaded_dl')
downloady.log.setLevel(vlogging.WARNING)
2020-09-29 21:27:48 +00:00
def clean_url_list(urls):
for url in urls:
if isinstance(url, (tuple, list)):
(url, filename) = url
else:
filename = None
2020-09-29 21:27:48 +00:00
url = url.strip()
if not url:
continue
if url.startswith('#'):
continue
if filename:
yield (url, filename)
else:
yield url
2020-09-29 21:27:48 +00:00
def download_job(
url,
filename,
*,
bytespersecond=None,
headers=None,
meter=None,
timeout=None,
):
log.info(f'Starting "{filename}"')
2021-08-10 00:37:19 +00:00
downloady.download_file(
url,
filename,
bytespersecond=bytespersecond,
headers=headers,
ratemeter=meter,
2021-08-10 00:37:19 +00:00
timeout=timeout,
)
log.info(f'Finished "{filename}"')
2020-09-29 21:27:48 +00:00
def normalize_headers(headers):
if headers is None:
return {}
if not headers:
return {}
if isinstance(headers, dict):
return headers
if isinstance(headers, list) and len(headers) == 1:
headers = headers[0]
if isinstance(headers, (list, tuple)):
keys = headers[::2]
vals = headers[1::2]
return {key: val for (key, val) in zip(keys, vals)}
if isinstance(headers, str) and os.path.isfile(headers):
2022-11-03 01:42:14 +00:00
headers = pathclass.Path(headers).read('r', encoding='utf-8')
if isinstance(headers, str):
if headers.startswith('{'):
return ast.literal_eval(headers)
else:
lines = [line for line in headers.splitlines() if line.strip()]
2022-11-03 01:42:14 +00:00
lines = [line for line in lines if not line.startswith('#')]
pairs = [line.strip().split(':', 1) for line in lines]
return {key.strip(): value.strip() for (key, value) in pairs}
return headers
def prepare_urls_filenames(urls, filename_format):
2020-09-29 21:27:48 +00:00
now = int(time.time())
if os.path.normcase(filename_format) != os.devnull:
2020-09-29 21:27:48 +00:00
index_digits = len(str(len(urls)))
filename_format = filename_format.replace('{index}', '{index:0%0dd}' % index_digits)
if '{' not in filename_format and len(urls) > 1:
filename_format += '_{index}'
if '{extension}' not in filename_format and '{basename}' not in filename_format:
filename_format += '{extension}'
urls_filenames = []
2020-09-29 21:27:48 +00:00
for (index, url) in enumerate(clean_url_list(urls)):
if isinstance(url, (tuple, list)):
(url, filename) = url
else:
2022-11-03 01:42:14 +00:00
index1 = index + 1
basename = downloady.basename_from_url(url)
extension = os.path.splitext(basename)[1]
filename = filename_format.format(
basename=basename,
ext=extension,
extension=extension,
index=index,
2022-11-03 01:42:14 +00:00
index1=index1,
now=now,
)
2020-09-29 21:27:48 +00:00
if os.path.exists(filename):
log.info(f'Skipping existing file "{filename}"')
continue
2020-09-29 21:27:48 +00:00
urls_filenames.append((url, filename))
return urls_filenames
def threaded_dl(
urls,
thread_count,
filename_format,
bytespersecond=None,
headers=None,
timeout=None,
):
urls_filenames = prepare_urls_filenames(urls, filename_format)
if not urls_filenames:
return
if bytespersecond is not None:
# It is important that we convert this to a Ratelimter now instead of
# passing the user's integer to downloady, because we want all threads
# to share a single limiter instance instead of each creating their
# own by the integer.
bytespersecond = ratelimiter.Ratelimiter(bytespersecond)
meter = ratemeter.RateMeter(span=5)
pool = threadpool.ThreadPool(thread_count, paused=True)
ui_stop_event = threading.Event()
ui_kwargs = {
'meter': meter,
'stop_event': ui_stop_event,
'pool': pool,
}
ui_thread = threading.Thread(target=ui_thread_func, kwargs=ui_kwargs, daemon=True)
ui_thread.start()
kwargss = []
for (url, filename) in urls_filenames:
kwargs = {
'function': download_job,
'kwargs': {
'bytespersecond': bytespersecond,
2020-09-30 20:20:07 +00:00
'filename': filename,
'headers': headers,
'meter': meter,
'timeout': timeout,
'url': url,
2020-09-30 20:20:07 +00:00
}
}
kwargss.append(kwargs)
pool.add_many(kwargss)
status = 0
for job in pool.result_generator():
if job.exception:
2022-11-03 01:42:14 +00:00
log.error(''.join(traceback.format_exception(None, job.exception, job.exception.__traceback__)))
status = 1
ui_stop_event.set()
ui_thread.join()
return status
def ui_thread_func(meter, pool, stop_event):
2022-02-13 03:50:00 +00:00
if pipeable.stdout_pipe():
return
while not stop_event.is_set():
width = shutil.get_terminal_size().columns
speed = meter.report()[2]
message = f'{bytestring.bytestring(speed)}/s | {pool.running_count} threads'
spaces = ' ' * (width - len(message) - 1)
pipeable.stderr(message + spaces, end='\r')
2020-09-29 21:27:48 +00:00
stop_event.wait(timeout=0.5)
2020-09-29 21:27:48 +00:00
def threaded_dl_argparse(args):
urls = pipeable.input(args.url_file, read_files=True, skip_blank=True, strip=True)
urls = [u.split(' ', 1) if ' ' in u else u for u in urls]
headers = normalize_headers(args.headers)
2022-11-03 01:42:14 +00:00
print(headers)
2020-09-30 20:20:07 +00:00
bytespersecond = args.bytespersecond
if bytespersecond is not None:
bytespersecond = bytestring.parsebytes(bytespersecond)
return threaded_dl(
2020-09-29 21:27:48 +00:00
urls,
bytespersecond=bytespersecond,
2020-09-29 21:27:48 +00:00
filename_format=args.filename_format,
2020-09-30 20:20:07 +00:00
headers=headers,
thread_count=args.thread_count,
2020-09-29 21:27:48 +00:00
timeout=args.timeout,
)
@vlogging.main_decorator
2020-09-29 21:27:48 +00:00
def main(argv):
2022-02-13 03:50:00 +00:00
parser = argparse.ArgumentParser()
parser.add_argument(
'url_file',
metavar='links',
help='''
The name of a file containing links to download, one per line.
Uses pipeable to support !c clipboard, !i stdin lines of urls.
''',
)
parser.add_argument(
'thread_count',
type=int,
help='''
Integer number of threads to use for downloading.
''',
)
parser.add_argument(
'filename_format',
nargs='?',
type=str,
default='{now}_{index}_{basename}',
help='''
A string that controls the names of the downloaded files. Uses Python's
brace-style formatting. Available formatters are:
- {basename}: The name of the file as indicated by the URL.
E.g. example.com/image.jpg -> image.jpg
- {extension}: The extension of the file as indicated by the URL, including
the dot. E.g. example.com/image.jpg -> .jpg
- {index}: The index of this URL within the sequence of all downloaded URLs.
Starts from 0.
- {now}: The unix timestamp at which this download job was started. It might
be ugly but at least it's unambiguous when doing multiple download batches
with similar filenames.
''',
)
parser.add_argument(
'--bytespersecond',
default=None,
help='''
Limit the overall download speed to X bytes per second. Uses
bytestring.parsebytes to support strings like "1m", "500k", "2 mb", etc.
''',
)
parser.add_argument(
'--timeout',
default=15,
help='''
Integer number of seconds to use as HTTP request timeout for each download.
''',
)
parser.add_argument(
'--headers', nargs='+', default=None,
2022-11-03 01:42:14 +00:00
help='''
HTTP headers to add to your request. There are many ways to specify headers:
You can provide multiple command line arguments where the first is a key,
the second is its value, the third is another key, the fourth is its value...
You can provide a single command line argument which is a JSON string containing
key:value pairs.
You can provide a single command line argument which is a filename.
The file can be a JSON file, or alternatively the file should have each
key:value on a separate line and a colon should separate each key from its value.
''',
2022-02-13 03:50:00 +00:00
)
2020-09-29 21:27:48 +00:00
parser.set_defaults(func=threaded_dl_argparse)
2022-02-13 03:50:00 +00:00
return betterhelp.go(parser, argv)
2020-09-29 21:27:48 +00:00
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))