from typing import List
from imaplib import IMAP4
import imap_tools
from imap_tools.errors import MailboxLoginError, MailboxLogoutError
from imap_tools import MailBox, MailMessage
import socket
from threading import Event, RLock


class PooledMailBox(MailBox):
    """A MailBox owned by a MailBoxPool.

    Calling ``logout()`` (directly or by leaving a ``with`` block) hands the
    box back to its pool via ``pool.release()`` instead of disconnecting.
    """

    def __init__(self, pool, *args, **kwargs):
        """Create the mailbox and remember the pool it belongs to.

        :param pool: object exposing ``release(mailbox)``; called on logout.
        :param args: positional arguments forwarded to ``MailBox.__init__``.
        :param kwargs: keyword arguments forwarded to ``MailBox.__init__``.
        """
        # super() binds self; the previous unbound call omitted it.
        super().__init__(*args, **kwargs)
        self._mailbox_pool = pool

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.logout()

    def logout(self):
        """Return this box to its pool; the connection is left open."""
        self._mailbox_pool.release(self)


class MailBoxPool():
    """A pool of mailboxes."""

    def __init__(self, username: str, password: str, initial_folder: str = 'INBOX',  # for MailBox.login()
                 connection_n=4,  # for ourselves
                 host='', port=993, timeout=None,  # for MailBox.__init__()
                 keyfile=None, certfile=None, ssl_context=None  # for MailBox.__init__()
                 ):
        """Store the connection settings and try to open ``connection_n`` boxes.

        Connection failures during this initial fill are not raised; they are
        appended to ``self.errors``.
        """
        self.n = connection_n
        self.username = username
        self.password = password
        self.initial_folder = initial_folder
        self.host = host
        self.port = port
        self.timeout = timeout
        self.keyfile = keyfile
        self.certfile = certfile
        self.ssl_context = ssl_context
        self.errors = []  # exceptions raised while opening new connections
        self.lock = RLock()  # guards pool / taken
        self.boxreturned = Event()  # set whenever a box is added to the pool
        self.pool: List[MailBox] = []  # idle boxes
        self.taken: List[MailBox] = []  # boxes currently handed out
        self.ensure_all_connections()

    def is_alive(self, mb: MailBox):
        """Internal function. Send NOOP through ``mb`` to check it is still connected.

        :return: True if the command went through, False on a socket or IMAP error.
        """
        try:
            mb.client.noop()
        except (
            OSError,  # includes TimeoutError, ConnectionError and SSL errors
            IMAP4.error,  # includes IMAP4.abort
        ):
            return False
        return True

    @staticmethod
    def _force_logout(mb):
        """Best-effort real logout that bypasses PooledMailBox.logout()."""
        try:
            MailBox.logout(mb)
        except Exception:
            # The connection is being discarded anyway; a failed LOGOUT on a
            # broken connection is not actionable.
            pass

    def ensure_all_connections(self):
        """Internal function. Refill the pool with connections.

        Note this also removes disconnected boxes that are currently in use.
        Errors raised while opening replacements are appended to ``self.errors``.
        """
        with self.lock:
            # NOTE(review): this sends NOOP on boxes other threads may be
            # using at the same moment -- confirm that is safe for the client.
            dead = [mb for mb in self.pool + self.taken if not self.is_alive(mb)]
            for mb in dead:
                self._force_logout(mb)
                if mb in self.pool:
                    self.pool.remove(mb)
                if mb in self.taken:
                    self.taken.remove(mb)

            missing = self.n - (len(self.pool) + len(self.taken))
            for _ in range(missing):
                try:
                    mb = self.new_box()
                except Exception as e:
                    self.errors.append(e)
                else:
                    # new_box() does not register the box; do it here so the
                    # pool actually grows, then wake any waiter.
                    self.pool.append(mb)
                    self.boxreturned.set()

    def new_box(self):
        """Internal function. Create and log in a new box.

        Note: this does NOT add it to the pool.

        :return: a logged-in PooledMailBox bound to this pool.
        :raises Exception: whatever the constructor or ``login()`` raises.
        """
        mb = PooledMailBox(self, host=self.host, port=self.port, timeout=self.timeout,
                           certfile=self.certfile, keyfile=self.keyfile, ssl_context=self.ssl_context)
        try:
            mb.login(username=self.username, password=self.password, initial_folder=self.initial_folder)
        except Exception:
            # Do not leave the just-created connection behind on a failed login.
            self._force_logout(mb)
            raise
        return mb

    # Presumably an informal list of the names meant for external use; it is
    # not read anywhere in this file.
    __public_ = ["get_pool_size", "get_box", "release", "errors"]

    def get_pool_size(self):
        """Return the total pool size, free and taken connections."""
        return len(self.pool) + len(self.taken)

    def get_box(self):
        """Get a mailbox from the pool, blocking until one is available.

        NOTE(review): if no connection can be opened and no box is ever
        released, this waits indefinitely -- confirm whether a timeout or
        retry policy is wanted.
        """
        while True:
            with self.lock:
                self.ensure_all_connections()
                if self.pool:
                    self.boxreturned.clear()
                    mb = self.pool.pop(0)
                    self.taken.append(mb)
                    return mb
            # Wait outside the lock so release() can run.
            self.boxreturned.wait()

    def release(self, mb: PooledMailBox):
        """Return a mailbox to the pool.

        Please use a context manager instead of manually releasing. Boxes
        that are not currently marked as taken are ignored.
        """
        with self.lock:
            if mb in self.taken:
                self.taken.remove(mb)
                self.pool.append(mb)
                self.boxreturned.set()