Add the ability to pause the threadpool.

This commit is contained in:
Ethan Dalool 2020-02-12 16:56:07 -08:00
parent 630159768d
commit a1f26200fa

View file

@ -17,13 +17,19 @@ class PoolClosed(ThreadPoolException):
pass pass
class ThreadPool: class ThreadPool:
def __init__(self, size): def __init__(self, size, paused=False):
'''
paused:
The pool will start in a paused state and you will have to call
`clear_done_and_start_jobs` to start it.
'''
if not isinstance(size, int): if not isinstance(size, int):
raise TypeError(f'size must be an int, not {type(size)}.') raise TypeError(f'size must be an int, not {type(size)}.')
if size < 1: if size < 1:
raise ValueError(f'size must be >= 1, not {size}.') raise ValueError(f'size must be >= 1, not {size}.')
self.max_size = size self.max_size = size
self.closed = False self.closed = False
self.paused = paused
self.jobs = [] self.jobs = []
self.job_manager_lock = threading.Lock() self.job_manager_lock = threading.Lock()
@ -64,7 +70,7 @@ class ThreadPool:
''' '''
When a job finishes, it will call here. When a job finishes, it will call here.
''' '''
if self.closed: if self.closed or self.paused:
return return
self.clear_done_and_start_jobs() self.clear_done_and_start_jobs()
@ -91,6 +97,7 @@ class ThreadPool:
''' '''
self.assert_not_closed() self.assert_not_closed()
self.job_manager_lock.acquire() self.job_manager_lock.acquire()
job = Job( job = Job(
pool=self, pool=self,
function=function, function=function,
@ -99,7 +106,10 @@ class ThreadPool:
kwargs=kwargs, kwargs=kwargs,
) )
self.jobs.append(job) self.jobs.append(job)
self._clear_done_and_start_jobs()
if not self.paused:
self._clear_done_and_start_jobs()
self.job_manager_lock.release() self.job_manager_lock.release()
return job return job
@ -107,14 +117,17 @@ class ThreadPool:
''' '''
Remove finished and raised jobs from the queue and start some new jobs. Remove finished and raised jobs from the queue and start some new jobs.
This function will be called automatically while adding new jobs and The job queue is maintained automatically while adding new jobs and
when a job finishes, so you should not have to call it yourself. when a job finishes, as long as the pool is not paused, so you should
not have to call it yourself. If you do pause the pool, use this method
to restart it.
Because the pool's internal job queue is flushed regularly, you should Because the pool's internal job queue is flushed regularly, you should
store your own references to jobs to get their return values. store your own references to jobs to get their return values.
''' '''
self.job_manager_lock.acquire() self.job_manager_lock.acquire()
self._clear_done_and_start_jobs() self._clear_done_and_start_jobs()
self.paused = False
self.job_manager_lock.release() self.job_manager_lock.release()
def join(self): def join(self):