Use threadpool to do multithreaded chunk downloads.
This commit is contained in:
parent
6afa3b0e8d
commit
acafdac830
1 changed files with 24 additions and 8 deletions
|
@ -8,6 +8,8 @@ import sqlite3
|
|||
import sys
|
||||
import time
|
||||
|
||||
from voussoirkit import threadpool
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
log = logging.getLogger(__name__)
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
@ -167,16 +169,29 @@ def download_bigchunk(bigchunk_x, bigchunk_y):
|
|||
chunks = split_bigchunk(bigchunk_x, bigchunk_y, bigchunk_data)
|
||||
return chunks
|
||||
|
||||
def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2):
|
||||
def download_bigchunk_range(bigchunk_xy1, bigchunk_xy2, threads=1):
|
||||
'''
|
||||
Given (UPPERLEFT_X, UPPERLEFT_Y), (LOWERRIGHT_X, LOWERRIGHT_Y),
|
||||
download multiple bigchunks, and return the total list of small chunks.
|
||||
download multiple bigchunks, and yield all of the small chunks.
|
||||
'''
|
||||
chunks = []
|
||||
for (x, y) in bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2):
|
||||
bigchunk = download_bigchunk(x, y)
|
||||
chunks.extend(bigchunk)
|
||||
return chunks
|
||||
bigchunks = bigchunk_range_iterator(bigchunk_xy1, bigchunk_xy2)
|
||||
if threads < 1:
|
||||
raise ValueError(threads)
|
||||
if threads == 1:
|
||||
for (x, y) in bigchunks:
|
||||
chunks = download_bigchunk(x, y)
|
||||
yield from chunks
|
||||
else:
|
||||
pool = threadpool.ThreadPool(size=threads)
|
||||
jobs = []
|
||||
for (x, y) in bigchunks:
|
||||
job = pool.add(download_bigchunk, args=(x, y), name=(x, y))
|
||||
jobs.append(job)
|
||||
for job in jobs:
|
||||
job.join()
|
||||
if job.exception:
|
||||
raise job.exception
|
||||
yield from job.value
|
||||
|
||||
# CHUNK FUNCTIONS
|
||||
################################################################################
|
||||
|
@ -511,7 +526,7 @@ def update_argparse(args):
|
|||
bigchunk_range = chunk_range_to_bigchunk_range(*coordinates)
|
||||
else:
|
||||
bigchunk_range = pixel_range_to_bigchunk_range(*coordinates)
|
||||
chunks = download_bigchunk_range(*bigchunk_range)
|
||||
chunks = download_bigchunk_range(*bigchunk_range, threads=args.threads)
|
||||
insert_chunks(chunks)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
|
@ -520,6 +535,7 @@ subparsers = parser.add_subparsers()
|
|||
p_update = subparsers.add_parser('update')
|
||||
p_update.add_argument('coordinates')
|
||||
p_update.add_argument('--chunks', dest='is_chunks', action='store_true')
|
||||
p_update.add_argument('--threads', dest='threads', type=int, default=1)
|
||||
p_update.set_defaults(func=update_argparse)
|
||||
|
||||
p_render = subparsers.add_parser('render')
|
||||
|
|
Loading…
Reference in a new issue