aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVosjedev <vosje@vosjedev.net>2025-11-02 13:02:23 +0100
committerVosjedev <vosje@vosjedev.net>2025-11-02 13:02:23 +0100
commit275a4c5c7ce41da817f1427a7c6837297771976e (patch)
tree68e197631c14cf2237325846a37a9e0f2b3646d4
parentb0b77c997574c633304cf621bd2e74fa2d3a39e6 (diff)
downloadacit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.gz
acit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.bz2
acit-275a4c5c7ce41da817f1427a7c6837297771976e.tar.xz
Finish the pool
-rw-r--r--src/acit/imap_pool.py61
1 files changed, 49 insertions, 12 deletions
diff --git a/src/acit/imap_pool.py b/src/acit/imap_pool.py
index 7a7017b..0447e9b 100644
--- a/src/acit/imap_pool.py
+++ b/src/acit/imap_pool.py
@@ -3,17 +3,20 @@ from typing import List
from imaplib import IMAP4
-import imap_tools
-from imap_tools.errors import MailboxLoginError, MailboxLogoutError
-from imap_tools import MailBox, MailMessage
-
-import socket
+from imap_tools import MailBox
from threading import Event, RLock
+
+class PoolEmpty(Exception):
+ def __init__(self,*args,**kwargs):
+ Exception.__init__(self,*args,**kwargs)
+
+
+
class PooledMailBox(MailBox):
def __init__(self,pool,*args,**kwargs):
- MailBox.__init__(*args,**kwargs)
+ MailBox.__init__(self,*args,**kwargs)
self._mailbox_pool=pool
def __enter__(self):
@@ -29,6 +32,8 @@ class PooledMailBox(MailBox):
class MailBoxPool():
"A pool of mailboxes."
+ __public__=["open","close","get_pool_size","get_box","release", "errors"]
+
def __init__(self, username:str, password:str, initial_folder:str='INBOX', # for MailBox.login()
connection_n=4, # for ourselves
host='', port=993, timeout=None, # for MailBox.__init__()
@@ -47,12 +52,25 @@ class MailBoxPool():
self.errors=[]
+ self.initalised=Event()
+
self.lock=RLock()
self.boxreturned=Event()
self.pool:List[MailBox]=[]
self.taken:List[MailBox]=[]
+ def log(self,*msg,**kwargs):
+ import traceback, threading, cherrypy
+ tb=traceback.extract_stack(limit=4)
+ pparent=tb[0].name
+ parent=tb[2].name
+ cherrypy.log(context="%s>>IMAPPOOL:%s:%s"%(threading.current_thread().name, pparent,parent), msg=" ".join([str(i) for i in msg]),**kwargs)
+
+ def open(self):
+ "Initialises the pool"
self.ensure_all_connections()
+ self.initalised.set()
+ self.log("Made all connections")
def is_alive(self,mb:MailBox):
"Internal function. Uses the provided mailbox to send a command to the server, checking whether it's still connected"
@@ -67,6 +85,7 @@ class MailBoxPool():
def ensure_all_connections(self):
"Internal function. Refils the pool with connections. Note this also removes disconnected boxes that are currently in use."
+ #self.log("Waiting for lock")
with self.lock:
toremove=[]
for mb in self.pool+self.taken:
@@ -81,49 +100,67 @@ class MailBoxPool():
if mb in self.pool: self.pool.remove(mb)
if mb in self.taken: self.taken.remove(mb)
- boxesleft=len(self.pool)+len(self.taken)
-
- for i in range(self.n - boxesleft):
+ n=self.n-self.get_pool_size()
+ if n: self.log("Creating %d new IMAP connections"%n)
+ for i in range(n):
try:
mb=self.new_box()
+ self.pool.append(mb)
self.boxreturned.set()
except Exception as e:
self.errors.append(e)
def new_box(self):
"Interal function. Generates a new box. Note: this does NOT add it to the pool."
- mb=MailBox(host=self.host,port=self.port,timeout=self.timeout,
+ mb=PooledMailBox(pool=self,host=self.host,port=self.port,timeout=self.timeout,
certfile=self.certfile,keyfile=self.keyfile,ssl_context=self.ssl_context)
mb.login(username=self.username,password=self.password,initial_folder=self.initial_folder)
return mb
- __public_=["get_pool_size","get_box","release", "errors"]
-
def get_pool_size(self):
"Returns the total pool size, free and taken connections."
return len(self.pool)+len(self.taken)
def get_box(self):
"Gets a new mailbox from the pool"
+ self.initalised.wait()
while True:
+ self.log("Waiting for lock (available=%d,taken=%d)"%(len(self.pool),len(self.taken))) # NOTE:testlog
with self.lock:
+ self.log("Aquired") # NOTE:testlog
+ if self.get_pool_size()<1:
+ raise PoolEmpty("No connections in pool!")
self.ensure_all_connections()
if len(self.pool)>0:
self.boxreturned.clear()
mb=self.pool.pop(0)
self.taken.append(mb)
return mb
+ self.log("No boxes")
self.boxreturned.wait()
def release(self,mb:PooledMailBox):
"Returns a mailbox back to the pool. Please use a context manager instead of manually releasing."
+ self.initalised.wait()
+ self.log("Waiting for lock, trying to release a connection") # NOTE:testlog
with self.lock:
+ self.log("Aquired") # NOTE:testlog
if mb in self.taken:
self.taken.remove(mb)
self.pool.append(mb)
self.boxreturned.set()
+
+ def close(self):
+ "Closes all the mailboxes"
+ self.initalised.set() # to force an exit wherever possible
+ while len(self.pool)+len(self.taken)>0:
+ with self.lock:
+ while len(self.pool)>0:
+ mb=self.pool.pop(0)
+ MailBox.logout(mb) # pass it this way bc we overwrite logout()
+ self.boxreturned.wait(.5)