diff options
| author | Vosjedev <vosje@vosjedev.net> | 2025-11-02 13:02:23 +0100 |
|---|---|---|
| committer | Vosjedev <vosje@vosjedev.net> | 2025-11-02 13:02:23 +0100 |
| commit | 275a4c5c7ce41da817f1427a7c6837297771976e (patch) | |
| tree | 68e197631c14cf2237325846a37a9e0f2b3646d4 | |
| parent | b0b77c997574c633304cf621bd2e74fa2d3a39e6 (diff) | |
| download | acit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.gz acit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.bz2 acit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.xz | |
Finish the pool
| -rw-r--r-- | src/acit/imap_pool.py | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/src/acit/imap_pool.py b/src/acit/imap_pool.py index 7a7017b..0447e9b 100644 --- a/src/acit/imap_pool.py +++ b/src/acit/imap_pool.py @@ -3,17 +3,20 @@ from typing import List from imaplib import IMAP4 -import imap_tools -from imap_tools.errors import MailboxLoginError, MailboxLogoutError -from imap_tools import MailBox, MailMessage - -import socket +from imap_tools import MailBox from threading import Event, RLock + +class PoolEmpty(Exception): + def __init__(self,*args,**kwargs): + Exception.__init__(self,*args,**kwargs) + + + class PooledMailBox(MailBox): def __init__(self,pool,*args,**kwargs): - MailBox.__init__(*args,**kwargs) + MailBox.__init__(self,*args,**kwargs) self._mailbox_pool=pool def __enter__(self): @@ -29,6 +32,8 @@ class PooledMailBox(MailBox): class MailBoxPool(): "A pool of mailboxes." + __public__=["open","close","get_pool_size","get_box","release", "errors"] + def __init__(self, username:str, password:str, initial_folder:str='INBOX', # for MailBox.login() connection_n=4, # for ourselves host='', port=993, timeout=None, # for MailBox.__init__() @@ -47,12 +52,25 @@ class MailBoxPool(): self.errors=[] + self.initalised=Event() + self.lock=RLock() self.boxreturned=Event() self.pool:List[MailBox]=[] self.taken:List[MailBox]=[] + def log(self,*msg,**kwargs): + import traceback, threading, cherrypy + tb=traceback.extract_stack(limit=4) + pparent=tb[0].name + parent=tb[2].name + cherrypy.log(context="%s>>IMAPPOOL:%s:%s"%(threading.current_thread().name, pparent,parent), msg=" ".join([str(i) for i in msg]),**kwargs) + + def open(self): + "Initialises the pool" self.ensure_all_connections() + self.initalised.set() + self.log("Made all connections") def is_alive(self,mb:MailBox): "Internal function. Uses the provided mailbox to send a command to the server, checking whether it's still connected" @@ -67,6 +85,7 @@ class MailBoxPool(): def ensure_all_connections(self): "Internal function. Refils the pool with connections. Note this also removes disconnected boxes that are currently in use." + #self.log("Waiting for lock") with self.lock: toremove=[] for mb in self.pool+self.taken: @@ -81,49 +100,67 @@ class MailBoxPool(): if mb in self.pool: self.pool.remove(mb) if mb in self.taken: self.taken.remove(mb) - boxesleft=len(self.pool)+len(self.taken) - - for i in range(self.n - boxesleft): + n=self.n-self.get_pool_size() + if n: self.log("Creating %d new IMAP connections"%n) + for i in range(n): try: mb=self.new_box() + self.pool.append(mb) self.boxreturned.set() except Exception as e: self.errors.append(e) def new_box(self): "Interal function. Generates a new box. Note: this does NOT add it to the pool." - mb=MailBox(host=self.host,port=self.port,timeout=self.timeout, + mb=PooledMailBox(pool=self,host=self.host,port=self.port,timeout=self.timeout, certfile=self.certfile,keyfile=self.keyfile,ssl_context=self.ssl_context) mb.login(username=self.username,password=self.password,initial_folder=self.initial_folder) return mb - __public_=["get_pool_size","get_box","release", "errors"] - def get_pool_size(self): "Returns the total pool size, free and taken connections." return len(self.pool)+len(self.taken) def get_box(self): "Gets a new mailbox from the pool" + self.initalised.wait() while True: + self.log("Waiting for lock (available=%d,taken=%d)"%(len(self.pool),len(self.taken))) # NOTE:testlog with self.lock: + self.log("Aquired") # NOTE:testlog + if self.get_pool_size()<1: + raise PoolEmpty("No connections in pool!") self.ensure_all_connections() if len(self.pool)>0: self.boxreturned.clear() mb=self.pool.pop(0) self.taken.append(mb) return mb + self.log("No boxes") self.boxreturned.wait() def release(self,mb:PooledMailBox): "Returns a mailbox back to the pool. Please use a context manager instead of manually releasing." + self.initalised.wait() + self.log("Waiting for lock, trying to release a connection") # NOTE:testlog with self.lock: + self.log("Aquired") # NOTE:testlog if mb in self.taken: self.taken.remove(mb) self.pool.append(mb) self.boxreturned.set() + + def close(self): + "Closes all the mailboxes" + self.initalised.set() # to force an exit wherever possible + while len(self.pool)+len(self.taken)>0: + with self.lock: + while len(self.pool)>0: + mb=self.pool.pop(0) + MailBox.logout(mb) # pass it this way bc we overwrite logout() + self.boxreturned.wait(.5) |
