Big threadpool update.

master
Ethan Dalool 2020-10-09 13:10:24 -07:00
parent 12175c23a7
commit 4e9b43be8b
1 changed file with 195 additions and 110 deletions

@@ -1,11 +1,30 @@
'''
The documentation for the classes and methods is below. Here are some examples
of threadpool in use:

1. Powering a single api scraping generator with many threads:

    pool = threadpool.ThreadPool(thread_count, paused=True)
    job_gen = ({'function': api.get_item, 'kwargs': {'id': i}} for i in range(lower, upper+1))
    pool.add_generator(job_gen)
    for job in pool.result_generator():
        if job.exception:
            raise job.exception
        if job.value is not None:
            yield job.value
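
2. Callback style. A minimal sketch, where `download_file` and
   `handle_result(job)` stand for your own hypothetical functions:

    pool = threadpool.ThreadPool(thread_count, paused=False)
    for url in urls:
        pool.add(download_file, args=[url], callback=handle_result)
    pool.join()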
'''
import collections
import queue
import threading
from voussoirkit import lazychain
from voussoirkit import sentinel
PENDING = 'pending'
RUNNING = 'running'
FINISHED = 'finished'
RAISED = 'raised'
PENDING = sentinel.Sentinel('PENDING')
RUNNING = sentinel.Sentinel('RUNNING')
FINISHED = sentinel.Sentinel('FINISHED')
RAISED = sentinel.Sentinel('RAISED')
NO_RETURN = sentinel.Sentinel('NO_RETURN', truthyness=False)
NO_EXCEPTION = sentinel.Sentinel('NO_EXCEPTION', truthyness=False)
@@ -17,134 +36,136 @@ class PoolClosed(ThreadPoolException):
pass
class ThreadPool:
def __init__(self, size, paused=False):
'''
The ThreadPool is used to perform large numbers of tasks using a pool of
worker threads. Jobs are run in the order they are added.
The pool supports two main paradigms of usage:
1. Callback / async style
If the job function performs your desired side effects by itself, or is
given a callback function, you can simply add it to the pool and wait
for it to run.
2. Generator style
If you want to yield the job results back to the main thread for
processing (e.g. you are feeding the results into sqlite, which must be
done on the thread which opened the sqlite connection), you can use
`result_generator` to get the jobs in the order they were added to the
pool. This style also makes it easier to terminate the main thread when
a single job encounters an issue. Just `raise job.exception`.
'''
def __init__(self, size, paused=True):
'''
size:
The number of worker threads.
paused:
The pool will start in a paused state and you will have to call
`start` to start it.
If True, the pool will start in a paused state and you will have to
call `start` to start it. If False, the pool will run as soon as
jobs are added to it.
'''
if not isinstance(size, int):
raise TypeError(f'size must be an int, not {type(size)}.')
if size < 1:
raise ValueError(f'size must be >= 1, not {size}.')
self.max_size = size
self.closed = False
self.paused = paused
self._jobs = []
self._closed = False
self._running_count = 0
self._result_queue = None
self._pending_jobs = lazychain.LazyChain()
self._job_manager_lock = threading.Lock()
def _clear_done_jobs(self):
'''
This function assumes that _job_manager_lock is acquired!!
You should call start instead!
'''
self._jobs[:] = [j for j in self._jobs if j.status in {PENDING, RUNNING}]
def _start_jobs(self):
'''
This function assumes that _job_manager_lock is acquired!!
You should call start instead!
'''
available = self.max_size - self.running_count
available = max(0, available)
if available == 0:
return
for job in list(self._jobs):
if job.status == PENDING:
job.start()
available -= 1
if available == 0:
break
def _clear_done_and_start_jobs(self):
'''
This function assumes that _job_manager_lock is acquired!!
You should call start instead!
'''
self._clear_done_jobs()
self._start_jobs()
self._all_done_event = threading.Event()
self._all_done_event.set()
def _job_finished(self):
'''
When a job finishes, it will call here.
When a job finishes, it will call here so that a new job can be started.
'''
if self.paused:
return
self._running_count -= 1
# Although this method is private, we are calling the public `start`
# instead of the private method because we do not hold the lock.
self.start()
if not self.paused:
self.start()
@property
def closed(self):
return self._closed
@property
def running_count(self):
return sum(1 for job in list(self._jobs) if job.status is RUNNING)
@property
def unfinished_count(self):
return sum(1 for job in list(self._jobs) if job.status in {PENDING, RUNNING})
return self._running_count
def assert_not_closed(self):
'''
If the pool is closed (because you called `join`), raise PoolClosed.
Otherwise do nothing.
'''
if self.closed:
if self._closed:
raise PoolClosed()
def add(self, function, *, name=None, args=tuple(), kwargs=dict()):
def add(self, function, *, name=None, callback=None, args=tuple(), kwargs=dict()):
'''
Add a new job to the pool. Jobs are run in the order they are added.
Add a new job to the pool.
Don't forget that in order to write a tuple of length 1 you must still
add a comma at the end: `(4)` is just the integer 4, so `add(print, args=(4))`
is an error. You need `add(print, args=(4,))`, or use a list instead:
`add(print, args=[4])`.
name:
An optional value that will appear in the repr of the job and
has no other purpose. Use this if you intend to print(job) and want
a human friendly name string.
See the Job class for parameter details.
'''
self.assert_not_closed()
with self._job_manager_lock:
job = Job(
pool=self,
function=function,
name=name,
args=args,
kwargs=kwargs,
)
self._jobs.append(job)
job = Job(
pool=self,
function=function,
name=name,
args=args,
kwargs=kwargs,
)
self._pending_jobs.append(job)
if not self.paused:
self._clear_done_and_start_jobs()
if not self.paused:
self.start()
return job
def add_generator(self, kwargs_gen):
'''
Add jobs from a generator which yields kwarg dictionaries. Unlike
`add` and `add_many`, the Job objects are not returned by this method
(since they don't exist yet!). If you want them, use `result_generator`
to iterate the pool's jobs as they complete. Otherwise, they should
have their own side effects or use a callback.
See the Job class for kwarg details.
'''
self.assert_not_closed()
these_jobs = (Job(pool=self, **kwargs) for kwargs in kwargs_gen)
self._pending_jobs.extend(these_jobs)
if not self.paused:
self.start()
def add_many(self, kwargss):
'''
Add multiple new jobs to the pool at once. Useful to prevent the
excessive lock-waiting that you get from calling regular `add` in a
loop while other jobs are finishing and triggering queue maintenance.
Add multiple new jobs to the pool at once. This is better than calling
`add` in a loop because we only have to acquire the lock once.
Provide an iterable of kwarg dictionaries. That is:
[
{'function': print, 'args': [4], 'name': '4'},
{'function': sample, 'kwargs': {'x': 2}},
]
See the Job class for kwarg details.
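The corresponding Job objects are returned as a list. A minimal sketch,
assuming the pool is not paused:

    jobs = pool.add_many(kwargss)
    for job in jobs:
        job.join()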
'''
self.assert_not_closed()
with self._job_manager_lock:
these_jobs = []
for kwargs in kwargss:
kwargs.pop('pool', None)
job = Job(pool=self, **kwargs)
these_jobs.append(job)
self._jobs.append(job)
these_jobs = [Job(pool=self, **kwargs) for kwargs in kwargss]
self._pending_jobs.extend(these_jobs)
if not self.paused:
self._clear_done_and_start_jobs()
if not self.paused:
self.start()
return these_jobs
@@ -153,33 +174,100 @@ class ThreadPool:
Permanently close the pool, preventing any new jobs from being added,
and block until all jobs are complete.
'''
self.closed = True
self._closed = True
self.start()
for job in self._jobs:
    job.join()
self._all_done_event.wait()
def result_generator(self):
'''
This generator will start the job pool, then yield finished/raised Job
objects in the order they were added. Note that a slow job will
therefore hold up the generator, though it will not stop the job pool
from running and starting new jobs in its other threads.
For best results, you should create the pool in the paused state, add
your jobs, then use this method to start the pool. Jobs that start
while the result_generator is not active are not stored, since we don't
know in advance whether this method will ever be used, so they will not
be yielded and are simply lost to garbage collection.
If more jobs are added while the generator is running, they will be
yielded as expected.
When there are no more outstanding jobs, the generator will stop
iteration and return. If the pool was paused before generating, it
will be paused again.
'''
if self._result_queue is not None:
raise TypeError('The result generator is already open.')
self._result_queue = queue.Queue()
was_paused = self.paused
self.start()
while (not self._all_done_event.is_set()) or (not self._result_queue.empty()):
job = self._result_queue.get()
job.join()
yield job
self._result_queue.task_done()
self._result_queue = None
if was_paused:
self.paused = True
def start(self):
'''
Remove finished and raised jobs from the queue and start some new jobs.
The job queue is maintained automatically while adding new jobs and
when a job finishes, as long as the pool is not paused, so you should
not have to call it yourself. If you do pause the pool, use this method
to restart it.
Because the pool's internal job queue is flushed regularly, you should
store your own references to jobs to get their return values.
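A sketch of the pause / resume pattern, where `kwargs_dicts` stands for
your own iterable of kwarg dictionaries:

    pool = threadpool.ThreadPool(4, paused=True)
    pool.add_many(kwargs_dicts)
    pool.start()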
'''
self.paused = False
with self._job_manager_lock:
self._clear_done_and_start_jobs()
self.paused = False
available = self.max_size - self._running_count
no_more_jobs = False
for x in range(available):
try:
job = next(self._pending_jobs)
except StopIteration:
no_more_jobs = True
break
self._all_done_event.clear()
job.start()
self._running_count += 1
if self._result_queue is not None:
self._result_queue.put(job)
if self._running_count == 0 and no_more_jobs:
self._all_done_event.set()
class Job:
def __init__(self, pool, function, *, name=None, args=tuple(), kwargs=dict()):
'''
Each job contains one function that it will call when it is started.
If the function completes successfully you will find the return value in
`job.value`. If it raises an exception, you'll find it in `job.exception`,
although the thread itself will not raise.
All job threads are daemons and will not prevent the main thread from
terminating. Call `job.join()` or `pool.join()` in the main thread to
ensure jobs complete.
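For example, a minimal sketch:

    pool = threadpool.ThreadPool(1, paused=False)
    job = pool.add(divmod, args=(7, 3))
    job.join()
    if job.exception:
        raise job.exception
    print(job.value)  # (2, 1)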
'''
def __init__(self, pool, function, *, name=None, callback=None, args=tuple(), kwargs=dict()):
'''
When this job is started, `function(*args, **kwargs)` will be called.
name:
An optional value that will appear in the repr of the job and
has no other purpose. Use this if you intend to print(job) and want
a human friendly name string.
callback:
An optional function which will be called as `callback(job)` after
the job is finished running. Use this for async-style processing of
the job. Note that the callback is called via the job's thread, so
make sure it is memory safe.
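For example, a sketch of a memory-safe callback that hands results back
to the main thread through a queue.Queue, which is safe to use across
threads:

    import queue
    results = queue.Queue()
    def handle_result(job):
        if job.exception:
            results.put(job.exception)
        else:
            results.put(job.value)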
'''
self.pool = pool
self.name = name
self.status = PENDING
self.function = function
self.callback = callback
self.args = args
self.kwargs = kwargs
self.value = NO_RETURN
@@ -195,20 +283,22 @@ class Job:
def __repr__(self):
if self.name:
return f'<{self.status} Job {repr(self.name)}>'
return f'<{self.status.name} Job {repr(self.name)}>'
else:
return f'<{self.status} Job on {self.function}>'
return f'<{self.status.name} Job on {self.function}>'
def _run(self):
try:
self.value = self.function(*self.args, **self.kwargs)
self.status = FINISHED
except Exception as exc:
except BaseException as exc:
self.exception = exc
self.status = RAISED
self._thread = None
self.pool._job_finished()
self._joinme_lock.release()
self.pool._job_finished()
if self.callback is not None:
self.callback(self)
def join(self):
'''
@@ -218,11 +308,6 @@ class Job:
self._joinme_lock.release()
def start(self):
'''
Start the job. If the function completes successfully you will find the
return value in `value`. If it raises an exception, you'll find it in
`exception`, although the thread itself will not raise.
'''
self.status = RUNNING
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True