aboutsummaryrefslogtreecommitdiffstats
path: root/src/acit/imapplugin.py
diff options
context:
space:
mode:
authorVosjedev <vosje@vosjedev.net>2025-11-02 13:56:16 +0100
committerVosjedev <vosje@vosjedev.net>2025-11-02 13:56:16 +0100
commit60749b5099a8ba1d63e8cc313c451c8a4001a078 (patch)
treefc3aef71225a0cd8317efd4147b9d345378068d0 /src/acit/imapplugin.py
parent29989aface3276d36851c8601ea599ffbeaac471 (diff)
downloadacit-60749b5099a8ba1d63e8cc313c451c8a4001a078.tar.gz
acit-60749b5099a8ba1d63e8cc313c451c8a4001a078.tar.bz2
acit-60749b5099a8ba1d63e8cc313c451c8a4001a078.tar.xz
Too much (I tried to split it up but it won't work)
Diffstat (limited to 'src/acit/imapplugin.py')
-rw-r--r--src/acit/imapplugin.py436
1 files changed, 184 insertions, 252 deletions
diff --git a/src/acit/imapplugin.py b/src/acit/imapplugin.py
index f5a03a1..454904d 100644
--- a/src/acit/imapplugin.py
+++ b/src/acit/imapplugin.py
@@ -1,23 +1,20 @@
import threading
from os import getenv
-from queue import Queue, Empty
-
-from html import escape as html_escape
+from queue import Queue
from cherrypy.process import plugins
import cherrypy
import re
-import imap_tools
from imap_tools import MailBox, MailMessage
+from .imap_pool import MailBoxPool, PoolEmpty
+
from .db import DBPoolManager
from .types import Site
-from . import html
-
class ImapPlugin(plugins.SimplePlugin):
def __init__(self,bus,dbpool:DBPoolManager,site:Site):
plugins.SimplePlugin.__init__(self,bus)
@@ -27,7 +24,7 @@ class ImapPlugin(plugins.SimplePlugin):
self.imap_server=getenv("ACIT_IMAP_SERVER")
self.imap_user=getenv("ACIT_IMAP_USER")
self.imap_pass=getenv("ACIT_IMAP_PASS")
- self.imap_port=getenv("ACIT_IMAP_PORT",0)
+ self.imap_port=getenv("ACIT_IMAP_PORT",993)
self.smtp_server=getenv("ACIT_SMTP_SERVER")
self.smtp_user=getenv("ACIT_SMTP_USER")
@@ -38,24 +35,30 @@ class ImapPlugin(plugins.SimplePlugin):
raise ValueError("Missing ACIT_IMAP_SERVER, ACIT_IMAP_USER, ACIT_SMTP_SERVER, or ACIT_SMTP_USER")
# end block
+ self.mlog("IMAP config: %s @ %s : %d"%(self.imap_user,self.imap_server,self.imap_port))
+
# block: make storage attributes
self.mailin_thread=None
self.mailout_thread=None
- self.regen_worker_thread=None
self.stopping=threading.Event()
self.mailout_queue=Queue()
- self.regen_queue=Queue()
self.mailbox_per_thread={}
# end block
+
+ self.mbpool=MailBoxPool(
+ host=self.imap_server,port=self.imap_port,username=self.imap_user,password=self.imap_pass,
+ connection_n=int(getenv("ACIT_IMAP_POOL_SIZE",4))
+ )
+
+ self.index_lock=threading.Lock()
def start(self):
- self.mlog("Starting email-related loops, doing setup")
+ #self.mlog("Starting email-related loops, doing setup") # no need to log for quick stuff like this
# block: prepare threads
self.stopping.clear()
self.mailin_thread=threading.Thread(target=self.imap_loop_controller)
self.mailout_thread=threading.Thread(target=self.smtp_loop)
- self.regen_worker_thread=threading.Thread(target=self.regen_worker)
# end block
# block: fetch configuration around email address usage from env
@@ -73,51 +76,79 @@ class ImapPlugin(plugins.SimplePlugin):
self.addr_format=self.emailname+"+{proj}#{bug}@"+self.emaildomain
self.addr_regex=self.emailname+r"+[^@#]*(#[0-9]*)?@"+self.emaildomain.replace(".","\\.")
# end block
+
+ self.mlog("Starting mailbox pool. This can take a while.")
+ self.mbpool.open()
+ pool=self.mbpool.get_pool_size()
+ if not pool:
+ self.mlog("Failed to put anything in the pool (%d)."%pool)
+ self.mlog("Errors:")
+ for e in self.mbpool.errors:
+ self.mlog(e)
+
+ self.mlog("\n\nNOTE!!! THIS WILL MAKE ACIT UNUSABLE, BECAUSE IT WON'T BE ABLE TO INTERACT WITH EMAIL.\n"
+ " Please check your IMAP configuration.\n")
+
+ if not self.dbpool.poolStartedEvent.is_set():
+ cherrypy.engine.subscribe("db-started",self.update_index)
+ else:
+ self.update_index()
+
self.mlog("Starting threads")
self.mailin_thread.start()
- self.regen_worker_thread.start()
- threading.Thread(target=self.discover_projects).start()
#self.mailout_thread.start()
self.mlog("Done")
- def discover_projects(self):
- "Responsible for running through the imap structure, and queueing page generation for all bugs and projects"
- threading.current_thread().setName("discovery")
- self.mlog("Enlisting all currently known bugs for page generation...")
- with self.get_MailBox() as mailbox:
-
- mailbox.folder.set(self.ensurefolder(mailbox,"bugs")) # cd into bugs folder
- for folder in mailbox.folder.list("bugs"): # iterate over all subfolders
-
- # block: fetch bugname from foldername
- path=folder.name.split(folder.delim)
- if len(path)==3:
- try:
- proj=path[1]
- bug=int(path[2])
- except ValueError: # if bug isn't a number, int() raises ValueError
- continue
- # end block
+ def update_index(self,folders:list[str,int]=None):
+ "Note folder is a list of tuples consisting of trackername, bugid"
+ with self.dbpool.get_connection() as conn, conn.cursor() as cur, self.get_MailBox() as mb, self.index_lock:
+ cur.execute("CREATE TABLE IF NOT EXISTS msgindex ("
+ "tracker VARCHAR(80),"
+ "bugid INT,"
+ "messageid TINYTEXT UNIQUE"
+ ")"
+ )
- #self.mlog("Found %s#%d"%(proj,bug))
- self.regen_queue.put((proj,bug),block=True) # enqueue update
+ if not folders:
+ self.mlog("Full-updating message index...")
+ cur.execute("SELECT tracker,bugid FROM bugs")
+ folders=[ (tracker, bugid) for tracker,bugid in cur ]
+
+ for tracker,bugid in folders:
+ try:
+ #self.mlog("Updating index for %s/%d"%(tracker,bugid))
+ mb.folder.set(self.get_bug_folder(mb,tracker,bugid))
+ for msg in mb.fetch():
+ if "message-id" in msg.headers:
+ cur.execute("REPLACE INTO msgindex VALUES (?,?,?)",(tracker,bugid,msg.headers["message-id"][:255]))
+
+ except Exception as e:
+ self.mlog("Error while indexing mailbox of %s/%d: %s"%(tracker,bugid,e))
+
+ conn.commit()
+ self.mlog("Updated message index")
+
+ def find_in_reply_to(self,messageid):
+ with self.dbpool.get_connection() as conn, conn.cursor() as cur:
+ cur.execute("SELECT tracker,bugid FROM msgindex WHERE messageid=? LIMIT 1",(messageid,))
+ return cur.fetchone()
+
+ def format_emailaddr(self,project,bugid=None,subject=None):
+ email=self.addr_format.format(proj=project, bug=bugid)
+ email=email.replace("#None",'')
+ if subject:
+ from urllib.parse import quote as quote
+ email+='?subject='
+ email+=quote(subject, safe='')
+ return email
- self.mlog("Done")
def get_full_projectname(self,proj):
- projects=["cgit","acit","folder","subfolder/folder", "whohoo/test","public/test"]
- matches=[]
- for project in projects:
- if proj==project:
- matches.clear()
- matches.append(project)
- break
- else:
- if ('/'+project).endswith(proj):
- matches.append(proj)
- return matches
+ "returns results from ``self.site.findtrackers()`` without modification"
+ return self.site.findtrackers(proj)
+
def stripInfoFromMailAddr(self,address:str):
"matches bugnumber and projectname from email address"
@@ -152,238 +183,136 @@ class ImapPlugin(plugins.SimplePlugin):
mailbox.folder.subscribe(fname,True)
return fname
- def get_bug_folder(self,mailbox:MailBox,proj,bug):
+ def get_bug_folder(self,mailbox:MailBox,proj,bug=None):
"helper to format the path to a bug's folder and ensure it's existence"
- return self.ensurefolder(mailbox,"bugs",proj,str(bug))
+ path=["bugs"]
+ path.extend(proj.split('/'))
+ if bug: path.append(str(bug))
+ return self.ensurefolder(mailbox,*path)
def get_MailBox(self):
- "get a new mailbox, though only allows one mailbox per thread."
- #tid=threading.current_thread()
- #if not tid in self.mailbox_per_thread:
- # self.mailbox_per_thread[tid]=MailBox(self.imap_server).login(self.imap_user,self.imap_pass, None)
- #return self.mailbox_per_thread[tid]
- return MailBox(self.imap_server).login(self.imap_user,self.imap_pass, None)
-
-
- def regen_worker(self):
- """
- Listens to a queue and calls the page regenerator when requested via the queue.
- Use by putting a tuple `(project,bugid)` in :attr:`regen_queue`
- """
- threading.current_thread().setName("PageGenerator")
- with self.get_MailBox() as mailbox:
- dolog=True
- while not self.stopping.is_set():
- if dolog and self.regen_queue.empty():
- self.mlog("Ready, waiting for items to enter queue...")
- try:
- proj,bug=self.regen_queue.get(timeout=1)
- except Empty:
- dolog=False
- continue
- self.mlog("Starting regen for %s/%s"%(proj or "",bug or ""))
- self.generate_page(mailbox,proj,bug)
- dolog=True
-
+ "get a new mailbox connection from the pool"
+ return self.mbpool.get_box()
- def generate_page(self,mailbox:MailBox,project=None,bug=None):
- "Responsible for regenerating pages. Only generates bug pages, forwards any request to generate project pages to that function."
- if not bug and not project:
+ def imap_magic(self):
+ if self.stopping.wait(5): # if not stopping, this just times out after x seconds, so this is a nice timer
return
- if project and not bug:
- return self.generate_project_page(mailbox,project)
-
- try:
- # cd into mailbox folder
- mailbox.folder.set(self.get_bug_folder(mailbox,project,bug))
- pagelimit=25 # confusingly sets the limit on how many emails per page.
- currentpage=""
- pagenr=0
- emailsonpage=0
- og_message=None
-
- from .util import (
- lookahead, # this allows us to detect when we reached the last message in the mailbox
- email2html
- )
-
- for msg, last in lookahead(mailbox.fetch()):
- # block: allow showing original message & subject everywhere by storing it
- if pagenr==0 and emailsonpage==0: # first email
- og_message=msg
- else:
- # show an expandable thing with the original thing at the top of every page
- currentpage+='<details class="ogmail"><summary>Click to expand original report</summary>'
- currentpage+=html_escape(og_message.text)
- currentpage+='</details>'
- # end block
-
- # block: page generation
- #currentpage+='<section class="email">'
- #currentpage+=html_escape(msg.text)
- #currentpage+='</section><br>'
- currentpage+=email2html(msg.text)
- # end block
- emailsonpage+=1
-
- # block: complete page formatting, register page
- if emailsonpage>=pagelimit or last:
- resultingpage=html.mailpage.format(
- project=project,
- bug=bug,
- subject=og_message.subject if og_message else "<undefined>",
- content=currentpage,
- pagenr=pagenr
- )
-
- # subblock: register page
- path="%s / %d / %d"%(project,bug,pagenr)
- cherrypy.engine.publish(
- "newpage",
- path=path,
- content=resultingpage
- )
- # also register without page number if first page
- if pagenr==0:
- cherrypy.engine.publish(
- "newpage",
- path="%s/%d"%(project,bug),
- content=resultingpage
- )
- # end subblock
-
- # reset page-specific trackers
- currentpage=""
- emailsonpage=0
- pagenr+=1
- # end block
-
- except Exception:
- self.mlog("Error occured generating bug page for %s/%d"%(project,bug),traceback=True)
-
- def generate_project_page(self,mailbox,proj):
- return
+ refreshable={}
+ with self.get_MailBox() as mailbox:
+ mailbox.folder.set("INBOX")
-
- def imap_magic(self,mailbox:MailBox):
- mailbox.folder.set("INBOX")
+ if mailbox.folder.status()["MESSAGES"]>0:
- # block: wait 5 minutes and poll after that
- mailbox.idle.start()
+ for msg in mailbox.fetch():
+ self.mlog("Processing email with subject '%s'"%msg.subject)
- if self.stopping.wait(5): # if not stopping, this just times out after 300 seconds, so this is a nice timer
- mailbox.idle.stop()
- return
- responses=mailbox.idle.poll(timeout=1)
- mailbox.idle.stop()
- # end block
+ for addr in msg.to + msg.cc + msg.bcc + msg.reply_to:
+ if re.fullmatch(self.addr_regex,addr):
+ proj,bug=self.stripInfoFromMailAddr(addr)
+ break
+ else:
+ proj=None
+ bug=None
- if responses or mailbox.folder.status()["MESSAGES"]>0:
- refreshable={}
+ if "in-reply-to" in msg.headers:
+ self.mlog("Using In-Reply-To header to figure out meta")
+ replyid=msg.headers["in-reply-to"]
+ data=self.find_in_reply_to(replyid)
+ if data:
+ proj,bug=data
- for msg in mailbox.fetch():
- self.mlog("Processing email with subject '%s'"%msg.subject)
+ # block: make sure a project was specified
+ if not proj:
+ self.mlog("No project specified.")
+ self.mail_error(msg,"Please specify a project by mailing to:\n "+\
+ ("" if self.uses_aliases else self.emailname+"+")+"PROJECT@"+self.emaildomain+\
+ "\nwhere PROJECT is the name of your target project")
+ self.move_errored_mail(mailbox,msg)
+ continue
+ # end block
- for addr in msg.to + msg.cc + msg.bcc + msg.reply_to:
- if re.fullmatch(self.addr_regex,addr):
- proj,bug=self.stripInfoFromMailAddr(addr)
- break
- else:
- proj=None
- bug=None
+ # block: make sure project exists
+ proj_matches=self.get_full_projectname(proj)
+ if not proj_matches:
+ self.mlog("Received email for nonexistent project %s"%proj)
+ self.mail_error(msg,notice="Project '%s' doesn't exist"%proj)
+ self.move_errored_mail(mailbox,msg)
+ continue
+ # end block
- # block: make sure a project was specified
- if not proj:
- self.mlog("No project specified.")
- self.mail_error(msg,"Please specify a project by mailing to:\n "+\
- ("" if self.uses_aliases else self.emailname+"+")+"PROJECT@"+self.emaildomain+\
- "\nwhere PROJECT is the name of your target project")
- self.move_errored_mail(mailbox,msg)
- continue
- # end block
+ # block: make sure only 1 project matches
+ if len(proj_matches)>1:
+ self.mlog("Conficting projectname. Sending projectlist.")
+ self.mail_error(msg,notice="Multiple projects found to match your query. Please specify. Options:\n%s"%"\n".join(proj_matches))
+ self.move_errored_mail(mailbox,msg)
+ continue
- # block: make sure project exists
- proj_matches=self.get_full_projectname(proj)
- if not proj_matches:
- self.mlog("Received email for nonexistent project %s"%proj)
- self.mail_error(msg,notice="Project '%s' doesn't exist"%proj)
- self.move_errored_mail(mailbox,msg)
- continue
- # end block
+ proj=proj_matches[0]
+ # end block
- # block: make sure only 1 project matches
- if len(proj_matches)>1:
- self.mlog("Conficting projectname. Sending projectlist.")
- self.mail_error(msg,notice="Multiple projects found to match your query. Please specify. Options:\n%s"%"\n".join(proj_matches))
- self.move_errored_mail(mailbox,msg)
- continue
+ # block: parse bug id
+ if not bug:
+ if re.match(r"^\[PATCH.*\]",msg.subject):
+ bugtype="PATCH"
+ elif re.match(r"^\[DISCUSSION.*\]",msg.subject):
+ bugtype="DISCUS"
+ else:
+ bugtype="BUG"
+ bug=self.site.newbug(proj,bugtype=bugtype)
+ bug.subject=msg.subject[:1024]
+ bug.description=\
+ 'No description written.\nFirst email in thread:\n\n'+msg.text[:65535] # TODO: don't thruncate silently, send error to user.
+ self.mlog("Assigned new bugnr %d to '%s'"%(bug.bugid,msg.subject))
+ bug=bug.bugid
- proj=proj_matches[0]
- # end block
+ try:
+ bug=int(bug)
+ except ValueError as e:
+ self.mlog("Error decoding value to int:",e,traceback=True)
- # block: parse bug id
- if not bug:
- if re.match(r"^\[PATCH.*\]",msg.subject):
- bugtype="PATCH"
- elif re.match(r"^\[DISCUSSION.*\]"):
- bugtype="DISCUS"
- else:
- bugtype="BUG"
- bug=self.site.newbug(proj,bugtype=bugtype)
- bug.subject=msg.subject[:1024]
- bug.description=msg.text[:65535] # TODO: don't thruncate silently, send error to user.
- self.mlog("Assigned new bugnr %d to '%s'"%(bug,msg.subject))
+ self.mail_error(msg,notice="Exception while trying to convert bug number to integer",exception=e)
+ self.move_errored_mail(mailbox,msg)
+ continue
+ # end block
+
+ # block: move mail to folder specific for this project/bug
+ try:
+ path=self.get_bug_folder(mailbox,proj,bug)
+ mailbox.move([msg.uid], path)
+ refreshable.setdefault(proj,[]).append(bug)
+ except Exception:
+ self.mlog("Error processing email '%s' for %s/%d"%(msg.subject,proj,bug),traceback=True)
+ # end block
- try:
- bug=int(bug)
- except ValueError as e:
- self.mlog("Error decoding value to int:",e,traceback=True)
+ # block: update all webpages that received new mail
+ for proj,bugs in refreshable.items():
+ # project page needs to be regenerated too (counters)
+ cherrypy.engine.publish("regen",proj,None)
+ for bug in bugs:
+ cherrypy.engine.publish("regen",proj,bug)
- self.mail_error(msg,notice="Exception while trying to convert bug number to integer",exception=e)
- self.move_errored_mail(mailbox,msg)
- continue
# end block
-
- # block: move mail to folder specific for this project/bug
- try:
- path=self.get_bug_folder(mailbox,proj,bug)
- mailbox.move([msg.uid], path)
- refreshable.setdefault(proj,[]).append(bug)
- except Exception:
- self.mlog("Error processing email '%s' for %s/%d"%(msg.subject,proj,bug),traceback=True)
- # end block
-
- # block: update all webpages that received new mail
- for proj,bugs in refreshable.items():
- # project page needs to be regenerated too (counters)
- self.regen_queue.put((proj,None),block=True)
- for bug in bugs:
- self.regen_queue.put((proj,bug),block=True)
- # end block
+ if refreshable:
+ self.update_index([proj,bug] for bug in bugs for proj,bugs in refreshable.items())
def imap_loop_controller(self):
"Responsible for running imap_magic() repeatedly, and handling its errors."
threading.current_thread().setName("IMAPrunner")
- self.mlog("Connecting to IMAP")
- try:
- with self.get_MailBox() as mailbox:
- self.mlog("IMAP monitor thread started (connected)")
- self.ensurefolder(mailbox,"INBOX")
- while not self.stopping.is_set():
- try:
- self.imap_magic(mailbox)
- except Exception:
- import traceback
- exc=traceback.format_exc()
- self.mlog("Exception occured:\n%s"%exc)
- self.mlog("!! this may lead to emails not appearing or appearing later !!")
-
- except imap_tools.errors.MailboxLoginError:
- self.mlog("Error logging in.")
- cherrypy.engine.exit()
+ self.mlog("IMAP monitor thread started.")
+ while not self.stopping.is_set():
+ try:
+ self.imap_magic()
+ except PoolEmpty:
+ self.mlog("IMAP pool empty, unable to continue.")
+ break
+ except Exception:
+ import traceback
+ exc=traceback.format_exc()
+ self.mlog("Exception occured:\n%s"%exc)
+ self.mlog("!! this may lead to emails not appearing or appearing later !!")
self.mlog("IMAP monitor thread stopped.")
@@ -403,9 +332,12 @@ class ImapPlugin(plugins.SimplePlugin):
"Sets stopping signal, waits for all threads to stop"
self.mlog("Stopping. This can take a while.")
self.stopping.set()
- for thread in ( self.mailin_thread, self.mailout_thread, self.regen_worker_thread ):
+ for thread in ( self.mailin_thread, self.mailout_thread ):
if thread.is_alive() and not thread==threading.current_thread():
+ self.mlog("Waiting for thread: %s"%thread.name)
thread.join()
+ self.mlog("Closing IMAP pool")
+ self.mbpool.close()
self.mlog("Stopped")
def mlog(self,*msg,**kwargs):