from threading import Lock, Thread, Event
from queue import Queue, Empty
import cherrypy
class DownloadPool():
def __init__(self, download_fn, max_size=4):
self.download_fn=download_fn
self.max_size=4
self.lock=Lock()
self.queue=Queue()
self.threads:list[Thread]=[]
self.stopping=Event()
self.fill()
def fill(self):
for i in range(self.max_size):
th=Thread(target=self._wrap)
th.start()
self.threads.append(th)
def exec(self, args=[], kwargs={}, callback=None):
done=Event()
data={
"args":args,
"kwargs":kwargs,
"done":done,
"callback":callback
}
self.queue.put(data)
if not callback:
done.wait()
if "exception" in data:
raise data["exception"]
return data["result"]
def _wrap(self):
while not self.stopping.is_set():
with self.lock:
if self.stopping.is_set(): continue # to not block for 5 seconds while stopping
try:
data=self.queue.get(block=True,timeout=5)
except Empty:
continue
try:
data["result"]=self.download_fn(*data["args"], **data["kwargs"])
if data["callback"]:
data["callback"](*data["result"])
except Exception as e:
data["exception"]=e
import traceback
traceback.print_exc()
data["done"].set()
def stop(self):
cherrypy.log("Waiting for download thread pool to exit")
self.stopping.set()
for thread in self.threads:
thread.join()
cherrypy.log("Done")