1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
from typing import List
from imaplib import IMAP4
import imap_tools
from imap_tools.errors import MailboxLoginError, MailboxLogoutError
from imap_tools import MailBox, MailMessage
import socket
from threading import Event, RLock
class PooledMailBox(MailBox):
    """A ``MailBox`` that belongs to a pool.

    Calling :meth:`logout`, or leaving a ``with`` block, hands the box back
    to its pool through ``pool.release(self)``; the ``MailBox`` logout is
    not invoked by this class.
    """

    def __init__(self, pool, *args, **kwargs):
        """Create the mailbox and remember the pool that owns it.

        Args:
            pool: Object exposing ``release(mailbox)``; called by
                :meth:`logout`.
            *args: Forwarded unchanged to ``MailBox.__init__``.
            **kwargs: Forwarded unchanged to ``MailBox.__init__``.
        """
        # super() binds ``self``. Calling ``MailBox.__init__(*args, **kwargs)``
        # unbound would drop the instance and either raise TypeError or treat
        # the first forwarded argument as ``self``.
        super().__init__(*args, **kwargs)
        self._mailbox_pool = pool

    def __enter__(self):
        """Return the mailbox itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Give the mailbox back to the pool; exceptions are not suppressed."""
        self.logout()

    def logout(self):
        """Return this mailbox to its pool instead of logging out."""
        self._mailbox_pool.release(self)
class MailBoxPool():
    """A pool of logged-in mailboxes that can be shared between threads.

    The pool tries to keep ``connection_n`` connections open. ``get_box()``
    checks one out (blocking while all of them are in use) and ``release()``
    puts it back. Exceptions raised while opening connections are collected
    in ``errors`` rather than propagated.
    """

    def __init__(self, username:str, password:str, initial_folder:str='INBOX', # for MailBox.login()
                 connection_n=4, # for ourselves
                 host='', port=993, timeout=None, # for MailBox.__init__()
                 keyfile=None, certfile=None, ssl_context=None # for MailBox.__init__()
                 ):
        """Store the connection settings and open the initial connections.

        Args:
            username: Passed to ``login()`` of every new mailbox.
            password: Passed to ``login()`` of every new mailbox.
            initial_folder: Passed to ``login()`` of every new mailbox.
            connection_n: Number of connections the pool tries to maintain.
            host, port, timeout, keyfile, certfile, ssl_context: Passed to
                the mailbox constructor.

        Connection failures do not raise here; they are appended to
        ``self.errors``.
        """
        self.n = connection_n
        self.username = username
        self.password = password
        self.initial_folder = initial_folder
        self.host = host
        self.port = port
        self.timeout = timeout
        self.keyfile = keyfile
        self.certfile = certfile
        self.ssl_context = ssl_context
        # Every exception raised by new_box(), oldest first.
        self.errors = []
        # Guards ``pool`` and ``taken``. Re-entrant because get_box() calls
        # ensure_all_connections() while already holding it.
        self.lock = RLock()
        # Set while ``pool`` holds at least one free box; get_box() waits on it.
        self.boxreturned = Event()
        self.pool: List["PooledMailBox"] = []   # free boxes
        self.taken: List["PooledMailBox"] = []  # boxes currently checked out
        self.ensure_all_connections()

    def is_alive(self, mb: "MailBox") -> bool:
        """Internal function. Send NOOP through ``mb`` and report whether it succeeded.

        Returns:
            ``False`` if the command raised an ``OSError`` (which includes
            ``TimeoutError`` and ``ConnectionError``) or an ``IMAP4.error``
            (which includes ``IMAP4.abort``); ``True`` otherwise.
        """
        try:
            mb.client.noop()
        except (OSError, IMAP4.error):
            return False
        return True

    def _close_quietly(self, mb: "MailBox") -> None:
        """Best-effort shutdown of a connection that failed its liveness check.

        Works on ``mb.client`` directly because a pooled box's own
        ``logout()`` hands the box back to the pool instead of closing it.
        """
        try:
            mb.client.logout()
        except (OSError, IMAP4.error):
            # The connection is already broken; just try to free the socket.
            try:
                mb.client.shutdown()
            except (OSError, IMAP4.error):
                pass

    def ensure_all_connections(self):
        """Internal function. Drop dead connections and refill the pool.

        Note this also removes disconnected boxes that are currently in use;
        a later ``release()`` of such a box is ignored. Failures to open a
        replacement are appended to ``self.errors``.
        """
        with self.lock:
            # NOTE(review): this sends NOOP on boxes that another thread may
            # be using right now - confirm that is safe for the IMAP client.
            dead = [mb for mb in self.pool + self.taken if not self.is_alive(mb)]
            for mb in dead:
                self._close_quietly(mb)
                if mb in self.pool:
                    self.pool.remove(mb)
                if mb in self.taken:
                    self.taken.remove(mb)

            missing = self.n - len(self.pool) - len(self.taken)
            for _ in range(missing):
                try:
                    mb = self.new_box()
                except Exception as e:
                    # Deliberately broad: any failure to connect or log in
                    # is recorded for the caller instead of being raised.
                    self.errors.append(e)
                else:
                    # new_box() does not register the box, so do it here.
                    self.pool.append(mb)
                    self.boxreturned.set()

    def new_box(self) -> "PooledMailBox":
        """Internal function. Open and log in a new box. Note: this does NOT add it to the pool.

        The box is a ``PooledMailBox`` bound to this pool, so leaving a
        ``with`` block (or calling ``logout()``) releases it back here.
        """
        mb = PooledMailBox(self, host=self.host, port=self.port, timeout=self.timeout,
                           certfile=self.certfile, keyfile=self.keyfile, ssl_context=self.ssl_context)
        mb.login(username=self.username, password=self.password, initial_folder=self.initial_folder)
        return mb

    # NOTE(review): presumably the names meant for external use; nothing in
    # this class reads it.
    __public_=["get_pool_size","get_box","release", "errors"]

    def get_pool_size(self):
        """Returns the total pool size, free and taken connections."""
        return len(self.pool) + len(self.taken)

    def get_box(self):
        """Check a mailbox out of the pool, blocking until one is free.

        Returns:
            A live mailbox; it stays in ``taken`` until ``release()`` is
            called with it.

        Raises:
            ConnectionError: If no connection exists at all (none free, none
                checked out), in which case waiting could never succeed. The
                most recent connection failure, if any, is attached as the
                cause.
        """
        while True:
            with self.lock:
                errors_before = len(self.errors)
                self.ensure_all_connections()
                if self.pool:
                    mb = self.pool.pop(0)
                    self.taken.append(mb)
                    # Clear only when nothing is left, so another waiter
                    # still sees the remaining free boxes.
                    if not self.pool:
                        self.boxreturned.clear()
                    return mb
                if not self.taken:
                    # Nothing is checked out, so no release() can wake us.
                    fresh = self.errors[errors_before:]
                    cause = fresh[-1] if fresh else None
                    raise ConnectionError(
                        "MailBoxPool could not open any connection"
                    ) from cause
                # Pool is empty: reset the flag while still holding the lock
                # so that a release() after this point is not missed.
                self.boxreturned.clear()
            self.boxreturned.wait()

    def release(self, mb: "PooledMailBox"):
        """Returns a mailbox back to the pool. Please use a context manager instead of manually releasing.

        Boxes that are not currently checked out (for example because they
        were dropped as dead) are ignored.
        """
        with self.lock:
            if mb in self.taken:
                self.taken.remove(mb)
                self.pool.append(mb)
                self.boxreturned.set()
|