import threading
from os import getenv
from queue import Queue, Empty
from html import escape as html_escape
from cherrypy.process import plugins
import cherrypy
import re
import imap_tools
from imap_tools import MailBox, MailMessage
from .db import DBPoolManager
from .types import Site
from . import html
class ImapPlugin(plugins.SimplePlugin):
def __init__(self,bus,dbpool:DBPoolManager,site:Site):
plugins.SimplePlugin.__init__(self,bus)
self.dbpool=dbpool
self.site=site
# block: get configuration variables from env
self.imap_server=getenv("ACIT_IMAP_SERVER")
self.imap_user=getenv("ACIT_IMAP_USER")
self.imap_pass=getenv("ACIT_IMAP_PASS")
self.imap_port=getenv("ACIT_IMAP_PORT",0)
self.smtp_server=getenv("ACIT_SMTP_SERVER")
self.smtp_user=getenv("ACIT_SMTP_USER")
self.smtp_pass=getenv("ACIT_SMTP_PASS")
self.smtp_port=getenv("ACIT_SMTP_PORT",0)
if not ( self.imap_server and self.imap_user and self.smtp_server and self.smtp_user ):
raise ValueError("Missing ACIT_IMAP_SERVER, ACIT_IMAP_USER, ACIT_SMTP_SERVER, or ACIT_SMTP_USER")
# end block
# block: make storage attributes
self.mailin_thread=None
self.mailout_thread=None
self.regen_worker_thread=None
self.stopping=threading.Event()
self.mailout_queue=Queue()
self.regen_queue=Queue()
self.mailbox_per_thread={}
# end block
def start(self):
self.mlog("Starting email-related loops, doing setup")
# block: prepare threads
self.stopping.clear()
self.mailin_thread=threading.Thread(target=self.imap_loop_controller)
self.mailout_thread=threading.Thread(target=self.smtp_loop)
self.regen_worker_thread=threading.Thread(target=self.regen_worker)
# end block
# block: fetch configuration around email address usage from env
self.uses_aliases=getenv("ACIT_MAIL_USES_ALIASES",False)
name,domain=self.imap_user.rsplit("@",1)
self.emaildomain=getenv("ACIT_MAIL_DOMAIN",domain)
self.emailname=getenv("ACIT_MAIL_NAME",name)
if self.uses_aliases: # if we use aliases, we match project#bug@example.com
self.addr_format="{proj}#{bug}@"+self.emaildomain
self.addr_regex="[^@#]*(#[0-9]*)?@"+ self.emaildomain.replace(".","\\.")
else: # if we use plus addresses, we match name+project#bug@example.com
self.addr_format=self.emailname+"+{proj}#{bug}@"+self.emaildomain
self.addr_regex=self.emailname+r"+[^@#]*(#[0-9]*)?@"+self.emaildomain.replace(".","\\.")
# end block
self.mlog("Starting threads")
self.mailin_thread.start()
self.regen_worker_thread.start()
threading.Thread(target=self.discover_projects).start()
#self.mailout_thread.start()
self.mlog("Done")
def discover_projects(self):
"Responsible for running through the imap structure, and queueing page generation for all bugs and projects"
threading.current_thread().setName("discovery")
self.mlog("Enlisting all currently known bugs for page generation...")
with self.get_MailBox() as mailbox:
mailbox.folder.set(self.ensurefolder(mailbox,"bugs")) # cd into bugs folder
for folder in mailbox.folder.list("bugs"): # iterate over all subfolders
# block: fetch bugname from foldername
path=folder.name.split(folder.delim)
if len(path)==3:
try:
proj=path[1]
bug=int(path[2])
except ValueError: # if bug isn't a number, int() raises ValueError
continue
# end block
#self.mlog("Found %s#%d"%(proj,bug))
self.regen_queue.put((proj,bug),block=True) # enqueue update
self.mlog("Done")
def get_full_projectname(self,proj):
projects=["cgit","acit","folder","subfolder/folder", "whohoo/test","public/test"]
matches=[]
for project in projects:
if proj==project:
matches.clear()
matches.append(project)
break
else:
if ('/'+project).endswith(proj):
matches.append(proj)
return matches
def stripInfoFromMailAddr(self,address:str):
"matches bugnumber and projectname from email address"
addr=address.removesuffix("@"+self.emaildomain)
if not self.uses_aliases:
if not '+' in addr:
return (None,None)
addr=addr.removeprefix(self.emailname)
addr=addr.removeprefix("+")
if '#' in addr:
proj,bug=addr.rsplit("#",1)
else:
proj=addr
bug=None
return (proj,bug)
def ensurefolder(self,mailbox:MailBox,*path):
"makes sure a folder exists on the mailserver"
# assuming from rfc3501 section 7.2.2:
# > All children of a top-level hierarchy node MUST use
# > the same separator character.
# hoping the whole mailserver uses the same delimiter
delim=mailbox.folder.list('')[0].delim
fname=delim.join([ str(i) for i in path])
if not mailbox.folder.exists(fname):
mailbox.folder.create(fname)
mailbox.folder.subscribe(fname,True)
return fname
def get_bug_folder(self,mailbox:MailBox,proj,bug):
"helper to format the path to a bug's folder and ensure it's existence"
return self.ensurefolder(mailbox,"bugs",proj,str(bug))
def get_MailBox(self):
"get a new mailbox, though only allows one mailbox per thread."
#tid=threading.current_thread()
#if not tid in self.mailbox_per_thread:
# self.mailbox_per_thread[tid]=MailBox(self.imap_server).login(self.imap_user,self.imap_pass, None)
#return self.mailbox_per_thread[tid]
return MailBox(self.imap_server).login(self.imap_user,self.imap_pass, None)
def regen_worker(self):
"""
Listens to a queue and calls the page regenerator when requested via the queue.
Use by putting a tuple `(project,bugid)` in :attr:`regen_queue`
"""
threading.current_thread().setName("PageGenerator")
with self.get_MailBox() as mailbox:
dolog=True
while not self.stopping.is_set():
if dolog and self.regen_queue.empty():
self.mlog("Ready, waiting for items to enter queue...")
try:
proj,bug=self.regen_queue.get(timeout=1)
except Empty:
dolog=False
continue
self.mlog("Starting regen for %s/%s"%(proj or "",bug or ""))
self.generate_page(mailbox,proj,bug)
dolog=True
def generate_page(self,mailbox:MailBox,project=None,bug=None):
"Responsible for regenerating pages. Only generates bug pages, forwards any request to generate project pages to that function."
if not bug and not project:
return
if project and not bug:
return self.generate_project_page(mailbox,project)
try:
# cd into mailbox folder
mailbox.folder.set(self.get_bug_folder(mailbox,project,bug))
pagelimit=25 # confusingly sets the limit on how many emails per page.
currentpage=""
pagenr=0
emailsonpage=0
og_message=None
from .util import (
lookahead, # this allows us to detect when we reached the last message in the mailbox
email2html
)
for msg, last in lookahead(mailbox.fetch()):
# block: allow showing original message & subject everywhere by storing it
if pagenr==0 and emailsonpage==0: # first email
og_message=msg
else:
# show an expandable thing with the original thing at the top of every page
currentpage+='Click to expand original report
'
currentpage+=html_escape(og_message.text)
currentpage+=' '
# end block
# block: page generation
#currentpage+=''
#currentpage+=html_escape(msg.text)
#currentpage+='
'
currentpage+=email2html(msg.text)
# end block
emailsonpage+=1
# block: complete page formatting, register page
if emailsonpage>=pagelimit or last:
resultingpage=html.mailpage.format(
project=project,
bug=bug,
subject=og_message.subject if og_message else "",
content=currentpage,
pagenr=pagenr
)
# subblock: register page
path="%s / %d / %d"%(project,bug,pagenr)
cherrypy.engine.publish(
"newpage",
path=path,
content=resultingpage
)
# also register without page number if first page
if pagenr==0:
cherrypy.engine.publish(
"newpage",
path="%s/%d"%(project,bug),
content=resultingpage
)
# end subblock
# reset page-specific trackers
currentpage=""
emailsonpage=0
pagenr+=1
# end block
except Exception:
self.mlog("Error occured generating bug page for %s/%d"%(project,bug),traceback=True)
def generate_project_page(self,mailbox,proj):
return
def imap_magic(self,mailbox:MailBox):
mailbox.folder.set("INBOX")
# block: wait 5 minutes and poll after that
mailbox.idle.start()
if self.stopping.wait(5): # if not stopping, this just times out after 300 seconds, so this is a nice timer
mailbox.idle.stop()
return
responses=mailbox.idle.poll(timeout=1)
mailbox.idle.stop()
# end block
if responses or mailbox.folder.status()["MESSAGES"]>0:
refreshable={}
for msg in mailbox.fetch():
self.mlog("Processing email with subject '%s'"%msg.subject)
for addr in msg.to + msg.cc + msg.bcc + msg.reply_to:
if re.fullmatch(self.addr_regex,addr):
proj,bug=self.stripInfoFromMailAddr(addr)
break
else:
proj=None
bug=None
# block: make sure a project was specified
if not proj:
self.mlog("No project specified.")
self.mail_error(msg,"Please specify a project by mailing to:\n "+\
("" if self.uses_aliases else self.emailname+"+")+"PROJECT@"+self.emaildomain+\
"\nwhere PROJECT is the name of your target project")
self.move_errored_mail(mailbox,msg)
continue
# end block
# block: make sure project exists
proj_matches=self.get_full_projectname(proj)
if not proj_matches:
self.mlog("Received email for nonexistent project %s"%proj)
self.mail_error(msg,notice="Project '%s' doesn't exist"%proj)
self.move_errored_mail(mailbox,msg)
continue
# end block
# block: make sure only 1 project matches
if len(proj_matches)>1:
self.mlog("Conficting projectname. Sending projectlist.")
self.mail_error(msg,notice="Multiple projects found to match your query. Please specify. Options:\n%s"%"\n".join(proj_matches))
self.move_errored_mail(mailbox,msg)
continue
proj=proj_matches[0]
# end block
# block: parse bug id
if not bug:
if re.match(r"^\[PATCH.*\]",msg.subject):
bugtype="PATCH"
elif re.match(r"^\[DISCUSSION.*\]"):
bugtype="DISCUS"
else:
bugtype="BUG"
bug=self.site.newbug(proj,bugtype=bugtype)
bug.subject=msg.subject[:1024]
bug.description=msg.text[:65535] # TODO: don't thruncate silently, send error to user.
self.mlog("Assigned new bugnr %d to '%s'"%(bug,msg.subject))
try:
bug=int(bug)
except ValueError as e:
self.mlog("Error decoding value to int:",e,traceback=True)
self.mail_error(msg,notice="Exception while trying to convert bug number to integer",exception=e)
self.move_errored_mail(mailbox,msg)
continue
# end block
# block: move mail to folder specific for this project/bug
try:
path=self.get_bug_folder(mailbox,proj,bug)
mailbox.move([msg.uid], path)
refreshable.setdefault(proj,[]).append(bug)
except Exception:
self.mlog("Error processing email '%s' for %s/%d"%(msg.subject,proj,bug),traceback=True)
# end block
# block: update all webpages that received new mail
for proj,bugs in refreshable.items():
# project page needs to be regenerated too (counters)
self.regen_queue.put((proj,None),block=True)
for bug in bugs:
self.regen_queue.put((proj,bug),block=True)
# end block
def imap_loop_controller(self):
"Responsible for running imap_magic() repeatedly, and handling its errors."
threading.current_thread().setName("IMAPrunner")
self.mlog("Connecting to IMAP")
try:
with self.get_MailBox() as mailbox:
self.mlog("IMAP monitor thread started (connected)")
self.ensurefolder(mailbox,"INBOX")
while not self.stopping.is_set():
try:
self.imap_magic(mailbox)
except Exception:
import traceback
exc=traceback.format_exc()
self.mlog("Exception occured:\n%s"%exc)
self.mlog("!! this may lead to emails not appearing or appearing later !!")
except imap_tools.errors.MailboxLoginError:
self.mlog("Error logging in.")
cherrypy.engine.exit()
self.mlog("IMAP monitor thread stopped.")
def mail_error(self,msg:MailMessage,notice:str=None,exception:Exception=None):
pass
def move_errored_mail(self,mailbox:MailBox,msg:MailMessage,):
target=self.ensurefolder(mailbox,"INBOX","Errors")
return mailbox.move(msg.uid,target)
def smtp_loop(self):
pass
def stop(self):
"Sets stopping signal, waits for all threads to stop"
self.mlog("Stopping. This can take a while.")
self.stopping.set()
for thread in ( self.mailin_thread, self.mailout_thread, self.regen_worker_thread ):
if thread.is_alive() and not thread==threading.current_thread():
thread.join()
self.mlog("Stopped")
def mlog(self,*msg,**kwargs):
import traceback
function=traceback.extract_stack(limit=2)[0].name
cherrypy.log(context="%s>>MAIL:%s"%(threading.current_thread().name, function), msg=" ".join([str(i) for i in msg]),**kwargs)