from typing import List
from imaplib import IMAP4
from imap_tools import MailBox
from threading import Event, RLock, current_thread
class PoolEmpty(Exception):
    """Raised when the pool holds no connections at all, free or taken."""

    def __init__(self, *args, **kwargs):
        # Plain pass-through: the exception carries whatever the raiser supplied.
        super().__init__(*args, **kwargs)
class PooledMailBox(MailBox):
    """A MailBox owned by a pool.

    Calling ``logout()`` -- or leaving a ``with`` block -- hands the mailbox
    back to the owning pool via ``pool.release(self)``; this class itself
    never logs out of the server.
    """

    def __init__(self, pool, *args, **kwargs):
        """Build the mailbox and remember the pool it belongs to.

        pool: object whose ``release(mailbox)`` is called on logout.
        All remaining arguments are forwarded unchanged to ``MailBox.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._mailbox_pool = pool

    def __enter__(self):
        """Enter the context; the mailbox itself is the context value."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Give the mailbox back to the pool; exceptions are not suppressed."""
        self.logout()

    def logout(self):
        """Return this mailbox to its pool."""
        owner = self._mailbox_pool
        owner.release(self)
class MailBoxPool():
    """A pool of logged-in IMAP mailboxes shared between threads.

    Call ``open()`` once to create the connections, ``get_box()`` to borrow a
    mailbox (preferably as a context manager, which releases it on exit) and
    ``close()`` to log every connection out.  Errors raised while creating or
    closing connections are collected in ``errors`` instead of being raised.
    """
    __public__ = ["open", "close", "get_pool_size", "get_box", "release", "errors"]

    def __init__(self, username: str, password: str, initial_folder: str = 'INBOX',  # for MailBox.login()
                 connection_n: int = 4,  # for ourselves
                 host: str = '', port: int = 993, timeout=None,  # for MailBox.__init__()
                 keyfile=None, certfile=None, ssl_context=None  # for MailBox.__init__()
                 ):
        """Store the connection settings; no connection is made until ``open()``.

        username, password, initial_folder: passed to ``MailBox.login()``.
        connection_n: number of connections the pool tries to maintain.
        host, port, timeout, keyfile, certfile, ssl_context: passed to the
            mailbox constructor.
        """
        self.n = connection_n
        self.username = username
        self.password = password
        self.initial_folder = initial_folder
        self.host = host
        self.port = port
        self.timeout = timeout
        self.keyfile = keyfile
        self.certfile = certfile
        self.ssl_context = ssl_context
        # Exceptions swallowed while creating or closing connections.
        self.errors: List[Exception] = []
        # Set once open() (or close()) has run; get_box()/release() wait on it.
        self.initalised = Event()
        # holding_threads[i] is the thread that borrowed taken[i].
        self.holding_threads = []
        # Guards pool, taken, holding_threads and the boxreturned transitions.
        self.lock = RLock()
        # Set when a box is put into the pool; cleared when the pool runs dry.
        self.boxreturned = Event()
        self.pool: List["MailBox"] = []   # free connections
        self.taken: List["MailBox"] = []  # borrowed connections

    def log(self, *msg) -> None:
        """Print *msg* prefixed with the thread name and the calling functions."""
        import traceback
        # Oldest frame first; the last entry is this function's own frame.
        frames = traceback.extract_stack(limit=4)
        outer = frames[0].name
        # Index from the end so a shallow stack still names the real caller
        # instead of raising IndexError or naming log() itself.
        caller = frames[-2].name if len(frames) >= 2 else frames[-1].name
        print("%s>>IMAPPOOL:%s:%s" % (current_thread().name, outer, caller),
              " ".join(str(i) for i in msg))

    def open(self) -> None:
        """Initialise the pool: create the connections and unblock waiters.

        Connection failures are not raised; they are appended to ``errors``.
        """
        self.ensure_all_connections()
        self.initalised.set()
        self.log("Made all connections")

    def is_alive(self, mb: "MailBox") -> bool:
        """Internal. Send NOOP over *mb* and report whether the server answered."""
        try:
            mb.client.noop()
        except (
            OSError,      # includes TimeoutError, ConnectionError, ssl.SSLError
            IMAP4.error,  # includes IMAP4.abort
        ):
            return False
        return True

    def ensure_all_connections(self) -> None:
        """Internal. Drop dead free connections and refill the pool up to ``n``.

        Only connections currently in the free list are health-checked;
        borrowed ones are left alone.  Failures to create a connection are
        appended to ``errors``.
        """
        with self.lock:
            dead = [mb for mb in self.pool if not self.is_alive(mb)]
            for mb in dead:
                try:
                    MailBox.logout(mb)  # logout if needed
                except Exception:
                    pass  # the connection is already broken; nothing to clean up
                if mb in self.pool:
                    self.pool.remove(mb)
            missing = self.n - self.get_pool_size()
            # missing is negative after close() set n to 0 while boxes are out.
            if missing > 0:
                self.log("Creating %d new IMAP connections" % missing)
            for _ in range(missing):
                try:
                    mb = self.new_box()
                except Exception as e:
                    self.errors.append(e)
                else:
                    self.pool.append(mb)
                    self.boxreturned.set()

    def new_box(self) -> "PooledMailBox":
        """Internal. Create and log in a new mailbox. Does NOT add it to the pool."""
        mb = PooledMailBox(pool=self, host=self.host, port=self.port, timeout=self.timeout,
                           certfile=self.certfile, keyfile=self.keyfile, ssl_context=self.ssl_context)
        try:
            mb.login(username=self.username, password=self.password, initial_folder=self.initial_folder)
        except Exception:
            # Best effort: close the connection opened above so that a failed
            # login does not leave it dangling, then report the login error.
            try:
                MailBox.logout(mb)
            except Exception:
                pass
            raise
        return mb

    def get_pool_size(self) -> int:
        """Return the total pool size, free and taken connections."""
        return len(self.pool) + len(self.taken)

    def get_box(self) -> "PooledMailBox":
        """Borrow a mailbox from the pool, blocking until one is free.

        Blocks until ``open()`` (or ``close()``) has been called.

        Raises:
            PoolEmpty: the pool holds no connections and none could be made.
        """
        self.initalised.wait()
        while True:
            with self.lock:
                if self.get_pool_size() < 1:
                    raise PoolEmpty("No connections in pool!")
                self.ensure_all_connections()
                if self.pool:
                    mb = self.pool.pop(0)
                    self.taken.append(mb)
                    self.holding_threads.append(current_thread())
                    # Clear only when the pool ran dry, so other waiters are
                    # not put back to sleep while free boxes remain.
                    if not self.pool:
                        self.boxreturned.clear()
                    return mb
                if not self.taken:
                    # Every connection died and none could be re-created;
                    # nobody will ever release a box, so waiting is pointless.
                    raise PoolEmpty("No connections in pool!")
                # Reset while still holding the lock: release() sets the event
                # under the same lock, so a later release cannot be missed and
                # a stale "set" cannot make this loop spin.
                self.boxreturned.clear()
            self.boxreturned.wait()

    def release(self, mb: "PooledMailBox") -> None:
        """Return a mailbox to the pool. Prefer a context manager over calling this.

        Releasing a mailbox that is not currently borrowed is a no-op.  The
        mailbox may be released from a thread other than the one that took it.
        """
        self.initalised.wait()
        with self.lock:
            try:
                idx = self.taken.index(mb)
            except ValueError:
                return
            del self.taken[idx]
            # holding_threads is appended in lock-step with taken, so the same
            # index identifies the borrower even when another thread releases.
            if idx < len(self.holding_threads):
                del self.holding_threads[idx]
            self.pool.append(mb)
            self.boxreturned.set()

    def close(self) -> None:
        """Log out every mailbox, waiting for borrowed ones to be released.

        Logout failures are appended to ``errors`` so that one broken
        connection does not prevent the others from being closed.
        """
        self.initalised.set()  # to force an exit wherever possible
        self.n = 0  # make sure no new mailboxes get created
        while True:
            with self.lock:
                while self.pool:
                    mb = self.pool.pop(0)
                    try:
                        MailBox.logout(mb)  # pass it this way bc we overwrite logout()
                    except Exception as e:
                        self.errors.append(e)
                if not self.taken:
                    return
                self.log("Threads still holding a MailBox:",
                         ", ".join(thread.name for thread in self.holding_threads))
                # Clear under the lock so the wait below really blocks until
                # the next release() instead of spinning on a stale "set".
                self.boxreturned.clear()
            self.boxreturned.wait()