From a1f26200faa4c43b5bd6e23dee4b291a5d451db9 Mon Sep 17 00:00:00 2001 From: Ethan Dalool Date: Wed, 12 Feb 2020 16:56:07 -0800 Subject: [PATCH] Add the ability to pause the threadpool. --- voussoirkit/threadpool.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/voussoirkit/threadpool.py b/voussoirkit/threadpool.py index 31e007b..5bf286a 100644 --- a/voussoirkit/threadpool.py +++ b/voussoirkit/threadpool.py @@ -17,13 +17,19 @@ class PoolClosed(ThreadPoolException): pass class ThreadPool: - def __init__(self, size): + def __init__(self, size, paused=False): + ''' + paused: + The pool will start in a paused state and you will have to call + `clear_done_and_start_jobs` to start it. + ''' if not isinstance(size, int): raise TypeError(f'size must be an int, not {type(size)}.') if size < 1: raise ValueError(f'size must be >= 1, not {size}.') self.max_size = size self.closed = False + self.paused = paused self.jobs = [] self.job_manager_lock = threading.Lock() @@ -64,7 +70,7 @@ class ThreadPool: ''' When a job finishes, it will call here. ''' - if self.closed: + if self.closed or self.paused: return self.clear_done_and_start_jobs() @@ -91,6 +97,7 @@ class ThreadPool: ''' self.assert_not_closed() self.job_manager_lock.acquire() + job = Job( pool=self, function=function, @@ -99,7 +106,10 @@ class ThreadPool: kwargs=kwargs, ) self.jobs.append(job) - self._clear_done_and_start_jobs() + + if not self.paused: + self._clear_done_and_start_jobs() + self.job_manager_lock.release() return job @@ -107,14 +117,17 @@ class ThreadPool: ''' Remove finished and raised jobs from the queue and start some new jobs. - This function will be called automatically while adding new jobs and - when a job finishes, so you should not have to call it yourself. + The job queue is maintained automatically while adding new jobs and + when a job finishes, as long as the pool is not paused, so you should + not have to call it yourself. If you do pause the pool, use this method + to restart it. Because the pool's internal job queue is flushed regularly, you should store your own references to jobs to get their return values. ''' self.job_manager_lock.acquire() self._clear_done_and_start_jobs() + self.paused = False self.job_manager_lock.release() def join(self):