Use threadpool in threaded_dl with separate ui thread.
The downside is that the result_generator is difficult to ctrlc on windows due to signals not occuring during the Queue.get. Pressing ctrlc will kill the program after one file finishes downloading, otherwise you have to press ctrlbreak. The end result is that the program actually feels a little worse to use than before, but it's more, like, pure. Don't know. Might revert or change again.
This commit is contained in:
parent
9cec1d8e2f
commit
a35200d594
1 changed files with 132 additions and 37 deletions
173
threaded_dl.py
173
threaded_dl.py
|
@ -1,18 +1,64 @@
|
||||||
|
'''
|
||||||
|
threaded_dl
|
||||||
|
===========
|
||||||
|
|
||||||
|
> threaded_dl links thread_count filename_format <flags>
|
||||||
|
|
||||||
|
links:
|
||||||
|
The name of a file containing links to download, one per line.
|
||||||
|
Uses pipeable to support !c clipboard, !i stdin lines of urls.
|
||||||
|
|
||||||
|
thread_count:
|
||||||
|
Integer number of threads to use for downloading.
|
||||||
|
|
||||||
|
filename_format:
|
||||||
|
A string that controls the names of the downloaded files. Uses Python's
|
||||||
|
brace-style formatting. Available formatters are:
|
||||||
|
- {basename}: The name of the file as indicated by the URL.
|
||||||
|
E.g. example.com/image.jpg -> image.jpg
|
||||||
|
- {extension}: The extension of the file as indicated by the URL, including
|
||||||
|
the dot. E.g. example.com/image.jpg -> .jpg
|
||||||
|
- {index}: The index of this URL within the sequence of all downloaded URLs.
|
||||||
|
Starts from 0.
|
||||||
|
- {now}: The unix timestamp at which this download job was started. It might
|
||||||
|
be ugly but at least it's unambiguous when doing multiple download batches
|
||||||
|
with similar filenames.
|
||||||
|
|
||||||
|
flags:
|
||||||
|
--bytespersecond X:
|
||||||
|
Limit the overall download speed to X bytes per second. Uses
|
||||||
|
bytestring.parsebytes to support strings like "1m", "500k", "2 mb", etc.
|
||||||
|
|
||||||
|
--headers X:
|
||||||
|
;
|
||||||
|
|
||||||
|
--timeout X:
|
||||||
|
Integer number of seconds to use as HTTP request timeout for each download.
|
||||||
|
'''
|
||||||
import argparse
|
import argparse
|
||||||
import ast
|
import ast
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from voussoirkit import betterhelp
|
||||||
from voussoirkit import bytestring
|
from voussoirkit import bytestring
|
||||||
from voussoirkit import downloady
|
from voussoirkit import downloady
|
||||||
from voussoirkit import pipeable
|
from voussoirkit import pipeable
|
||||||
|
from voussoirkit import ratelimiter
|
||||||
|
from voussoirkit import ratemeter
|
||||||
|
from voussoirkit import sentinel
|
||||||
|
from voussoirkit import threadpool
|
||||||
from voussoirkit import vlogging
|
from voussoirkit import vlogging
|
||||||
|
|
||||||
log = vlogging.getLogger(__name__, 'threaded_dl')
|
log = vlogging.getLogger(__name__, 'threaded_dl')
|
||||||
downloady.log.setLevel(vlogging.WARNING)
|
downloady.log.setLevel(vlogging.WARNING)
|
||||||
|
|
||||||
|
THREAD_FINISHED = sentinel.Sentinel('thread finished')
|
||||||
|
|
||||||
def clean_url_list(urls):
|
def clean_url_list(urls):
|
||||||
for url in urls:
|
for url in urls:
|
||||||
if isinstance(url, (tuple, list)):
|
if isinstance(url, (tuple, list)):
|
||||||
|
@ -32,36 +78,30 @@ def clean_url_list(urls):
|
||||||
else:
|
else:
|
||||||
yield url
|
yield url
|
||||||
|
|
||||||
def download_thread(url, filename, *, bytespersecond=None, headers=None, timeout=None):
|
def download_job(
|
||||||
|
url,
|
||||||
|
filename,
|
||||||
|
*,
|
||||||
|
bytespersecond=None,
|
||||||
|
headers=None,
|
||||||
|
meter=None,
|
||||||
|
timeout=None,
|
||||||
|
):
|
||||||
log.info(f'Starting "{filename}"')
|
log.info(f'Starting "{filename}"')
|
||||||
downloady.download_file(
|
downloady.download_file(
|
||||||
url,
|
url,
|
||||||
filename,
|
filename,
|
||||||
bytespersecond=bytespersecond,
|
bytespersecond=bytespersecond,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
|
ratemeter=meter,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
log.info(f'Finished "{filename}"')
|
log.info(f'Finished "{filename}"')
|
||||||
|
|
||||||
def remove_finished(threads):
|
def prepare_urls_filenames(urls, filename_format):
|
||||||
return [t for t in threads if t.is_alive()]
|
|
||||||
|
|
||||||
def threaded_dl(
|
|
||||||
urls,
|
|
||||||
thread_count,
|
|
||||||
filename_format,
|
|
||||||
bytespersecond=None,
|
|
||||||
headers=None,
|
|
||||||
timeout=None,
|
|
||||||
):
|
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
threads = []
|
|
||||||
|
|
||||||
bytespersecond_thread = bytespersecond
|
if os.path.normcase(filename_format) != os.devnull:
|
||||||
if bytespersecond_thread is not None:
|
|
||||||
bytespersecond_thread = int(bytespersecond_thread / thread_count)
|
|
||||||
|
|
||||||
if filename_format != os.devnull:
|
|
||||||
index_digits = len(str(len(urls)))
|
index_digits = len(str(len(urls)))
|
||||||
filename_format = filename_format.replace('{index}', '{index:0%0dd}' % index_digits)
|
filename_format = filename_format.replace('{index}', '{index:0%0dd}' % index_digits)
|
||||||
|
|
||||||
|
@ -71,12 +111,9 @@ def threaded_dl(
|
||||||
if '{extension}' not in filename_format and '{basename}' not in filename_format:
|
if '{extension}' not in filename_format and '{basename}' not in filename_format:
|
||||||
filename_format += '{extension}'
|
filename_format += '{extension}'
|
||||||
|
|
||||||
|
urls_filenames = []
|
||||||
|
|
||||||
for (index, url) in enumerate(clean_url_list(urls)):
|
for (index, url) in enumerate(clean_url_list(urls)):
|
||||||
|
|
||||||
while len(threads) >= thread_count:
|
|
||||||
threads = remove_finished(threads)
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
if isinstance(url, (tuple, list)):
|
if isinstance(url, (tuple, list)):
|
||||||
(url, filename) = url
|
(url, filename) = url
|
||||||
else:
|
else:
|
||||||
|
@ -92,23 +129,82 @@ def threaded_dl(
|
||||||
|
|
||||||
if os.path.exists(filename):
|
if os.path.exists(filename):
|
||||||
log.info(f'Skipping existing file "{filename}"')
|
log.info(f'Skipping existing file "{filename}"')
|
||||||
|
continue
|
||||||
|
|
||||||
else:
|
urls_filenames.append((url, filename))
|
||||||
kwargs = {
|
|
||||||
'url': url,
|
return urls_filenames
|
||||||
'bytespersecond': bytespersecond_thread,
|
|
||||||
'filename': filename,
|
def threaded_dl(
|
||||||
'timeout': timeout,
|
urls,
|
||||||
'headers': headers,
|
thread_count,
|
||||||
|
filename_format,
|
||||||
|
bytespersecond=None,
|
||||||
|
headers=None,
|
||||||
|
timeout=None,
|
||||||
|
):
|
||||||
|
urls_filenames = prepare_urls_filenames(urls, filename_format)
|
||||||
|
|
||||||
|
if not urls_filenames:
|
||||||
|
return
|
||||||
|
|
||||||
|
if bytespersecond is not None:
|
||||||
|
# It is important that we convert this to a Ratelimter now instead of
|
||||||
|
# passing the user's integer to downloady, because we want all threads
|
||||||
|
# to share a single limiter instance instead of each creating their
|
||||||
|
# own by the integer.
|
||||||
|
bytespersecond = ratelimiter.Ratelimiter(bytespersecond)
|
||||||
|
|
||||||
|
meter = ratemeter.RateMeter(span=5)
|
||||||
|
|
||||||
|
pool = threadpool.ThreadPool(thread_count, paused=True)
|
||||||
|
|
||||||
|
ui_stop_event = threading.Event()
|
||||||
|
ui_kwargs = {
|
||||||
|
'meter': meter,
|
||||||
|
'stop_event': ui_stop_event,
|
||||||
|
'pool': pool,
|
||||||
}
|
}
|
||||||
t = threading.Thread(target=download_thread, kwargs=kwargs, daemon=True)
|
ui_thread = threading.Thread(target=ui_thread_func, kwargs=ui_kwargs, daemon=True)
|
||||||
threads.append(t)
|
ui_thread.start()
|
||||||
t.start()
|
|
||||||
|
|
||||||
while len(threads) > 0:
|
kwargss = []
|
||||||
threads = remove_finished(threads)
|
for (url, filename) in urls_filenames:
|
||||||
pipeable.stderr(f'{len(threads)} threads remaining\r', end='')
|
kwargs = {
|
||||||
time.sleep(0.1)
|
'function': download_job,
|
||||||
|
'kwargs': {
|
||||||
|
'bytespersecond': bytespersecond,
|
||||||
|
'filename': filename,
|
||||||
|
'headers': headers,
|
||||||
|
'meter': meter,
|
||||||
|
'timeout': timeout,
|
||||||
|
'url': url,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kwargss.append(kwargs)
|
||||||
|
pool.add_many(kwargss)
|
||||||
|
|
||||||
|
for job in pool.result_generator():
|
||||||
|
if job.exception:
|
||||||
|
ui_stop_event.set()
|
||||||
|
ui_thread.join()
|
||||||
|
raise job.exception
|
||||||
|
|
||||||
|
ui_stop_event.set()
|
||||||
|
ui_thread.join()
|
||||||
|
|
||||||
|
def ui_thread_func(meter, pool, stop_event):
|
||||||
|
if pipeable.OUT_PIPE:
|
||||||
|
return
|
||||||
|
|
||||||
|
while not stop_event.is_set():
|
||||||
|
width = shutil.get_terminal_size().columns
|
||||||
|
speed = meter.report()[2]
|
||||||
|
message = f'{bytestring.bytestring(speed)}/s | {pool.running_count} threads'
|
||||||
|
spaces = ' ' * (width - len(message) - 1)
|
||||||
|
pipeable.stderr(message + spaces, end='\r')
|
||||||
|
|
||||||
|
stop_event.wait(timeout=0.5)
|
||||||
|
|
||||||
def threaded_dl_argparse(args):
|
def threaded_dl_argparse(args):
|
||||||
urls = pipeable.input(args.url_file, read_files=True, skip_blank=True, strip=True)
|
urls = pipeable.input(args.url_file, read_files=True, skip_blank=True, strip=True)
|
||||||
|
@ -150,8 +246,7 @@ def main(argv):
|
||||||
parser.add_argument('--headers', nargs='+', default=None)
|
parser.add_argument('--headers', nargs='+', default=None)
|
||||||
parser.set_defaults(func=threaded_dl_argparse)
|
parser.set_defaults(func=threaded_dl_argparse)
|
||||||
|
|
||||||
args = parser.parse_args(argv)
|
return betterhelp.single_main(argv, parser, __doc__)
|
||||||
return args.func(args)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
raise SystemExit(main(sys.argv[1:]))
|
raise SystemExit(main(sys.argv[1:]))
|
||||||
|
|
Loading…
Reference in a new issue