import argparse
import ast
import os
import queue
import shutil
import sys
import threading
import time

from voussoirkit import betterhelp
from voussoirkit import bytestring
from voussoirkit import downloady
from voussoirkit import pipeable
from voussoirkit import ratelimiter
from voussoirkit import ratemeter
from voussoirkit import sentinel
from voussoirkit import threadpool
from voussoirkit import vlogging

log = vlogging.getLogger(__name__, 'threaded_dl')
downloady.log.setLevel(vlogging.WARNING)

THREAD_FINISHED = sentinel.Sentinel('thread finished')

def clean_url_list(urls):
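    '''
    Yield the usable URLs from `urls`, skipping blank lines and lines that
    start with # (comments). Each item may be a plain URL string or a
    (url, filename) pair; pairs are yielded as (url, filename) tuples.
    '''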
    for url in urls:
        if isinstance(url, (tuple, list)):
            (url, filename) = url
        else:
            filename = None
        url = url.strip()

        if not url:
            continue

        if url.startswith('#'):
            continue

        if filename:
            yield (url, filename)
        else:
            yield url

def download_job(
        url,
        filename,
        *,
        bytespersecond=None,
        headers=None,
        meter=None,
        timeout=None,
    ):
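    '''
    Download a single URL to `filename`, logging the start and finish.
    This is the unit of work that each pool thread executes.
    '''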
    log.info(f'Starting "{filename}"')
    downloady.download_file(
        url,
        filename,
        bytespersecond=bytespersecond,
        headers=headers,
        ratemeter=meter,
        timeout=timeout,
    )
    log.info(f'Finished "{filename}"')

def normalize_headers(headers):
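    '''
    Normalize the accepted header formats into a dict for downloady.
    Accepts None or empty (returns {}), a dict, a flat sequence of
    alternating keys and values, a str containing a Python dict literal or
    "Key: value" lines, or the path to a file containing either of those.
    '''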
    if not headers:
        return {}

    if isinstance(headers, dict):
        return headers

    if isinstance(headers, list) and len(headers) == 1:
        headers = headers[0]

    if isinstance(headers, (list, tuple)):
        keys = headers[::2]
        vals = headers[1::2]
        return {key: val for (key, val) in zip(keys, vals)}

    if isinstance(headers, str) and os.path.isfile(headers):
        # Read the whole file as a single string so that the str branch
        # below can parse it.
        with open(headers, 'r', encoding='utf-8') as handle:
            headers = handle.read()

    if isinstance(headers, str):
        if headers.startswith('{'):
            return ast.literal_eval(headers)
        else:
            lines = [line for line in headers.splitlines() if line.strip()]
            pairs = [line.strip().split(': ', 1) for line in lines]
            return {key: value for (key, value) in pairs}

    return headers

def prepare_urls_filenames(urls, filename_format):
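    '''
    Apply `filename_format` to each URL and return a list of
    (url, filename) pairs, skipping any filename that already exists.
    '''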
    now = int(time.time())

    if os.path.normcase(filename_format) != os.devnull:
        # Zero-pad {index} so that every filename gets the same number of
        # digits.
        index_digits = len(str(len(urls)))
        filename_format = filename_format.replace('{index}', '{index:0%0dd}' % index_digits)

        if '{' not in filename_format and len(urls) > 1:
            filename_format += '_{index}'

        if '{extension}' not in filename_format and '{basename}' not in filename_format:
            filename_format += '{extension}'

    urls_filenames = []

    for (index, url) in enumerate(clean_url_list(urls)):
        if isinstance(url, (tuple, list)):
            (url, filename) = url
        else:
            basename = downloady.basename_from_url(url)
            extension = os.path.splitext(basename)[1]
            filename = filename_format.format(
                basename=basename,
                ext=extension,
                extension=extension,
                index=index,
                now=now,
            )

        if os.path.exists(filename):
            log.info(f'Skipping existing file "{filename}"')
            continue

        urls_filenames.append((url, filename))

    return urls_filenames

def threaded_dl(
        urls,
        thread_count,
        filename_format,
        bytespersecond=None,
        headers=None,
        timeout=None,
    ):
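    '''
    Download all of the URLs using `thread_count` worker threads, while a
    UI thread reports the overall speed. The bytespersecond limit, if any,
    is shared across all of the threads.
    '''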
    urls_filenames = prepare_urls_filenames(urls, filename_format)

    if not urls_filenames:
        return

    if bytespersecond is not None:
        # It is important that we convert this to a Ratelimiter now instead
        # of passing the user's integer to downloady, because we want all
        # threads to share a single limiter instance instead of each
        # creating their own from the integer.
        bytespersecond = ratelimiter.Ratelimiter(bytespersecond)

    meter = ratemeter.RateMeter(span=5)

    pool = threadpool.ThreadPool(thread_count, paused=True)

    ui_stop_event = threading.Event()
    ui_kwargs = {
        'meter': meter,
        'stop_event': ui_stop_event,
        'pool': pool,
    }
    ui_thread = threading.Thread(target=ui_thread_func, kwargs=ui_kwargs, daemon=True)
    ui_thread.start()

    kwargss = []
    for (url, filename) in urls_filenames:
        kwargs = {
            'function': download_job,
            'kwargs': {
                'bytespersecond': bytespersecond,
                'filename': filename,
                'headers': headers,
                'meter': meter,
                'timeout': timeout,
                'url': url,
            },
        }
        kwargss.append(kwargs)
    pool.add_many(kwargss)

    for job in pool.result_generator():
        if job.exception:
            ui_stop_event.set()
            ui_thread.join()
            raise job.exception

    ui_stop_event.set()
    ui_thread.join()

def ui_thread_func(meter, pool, stop_event):
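    '''
    Periodically write the current download speed and running thread count
    to stderr until stop_event is set. Does nothing when stdout is piped.
    '''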
    if pipeable.stdout_pipe():
        return

    while not stop_event.is_set():
        width = shutil.get_terminal_size().columns
        speed = meter.report()[2]
        message = f'{bytestring.bytestring(speed)}/s | {pool.running_count} threads'
        spaces = ' ' * (width - len(message) - 1)
        pipeable.stderr(message + spaces, end='\r')

        stop_event.wait(timeout=0.5)

def threaded_dl_argparse(args):
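    '''
    Resolve the command line arguments and call threaded_dl.
    '''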
    urls = pipeable.input(args.url_file, read_files=True, skip_blank=True, strip=True)
    # A line like "url filename" becomes a [url, filename] pair.
    urls = [u.split(' ', 1) if ' ' in u else u for u in urls]

    headers = normalize_headers(args.headers)

    bytespersecond = args.bytespersecond
    if bytespersecond is not None:
        bytespersecond = bytestring.parsebytes(bytespersecond)

    threaded_dl(
        urls,
        bytespersecond=bytespersecond,
        filename_format=args.filename_format,
        headers=headers,
        thread_count=args.thread_count,
        timeout=args.timeout,
    )

    return 0

@vlogging.main_decorator
def main(argv):
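    '''
    Build the argument parser and dispatch via betterhelp.

    Example invocation (assuming this script is saved as threaded_dl.py):
        python threaded_dl.py links.txt 4 --bytespersecond "500k"
    '''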
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'url_file',
        metavar='links',
        help='''
        The name of a file containing links to download, one per line. A line
        may optionally be "url filename" to pick the destination filename.
        Uses pipeable to support !c clipboard, !i stdin lines of urls.
        ''',
    )
    parser.add_argument(
        'thread_count',
        type=int,
        help='''
        Integer number of threads to use for downloading.
        ''',
    )
    parser.add_argument(
        'filename_format',
        nargs='?',
        type=str,
        default='{now}_{index}_{basename}',
        help='''
        A string that controls the names of the downloaded files. Uses Python's
        brace-style formatting. Available formatters are:
        - {basename}: The name of the file as indicated by the URL.
          E.g. example.com/image.jpg -> image.jpg
        - {extension}: The extension of the file as indicated by the URL,
          including the dot. E.g. example.com/image.jpg -> .jpg
        - {index}: The index of this URL within the sequence of all downloaded
          URLs. Starts from 0.
        - {now}: The unix timestamp at which this download job was started. It
          might be ugly, but at least it's unambiguous when doing multiple
          download batches with similar filenames.
        ''',
    )
    parser.add_argument(
        '--bytespersecond',
        default=None,
        help='''
        Limit the overall download speed to X bytes per second. Uses
        bytestring.parsebytes to support strings like "1m", "500k", "2 mb",
        etc.
        ''',
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=15,
        help='''
        Integer number of seconds to use as HTTP request timeout for each
        download.
        ''',
    )
    parser.add_argument(
        '--headers',
        nargs='+',
        default=None,
        help='''
        HTTP headers to send with each request. May be given as alternating
        key and value arguments, as a single Python dict literal, or as the
        path to a file containing either a dict literal or one "Key: value"
        pair per line.
        ''',
    )
    parser.set_defaults(func=threaded_dl_argparse)

    return betterhelp.go(parser, argv)

if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))