# [web-viewer navigation residue] about · summary · bugs & patches · refs · log · tree · commit · diff · stats
from threading import Lock, Thread, Event
from queue import Queue, Empty

import cherrypy

class DownloadPool():
	"""Fixed-size pool of worker threads that run ``download_fn`` on queued jobs.

	Jobs are submitted with :meth:`exec` and picked up by the worker threads
	started in :meth:`fill`. Call :meth:`stop` to shut the workers down; the
	threads are not daemon threads, so they keep the process alive until then.
	"""

	def __init__(self, download_fn, max_size=4):
		"""Create the pool and immediately start its worker threads.

		Args:
			download_fn: Callable invoked by the workers as
				``download_fn(*args, **kwargs)`` for every queued job.
			max_size: Number of worker threads to start.

		Raises:
			ValueError: If ``max_size`` is smaller than 1 (no worker would
				ever serve the queue and synchronous calls would hang).
		"""
		if max_size < 1:
			raise ValueError("max_size must be at least 1")
		self.download_fn=download_fn
		self.max_size=max_size

		# Workers no longer take this lock (Queue is thread-safe on its own);
		# the attribute is kept so external code referencing it keeps working.
		self.lock=Lock()
		self.queue=Queue()
		self.threads:list[Thread]=[]
		self.stopping=Event()

		self.fill()

	def fill(self):
		"""Start ``max_size`` worker threads and record them in ``self.threads``."""
		for _ in range(self.max_size):
			th=Thread(target=self._wrap)
			th.start()
			self.threads.append(th)

	def exec(self, args=None, kwargs=None, callback=None):
		"""Queue one job for the workers.

		Args:
			args: Positional arguments for ``download_fn`` (default: none).
			kwargs: Keyword arguments for ``download_fn`` (default: none).
			callback: Optional callable. When given, this method returns
				right after queueing and a worker later calls
				``callback(*result)``, i.e. the job's result is unpacked
				into positional arguments.

		Returns:
			The value returned by ``download_fn`` when no callback is given;
			``None`` when a callback is given.

		Raises:
			Exception: Whatever ``download_fn`` raised in the worker, re-raised
				in the calling thread (synchronous mode only).
		"""
		done=Event()
		data={
			# Fresh containers per job: defaults must not be shared between calls.
			"args":[] if args is None else args,
			"kwargs":{} if kwargs is None else kwargs,
			"done":done,
			"callback":callback,
			}
		self.queue.put(data)
		if callback:
			return None
		done.wait()
		if "exception" in data:
			raise data["exception"]
		return data["result"]

	def _wrap(self):
		"""Worker loop: take jobs off the queue and run them until stopping."""
		import traceback

		while not self.stopping.is_set():
			try:
				# The timeout is only a fallback so the loop re-checks
				# ``stopping`` if the event was set without going through stop().
				data=self.queue.get(block=True,timeout=5)
			except Empty:
				continue
			if data is None:
				# Wake-up sentinel queued by stop(); re-check the loop condition.
				continue
			try:
				data["result"]=self.download_fn(*data["args"], **data["kwargs"])
				if data["callback"]:
					data["callback"](*data["result"])
			except Exception as e:
				# Stored for a synchronous exec() caller to re-raise; also
				# printed because callback-mode jobs have nobody to re-raise to.
				data["exception"]=e
				traceback.print_exc()
			finally:
				# Always release a waiting exec() caller, even on BaseException.
				data["done"].set()

	def stop(self):
		"""Signal all workers to exit and wait until every thread has finished."""
		cherrypy.log("Waiting for download thread pool to exit")
		self.stopping.set()
		# One sentinel per worker so idle threads blocked in queue.get() wake
		# up immediately instead of waiting for the poll timeout.
		for _ in self.threads:
			self.queue.put(None)
		for thread in self.threads:
			thread.join()
		cherrypy.log("Done")