from typing import List from imaplib import IMAP4 from imap_tools import MailBox from threading import Event, RLock, current_thread class PoolEmpty(Exception): def __init__(self,*args,**kwargs): Exception.__init__(self,*args,**kwargs) class PooledMailBox(MailBox): def __init__(self,pool,*args,**kwargs): MailBox.__init__(self,*args,**kwargs) self._mailbox_pool=pool def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.logout() def logout(self): self._mailbox_pool.release(self) class MailBoxPool(): "A pool of mailboxes." __public__=["open","close","get_pool_size","get_box","release", "errors"] def __init__(self, username:str, password:str, initial_folder:str='INBOX', # for MailBox.login() connection_n=4, # for ourselves host='', port=993, timeout=None, # for MailBox.__init__() keyfile=None, certfile=None, ssl_context=None # for MailBox.__init() ): self.n=connection_n self.username=username self.password=password self.initial_folder=initial_folder self.host=host self.port=port self.timeout=timeout self.keyfile=keyfile self.certfile=certfile self.ssl_context=ssl_context self.errors:List[Exception]=[] self.initalised=Event() self.holding_threads=[] self.lock=RLock() self.boxreturned=Event() self.pool:List[MailBox]=[] self.taken:List[MailBox]=[] def log(self,*msg): import traceback, threading tb=traceback.extract_stack(limit=4) pparent=tb[0].name parent=tb[2].name print("%s>>IMAPPOOL:%s:%s"%(threading.current_thread().name, pparent,parent)," ".join([str(i) for i in msg])) def open(self): "Initialises the pool" self.ensure_all_connections() self.initalised.set() self.log("Made all connections") def is_alive(self,mb:MailBox): "Internal function. Uses the provided mailbox to send a command to the server, checking whether it's still connected" try: mb.client.noop() except ( TimeoutError, ConnectionError, # generic python errors IMAP4.error, IMAP4.abort, # IMAP4 errors ): return False return True def ensure_all_connections(self): "Internal function. Refils the pool with connections. Note this also removes disconnected boxes that are currently in use." #self.log("Waiting for lock") with self.lock: toremove=[] for mb in self.pool: if not self.is_alive(mb): toremove.append(mb) for mb in toremove: try: MailBox.logout(mb) # logout if needed except: pass if mb in self.pool: self.pool.remove(mb) if mb in self.taken: self.taken.remove(mb) n=self.n-self.get_pool_size() if n: self.log("Creating %d new IMAP connections"%n) for i in range(n): try: mb=self.new_box() self.pool.append(mb) self.boxreturned.set() except Exception as e: self.errors.append(e) def new_box(self): "Interal function. Generates a new box. Note: this does NOT add it to the pool." mb=PooledMailBox(pool=self,host=self.host,port=self.port,timeout=self.timeout, certfile=self.certfile,keyfile=self.keyfile,ssl_context=self.ssl_context) mb.login(username=self.username,password=self.password,initial_folder=self.initial_folder) return mb def get_pool_size(self): "Returns the total pool size, free and taken connections." return len(self.pool)+len(self.taken) def get_box(self): "Gets a new mailbox from the pool" self.initalised.wait() while True: with self.lock: if self.get_pool_size()<1: raise PoolEmpty("No connections in pool!") self.ensure_all_connections() if len(self.pool)>0: self.boxreturned.clear() mb=self.pool.pop(0) self.taken.append(mb) self.holding_threads.append(current_thread()) return mb self.boxreturned.wait() def release(self,mb:PooledMailBox): "Returns a mailbox back to the pool. Please use a context manager instead of manually releasing." self.initalised.wait() with self.lock: if mb in self.taken: self.taken.remove(mb) self.holding_threads.remove(current_thread()) self.pool.append(mb) self.boxreturned.set() def close(self): "Closes all the mailboxes" self.initalised.set() # to force an exit wherever possible self.n=0 # make sure no new mailboxes get created while len(self.pool)+len(self.taken)>0: with self.lock: while len(self.pool)>0: mb=self.pool.pop(0) MailBox.logout(mb) # pass it this way bc we overwrite logout() self.log("Threads still holding a MailBox:",", ".join([thread.name for thread in self.holding_threads])) self.boxreturned.wait()