from threading import Lock, Thread, Event from queue import Queue, Empty import cherrypy class DownloadPool(): def __init__(self, download_fn, max_size=4): self.download_fn=download_fn self.max_size=4 self.lock=Lock() self.queue=Queue() self.threads:list[Thread]=[] self.stopping=Event() self.fill() def fill(self): for i in range(self.max_size): th=Thread(target=self._wrap) th.start() self.threads.append(th) def exec(self, args=[], kwargs={}, callback=None): done=Event() data={ "args":args, "kwargs":kwargs, "done":done, "callback":callback } self.queue.put(data) if not callback: done.wait() if "exception" in data: raise data["exception"] return data["result"] def _wrap(self): while not self.stopping.is_set(): with self.lock: if self.stopping.is_set(): continue # to not block for 5 seconds while stopping try: data=self.queue.get(block=True,timeout=5) except Empty: continue try: data["result"]=self.download_fn(*data["args"], **data["kwargs"]) if data["callback"]: data["callback"](*data["result"]) except Exception as e: data["exception"]=e import traceback traceback.print_exc() data["done"].set() def stop(self): cherrypy.log("Waiting for download thread pool to exit") self.stopping.set() for thread in self.threads: thread.join() cherrypy.log("Done")