Rewrite threadpool with persistent threads that pick up jobs.

Ethan Dalool 2020-10-12 16:34:26 -07:00
parent 4e9b43be8b
commit c0e9870b91


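The heart of this rewrite is the persistent-worker pattern: instead of spawning one thread per job, a fixed set of daemon threads loops forever, each blocking until work is available and then picking it up. As a minimal, self-contained sketch of that pattern (illustrative names only, not this module's API):

    import queue
    import threading

    def worker(jobs):
        # A persistent worker: loop and pull jobs, rather than one thread per job.
        while True:
            job = jobs.get()       # block until a job is available
            if job is None:        # shutdown sentinel
                break
            job()                  # a real pool would catch exceptions here

    jobs = queue.Queue()
    threads = [threading.Thread(target=worker, args=(jobs,), daemon=True) for _ in range(4)]
    for thread in threads:
        thread.start()
    for i in range(10):
        jobs.put(lambda i=i: print(f'job {i}'))
    for _ in threads:
        jobs.put(None)             # one sentinel per worker so each can exit
    for thread in threads:
        thread.join()

The diff below implements the same idea with threading.Event gates (for pausing and job availability) and a lazychain instead of a queue.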
@@ -4,28 +4,49 @@ of threadpool in use:
 1. Powering a single api scraping generator with many threads:
 
-pool = threadpool.ThreadPool(thread_count, paused=True)
-job_gen = ({'function': api.get_item, 'kwargs': {'id': i}} for i in range(lower, upper+1))
-pool.add_generator(job_gen)
-for job in pool.result_generator():
-    if job.exception:
-        raise job.exception
-    if job.value is not None:
-        yield job.value
+>>> pool = threadpool.ThreadPool(thread_count, paused=True)
+>>> job_gen = ({'function': api.get_item, 'kwargs': {'id': i}} for i in range(lower, upper+1))
+>>> pool.add_generator(job_gen)
+>>> for job in pool.result_generator():
+>>>     if job.exception:
+>>>         raise job.exception
+>>>     if job.value is not None:
+>>>         yield job.value
+
+2. Git-fetching a bunch of repositories with no error handling:
+
+>>> def git_fetch(d):
+>>>     command = [GIT, '-C', d, 'fetch', '--all']
+>>>     print(command)
+>>>     subprocess.check_output(command, stderr=subprocess.STDOUT)
+>>>
+>>> def callback(job):
+>>>     if job.exception:
+>>>         print(f'{job.name} caused {job.exception}.')
+>>>
+>>> pool = threadpool.ThreadPool(thread_count, paused=False)
+>>> kwargss = [{'function': git_fetch, 'args': [d], 'name': d, 'callback': callback} for d in dirs]
+>>> pool.add_many(kwargss)
+>>> pool.join()
 '''
 import collections
+import logging
 import queue
 import threading
+import traceback
 
 from voussoirkit import lazychain
 from voussoirkit import sentinel
 
+log = logging.getLogger('threadpool')
+
 PENDING = sentinel.Sentinel('PENDING')
 RUNNING = sentinel.Sentinel('RUNNING')
 FINISHED = sentinel.Sentinel('FINISHED')
 RAISED = sentinel.Sentinel('RAISED')
+NO_MORE_JOBS = sentinel.Sentinel('NO_MORE_JOBS')
 NO_RETURN = sentinel.Sentinel('NO_RETURN', truthyness=False)
 NO_EXCEPTION = sentinel.Sentinel('NO_EXCEPTION', truthyness=False)
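Aside: the Sentinel objects above are unique markers compared by identity; `truthyness=False` makes NO_RETURN and NO_EXCEPTION falsy, so checks like `if job.exception:` only fire when a real exception was stored. A rough stand-in with the same observable behavior, inferred purely from the calls shown here (the real voussoirkit.sentinel class may differ):

    class Sentinel:
        # Minimal stand-in, inferred from the usage above; not the real class.
        def __init__(self, name, truthyness=True):
            self.name = name
            self.truthyness = truthyness

        def __repr__(self):
            return f'Sentinel({self.name!r})'

        def __bool__(self):
            return self.truthyness

    NO_RETURN = Sentinel('NO_RETURN', truthyness=False)
    assert not NO_RETURN            # falsy, unlike a plain object()
    assert NO_RETURN is NO_RETURN   # identity is what matters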
@@ -35,6 +56,61 @@ class ThreadPoolException(Exception):
 class PoolClosed(ThreadPoolException):
     pass
 
+class PooledThread:
+    def __init__(self, pool):
+        self.pool = pool
+        self.thread = threading.Thread(target=self.start)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def __repr__(self):
+        return f'PooledThread {self.thread}'
+
+    def _run_once(self):
+        # Any exceptions caused by the job's primary function are already
+        # wrapped safely, but there are two other sources of potential
+        # exceptions:
+        # 1. A generator given to add_generator that encounters an exception
+        #    while generating the kwargs causes get_next_job to raise.
+        # 2. The callback function given to the Job raises.
+        # It's hard to say what the correct course of action is, but I
+        # really don't want them taking down the whole worker thread.
+        try:
+            job = self.pool.get_next_job()
+        except BaseException:
+            traceback.print_exc()
+            return
+
+        if job is NO_MORE_JOBS:
+            return NO_MORE_JOBS
+
+        log.debug('%s is running job %s.', self, job)
+        self.pool._running_count += 1
+        try:
+            job.run()
+        except BaseException:
+            traceback.print_exc()
+        self.pool._running_count -= 1
+
+    def join(self):
+        log.debug('%s is joining.', self)
+        self.thread.join()
+
+    def start(self):
+        while True:
+            # Let's wait for jobs_available first and unpaused second.
+            # If the time between the two waits is very long, the worst thing
+            # that can happen is there are no more jobs by the time we get
+            # there, and the loop comes around again. On the other hand, if
+            # unpaused.wait came first and the time until jobs_available.wait
+            # was very long, we might wind up running a job despite the user
+            # pausing the pool in the interim.
+            self.pool._jobs_available.wait()
+            self.pool._unpaused_event.wait()
+            status = self._run_once()
+            if status is NO_MORE_JOBS and self.pool.closed:
+                break
+
 class ThreadPool:
     '''
     The ThreadPool is used to perform large numbers of tasks using a pool of
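The wait-ordering argument in PooledThread.start can be demonstrated in isolation: a worker must pass two gates, and putting the pause gate second means a pause issued while the worker is still blocked on job availability is always honored. A toy sketch (hypothetical names, not this module's API):

    import threading

    jobs_available = threading.Event()
    unpaused = threading.Event()

    def gated_worker():
        # Same ordering as PooledThread.start: job gate first, pause gate second.
        jobs_available.wait()
        unpaused.wait()
        print('running a job')

    thread = threading.Thread(target=gated_worker, daemon=True)
    thread.start()
    jobs_available.set()    # a job arrives, but the pool is still paused
    unpaused.set()          # only now does the worker proceed
    thread.join()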
@@ -70,34 +146,37 @@ class ThreadPool:
         if size < 1:
             raise ValueError(f'size must be >= 1, not {size}.')
 
-        self.max_size = size
-        self.paused = paused
+        self._unpaused_event = threading.Event()
+        if not paused:
+            self._unpaused_event.set()
+
+        self._jobs_available = threading.Event()
 
         self._closed = False
         self._running_count = 0
         self._result_queue = None
         self._pending_jobs = lazychain.LazyChain()
         self._job_manager_lock = threading.Lock()
-        self._all_done_event = threading.Event()
-        self._all_done_event.set()
 
-    def _job_finished(self):
-        '''
-        When a job finishes, it will call here so that a new job can be started.
-        '''
-        self._running_count -= 1
-        if not self.paused:
-            self.start()
+        self._size = size
+        self._threads = [PooledThread(pool=self) for x in range(size)]
 
     @property
     def closed(self):
-        return self.closed
+        return self._closed
+
+    @property
+    def paused(self):
+        return not self._unpaused_event.is_set()
 
     @property
     def running_count(self):
         return self._running_count
 
+    @property
+    def size(self):
+        return self._size
+
     def assert_not_closed(self):
         '''
         If the pool is closed (because you called `join`), raise PoolClosed.
@@ -122,9 +201,7 @@ class ThreadPool:
             kwargs=kwargs,
         )
         self._pending_jobs.append(job)
-
-        if not self.paused:
-            self.start()
+        self._jobs_available.set()
 
         return job
@@ -142,9 +219,7 @@ class ThreadPool:
         these_jobs = (Job(pool=self, **kwargs) for kwargs in kwargs_gen)
         self._pending_jobs.extend(these_jobs)
-
-        if not self.paused:
-            self.start()
+        self._jobs_available.set()
 
     def add_many(self, kwargss):
         '''
@@ -161,24 +236,48 @@
         '''
         self.assert_not_closed()
 
+        kwargss = list(kwargss)
+        if not kwargss:
+            raise ValueError(f'{kwargss} must not be empty.')
+
         these_jobs = [Job(pool=self, **kwargs) for kwargs in kwargss]
         self._pending_jobs.extend(these_jobs)
-
-        if not self.paused:
-            self.start()
+        self._jobs_available.set()
 
         return these_jobs
 
+    def get_next_job(self):
+        with self._job_manager_lock:
+            try:
+                job = next(self._pending_jobs)
+            except StopIteration:
+                # If we ARE closed, we want to keep the flag set so that all
+                # the threads can keep waking up and seeing no more jobs.
+                if not self.closed:
+                    self._jobs_available.clear()
+                return NO_MORE_JOBS
+
+            if self._result_queue is not None:
+                # This will block if the queue is full.
+                self._result_queue.put(job)
+
+            return job
+
     def join(self):
         '''
         Permanently close the pool, preventing any new jobs from being added,
         and block until all jobs are complete.
         '''
+        log.debug('%s is joining.', self)
         self._closed = True
+        # The threads which are currently paused at _jobs_available.wait() need
+        # to be woken up so they can realize the pool is closed and break.
+        self._jobs_available.set()
         self.start()
-        self._all_done_event.wait()
+        for thread in self._threads:
+            thread.join()
 
-    def result_generator(self):
+    def result_generator(self, *, buffer_size=None):
         '''
         This generator will start the job pool, then yield finished/raised Job
         objects in the order they were added. Note that a slow job will
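With get_next_job and join in place, fire-and-forget usage of the pool is just add-then-join. A hedged usage sketch, assuming this file is importable as `threadpool`:

    import threadpool

    def work(n):
        return n * n

    pool = threadpool.ThreadPool(4, paused=False)
    jobs = pool.add_many([{'function': work, 'args': [n], 'name': str(n)} for n in range(8)])
    pool.join()   # closes the pool, wakes idle workers, blocks until all threads exit
    for job in jobs:
        print(job.name, job.value)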
@@ -197,52 +296,60 @@ class ThreadPool:
         When there are no more outstanding jobs, the generator will stop
         iteration and return. If the pool was paused before generating, it
-        will be paused again.
+        will be paused again. This prevents subsequently added jobs from being
+        lost as described.
+
+        buffer_size:
+            The size of the buffer which holds jobs before they are yielded.
+            If you expect your production to outpace your consumption, you may
+            wish to set this value to prevent high memory usage. When the
+            buffer is full, new jobs will be blocked from starting.
         '''
         if self._result_queue is not None:
             raise TypeError('The result generator is already open.')
 
-        self._result_queue = queue.Queue()
+        self._result_queue = queue.Queue(maxsize=buffer_size or 0)
 
         was_paused = self.paused
 
         self.start()
-        while (not self._all_done_event.is_set()) or (not self._result_queue.empty()):
+        # Considerations for the while loop condition:
+        # Why `jobs_available.is_set`: Consider a group of slow-running threads
+        # are launched and the jobs are added to the result_queue. The caller
+        # of this generator consumes all of them before the threads finish and
+        # start a new job. So, we need to watch jobs_available.is_set to know
+        # that even though the result_queue is currently empty, we can expect
+        # more to be ready soon and shouldn't break yet.
+        # Why `not result_queue.empty`: Consider a group of fast-running
+        # threads are launched, and exhaust all available jobs. So, we need to
+        # watch that result_queue is not empty and has more results.
+        # Why not `not closed`: After the pool is closed, the outstanding jobs
+        # still need to finish. Closing does not imply pausing or cancelling
+        # jobs.
+        while self._jobs_available.is_set() or not self._result_queue.empty():
             job = self._result_queue.get()
             job.join()
             yield job
             self._result_queue.task_done()
         self._result_queue = None
 
         if was_paused:
-            self.paused = True
+            self.pause()
+
+    def pause(self):
+        self._unpaused_event.clear()
 
     def start(self):
-        self.paused = False
-
-        with self._job_manager_lock:
-            available = self.max_size - self._running_count
-            no_more_jobs = False
-            for x in range(available):
-                try:
-                    job = next(self._pending_jobs)
-                except StopIteration:
-                    no_more_jobs = True
-                    break
-                self._all_done_event.clear()
-                job.start()
-                self._running_count += 1
-                if self._result_queue is not None:
-                    self._result_queue.put(job)
-
-        if self._running_count == 0 and no_more_jobs:
-            self._all_done_event.set()
+        self._unpaused_event.set()
 
 class Job:
     '''
     Each job contains one function that it will call when it is started.
-    If the function completes successfully you will find the return value in
-    `job.value`. If it raises an exception, you'll find it in `job.exception`,
-    although the thread itself will not raise.
+    If the function completes successfully (status is threadpool.FINISHED) you
+    will find the return value in `job.value`. If it raises an exception
+    (status is threadpool.RAISED), you'll find it in `job.exception`, although
+    the thread itself will not raise.
 
     All job threads are daemons and will not prevent the main thread from
     terminating. Call `job.join()` or `pool.join()` in the main thread to
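A hedged sketch of the bounded-buffer behavior described above, again assuming the module imports as `threadpool`: with buffer_size set, get_next_job's queue.put blocks once buffer_size jobs are awaiting consumption, throttling the workers until the consumer catches up.

    import threadpool

    def work(n):
        return n * 10

    pool = threadpool.ThreadPool(4, paused=True)
    pool.add_generator({'function': work, 'args': [n]} for n in range(100))
    # At most 8 jobs wait in the buffer at once; further job starts block.
    for job in pool.result_generator(buffer_size=8):
        if job.exception:
            raise job.exception
        print(job.value)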
@@ -272,14 +379,8 @@ class Job:
         self.kwargs = kwargs
         self.value = NO_RETURN
         self.exception = NO_EXCEPTION
-        self._thread = None
 
-        # _joinme_lock works because it is possible for a single thread to block
-        # itself by calling `lock.acquire()` twice. The first call is here,
-        # and the second call is in `join` so that join will block until the
-        # lock is released by the job's finishing phase.
-        self._joinme_lock = threading.Lock()
-        self._joinme_lock.acquire()
+        self._done_event = threading.Event()
 
     def __repr__(self):
         if self.name:
@@ -287,28 +388,22 @@
         else:
             return f'<{self.status.name} Job on {self.function}>'
 
-    def _run(self):
+    def run(self):
+        self.status = RUNNING
         try:
             self.value = self.function(*self.args, **self.kwargs)
             self.status = FINISHED
         except BaseException as exc:
             self.exception = exc
             self.status = RAISED
 
-        self._thread = None
-        self._joinme_lock.release()
-        self.pool._job_finished()
-
         if self.callback is not None:
             self.callback(self)
 
+        self._done_event.set()
+
     def join(self):
         '''
         Block until this job runs and completes.
         '''
-        self._joinme_lock.acquire()
-        self._joinme_lock.release()
-
-    def start(self):
-        self.status = RUNNING
-        self._thread = threading.Thread(target=self._run)
-        self._thread.daemon = True
-        self._thread.start()
+        self._done_event.wait()
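Finally, a hedged sketch of inspecting Job results via the status sentinels (assuming the module imports as `threadpool`): value holds the return or the falsy NO_RETURN, exception holds the raised exception or the falsy NO_EXCEPTION.

    import threadpool

    def risky(n):
        if n % 2:
            raise ValueError(n)
        return n

    pool = threadpool.ThreadPool(2, paused=False)
    jobs = pool.add_many([{'function': risky, 'args': [n]} for n in range(4)])
    pool.join()
    for job in jobs:
        if job.status is threadpool.FINISHED:
            print('ok:', job.value)
        elif job.status is threadpool.RAISED:
            print('failed:', job.exception)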