diff --git a/voussoirkit/threadpool.py b/voussoirkit/threadpool.py index 5bc6c43..6a47539 100644 --- a/voussoirkit/threadpool.py +++ b/voussoirkit/threadpool.py @@ -123,6 +123,8 @@ class PooledThread: if status is NO_MORE_JOBS and self.pool.closed: break + self.pool._thread_finished(self) + class ThreadPool: ''' The ThreadPool is used to perform large numbers of tasks using a pool of @@ -165,17 +167,34 @@ class ThreadPool: self._jobs_available = threading.Event() self._closed = False + self._done = False self._running_count = 0 self._result_queue = None self._pending_jobs = lazychain.LazyChain() + # Should be held while manipulating self._pending_jobs. self._job_manager_lock = threading.Lock() # Should be held while manipulating self._running_count. self._running_count_lock = threading.Lock() + + # Should be held while manipulating self._threads. + self._thread_manager_lock = threading.Lock() + self._size = size self._threads = {PooledThread(pool=self) for x in range(size)} + def _thread_finished(self, thread): + ''' + Threads call here when they finish their mainloop so we can do + some bookkeeping. + ''' + with self._thread_manager_lock: + self._threads.remove(thread) + + if len(self._threads) == 0: + self._done = True + @property def closed(self) -> bool: ''' @@ -184,6 +203,14 @@ class ThreadPool: ''' return self._closed + @property + def done(self) -> bool: + ''' + done indicates that the pool is closed and all jobs have completely + finished running. + ''' + return self._done + @property def paused(self) -> bool: return not self._unpaused_event.is_set()