aboutsummarybugs & patchesrefslogtreecommitdiffstats
path: root/src/discord_image_bridge/downloadpool.py
blob: fcf11cdb48469fed3c24ec0dddcc07c7bfdbca8a (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from threading import Lock, Thread, Event
from queue import Queue, Empty

import cherrypy

class DownloadPool():
	def __init__(self, download_fn, max_size=4):
		self.download_fn=download_fn
		self.max_size=4

		self.lock=Lock()
		self.queue=Queue()
		self.threads:list[Thread]=[]
		self.stopping=Event()

		self.fill()

	def fill(self):
		for i in range(self.max_size):
			th=Thread(target=self._wrap)
			th.start()
			self.threads.append(th)
	
	def exec(self, args=[], kwargs={}, callback=None):
		done=Event()
		data={
			"args":args,
			"kwargs":kwargs,
			"done":done,
			"callback":callback
			}
		self.queue.put(data)
		if not callback:
			done.wait()
			if "exception" in data:
				raise data["exception"]
			return data["result"]
	
	def _wrap(self):
		while not self.stopping.is_set():
			with self.lock:
				if self.stopping.is_set(): continue # to not block for 5 seconds while stopping
				try:
					data=self.queue.get(block=True,timeout=5)
				except Empty:
					continue
			try:
				data["result"]=self.download_fn(*data["args"], **data["kwargs"])
				if data["callback"]:
					data["callback"](*data["result"])
			except Exception as e:
				data["exception"]=e
				import traceback
				traceback.print_exc()
			data["done"].set()
	
	def stop(self):
		cherrypy.log("Waiting for download thread pool to exit")
		self.stopping.set()
		for thread in self.threads:
			thread.join()
		cherrypy.log("Done")