aboutsummaryrefslogtreecommitdiffstats

from typing import List

from imaplib import IMAP4

from imap_tools import MailBox

from threading import Event, RLock, current_thread


class PoolEmpty(Exception):
	def __init__(self,*args,**kwargs):
		Exception.__init__(self,*args,**kwargs)



class PooledMailBox(MailBox):
	def __init__(self,pool,*args,**kwargs):
		MailBox.__init__(self,*args,**kwargs)
		self._mailbox_pool=pool

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_value, traceback):
		self.logout()
	
	def logout(self):
		self._mailbox_pool.release(self)



class MailBoxPool():
	"A pool of mailboxes."
	__public__=["open","close","get_pool_size","get_box","release", "errors"]

	def __init__(self, username:str, password:str, initial_folder:str='INBOX', # for MailBox.login()
			connection_n=4, # for ourselves
			host='', port=993, timeout=None, # for MailBox.__init__()
			keyfile=None, certfile=None, ssl_context=None # for MailBox.__init()
			):
		self.n=connection_n
		self.username=username
		self.password=password
		self.initial_folder=initial_folder
		self.host=host
		self.port=port
		self.timeout=timeout
		self.keyfile=keyfile
		self.certfile=certfile
		self.ssl_context=ssl_context

		self.errors:List[Exception]=[]

		self.initalised=Event()
		self.holding_threads=[]

		self.lock=RLock()
		self.boxreturned=Event()
		self.pool:List[MailBox]=[]
		self.taken:List[MailBox]=[]

	def log(self,*msg):
		import traceback, threading
		tb=traceback.extract_stack(limit=4)
		pparent=tb[0].name
		parent=tb[2].name
		print("%s>>IMAPPOOL:%s:%s"%(threading.current_thread().name, pparent,parent)," ".join([str(i) for i in msg]))

	def open(self):
		"Initialises the pool"
		self.ensure_all_connections()
		self.initalised.set()
		self.log("Made all connections")
	
	def is_alive(self,mb:MailBox):
		"Internal function. Uses the provided mailbox to send a command to the server, checking whether it's still connected"
		try:
			mb.client.noop()
		except (
			TimeoutError, ConnectionError, # generic python errors
			IMAP4.error, IMAP4.abort, # IMAP4 errors
			):
			return False
		return True

	def ensure_all_connections(self):
		"Internal function. Refils the pool with connections. Note this also removes disconnected boxes that are currently in use."
		#self.log("Waiting for lock")
		with self.lock:
			toremove=[]
			for mb in self.pool:
				if not self.is_alive(mb):
					toremove.append(mb)

			for mb in toremove:
				try:
					MailBox.logout(mb) # logout if needed
				except:
					pass
				if mb in self.pool: self.pool.remove(mb)
				if mb in self.taken: self.taken.remove(mb)

			n=self.n-self.get_pool_size()
			if n: self.log("Creating %d new IMAP connections"%n)
			for i in range(n):
				try:
					mb=self.new_box()
					self.pool.append(mb)
					self.boxreturned.set()
				except Exception as e:
					self.errors.append(e)
	
	def new_box(self):
		"Interal function. Generates a new box. Note: this does NOT add it to the pool."
		mb=PooledMailBox(pool=self,host=self.host,port=self.port,timeout=self.timeout,
			certfile=self.certfile,keyfile=self.keyfile,ssl_context=self.ssl_context)

		mb.login(username=self.username,password=self.password,initial_folder=self.initial_folder)
		return mb

	def get_pool_size(self):
		"Returns the total pool size, free and taken connections."
		return len(self.pool)+len(self.taken)
	
	def get_box(self):
		"Gets a new mailbox from the pool"
		self.initalised.wait()
		while True:
			with self.lock:
				if self.get_pool_size()<1:
					raise PoolEmpty("No connections in pool!")
				self.ensure_all_connections()
				if len(self.pool)>0:
					self.boxreturned.clear()
					mb=self.pool.pop(0)
					self.taken.append(mb)
					self.holding_threads.append(current_thread())
					return mb

			self.boxreturned.wait()
	
	def release(self,mb:PooledMailBox):
		"Returns a mailbox back to the pool. Please use a context manager instead of manually releasing."
		self.initalised.wait()
		with self.lock:
			if mb in self.taken:
				self.taken.remove(mb)
			self.holding_threads.remove(current_thread())
			self.pool.append(mb)
			self.boxreturned.set()
	
	def close(self):
		"Closes all the mailboxes"
		self.initalised.set() # to force an exit wherever possible
		self.n=0 # make sure no new mailboxes get created
		while len(self.pool)+len(self.taken)>0:
			with self.lock:
				while len(self.pool)>0:
					mb=self.pool.pop(0)
					MailBox.logout(mb) # pass it this way bc we overwrite logout()
			self.log("Threads still holding a MailBox:",", ".join([thread.name for thread in self.holding_threads]))
			self.boxreturned.wait()