From acafdac830b529911c0e002b5a5ddbd98a077914 Mon Sep 17 00:00:00 2001
From: Ethan Dalool
Date: Tue, 11 Feb 2020 17:24:02 -0800
Subject: [PATCH] Use threadpool to do multithreaded chunk downloads.

---
 pixelcanvas.py | 32 ++++++++++++++++++++++++--------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/pixelcanvas.py b/pixelcanvas.py
index b95f244..e6e12cc 100644
--- a/pixelcanvas.py
+++ b/pixelcanvas.py
@@ -8,6 +8,8 @@ import sqlite3
 import sys
 import time
 
+from voussoirkit import threadpool
+
 logging.basicConfig(level=logging.DEBUG)
 log = logging.getLogger(__name__)
 log.setLevel(logging.DEBUG)
@@ -167,16 +169,29 @@ def download_bigchunk(bigchunk_x, bigchunk_y):
     chunks = split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data)
     return chunks
 
-def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2):
+def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2, threads=1):
     '''
     Given (UPPERLEFT_X, UPPERLEFT_Y), (LOWERRIGHT_X, LOWERRIGHT_Y),
-    download multiple bigchunks, and return the total list of small chunks.
+    download multiple bigchunks, and yield all of the small chunks.
     '''
-    chunks = []
-    for (x, y) in bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2):
-        bigchunk = download_bigchunk(x, y)
-        chunks.extend(bigchunk)
-    return chunks
+    bigchunks = bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2)
+    if threads < 1:
+        raise ValueError(threads)
+    if threads == 1:
+        for (x, y) in bigchunks:
+            chunks = download_bigchunk(x, y)
+            yield from chunks
+    else:
+        pool = threadpool.ThreadPool(size=threads)
+        jobs = []
+        for (x, y) in bigchunks:
+            job = pool.add(download_bigchunk, args=(x, y), name=(x, y))
+            jobs.append(job)
+        for job in jobs:
+            job.join()
+            if job.exception:
+                raise job.exception
+            yield from job.value
 
 # CHUNK FUNCTIONS
 ################################################################################
@@ -511,7 +526,7 @@
         bigchunk_range = chunk_range_to_bigchunk_range(*coordinates)
     else:
         bigchunk_range = pixel_range_to_bigchunk_range(*coordinates)
-    chunks = download_bigchunk_range(*bigchunk_range)
+    chunks = download_bigchunk_range(*bigchunk_range, threads=args.threads)
     insert_chunks(chunks)
 
 parser = argparse.ArgumentParser()
@@ -520,6 +535,7 @@ subparsers = parser.add_subparsers()
 p_update = subparsers.add_parser('update')
 p_update.add_argument('coordinates')
 p_update.add_argument('--chunks', dest='is_chunks', action='store_true')
+p_update.add_argument('--threads', dest='threads', type=int, default=1)
 p_update.set_defaults(func=update_argparse)
 
 p_render = subparsers.add_parser('render')
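
For reference, a minimal usage sketch of the changed function, not part of the patch: it assumes pixelcanvas.py is importable as a module and that the bigchunk coordinates shown are valid for the canvas. Since download_bigchunk_range is now a generator, small chunks stream out as each bigchunk download finishes instead of arriving as one fully built list.

    # Hypothetical usage sketch, not part of the patch. Assumes pixelcanvas.py
    # is importable and the bigchunk coordinates below exist on the canvas.
    import pixelcanvas

    # With threads=4, bigchunks download on worker threads; the generator
    # yields their small chunks in submission order as the jobs complete.
    chunks = pixelcanvas.download_bigchunk_range((0, 0), (2, 2), threads=4)
    pixelcanvas.insert_chunks(chunks)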