import threading from os import getenv from queue import Queue from cherrypy.process import plugins import cherrypy import re from imap_tools import MailBox, MailMessage from mariadb import Connection, Cursor from .imap_pool import MailBoxPool, PoolEmpty from .db import DBPoolManager from .types import Site, Bug from .smtpplugin import SmtpPlugin class ImapPlugin(plugins.SimplePlugin): def __init__(self,bus,dbpool:DBPoolManager,site:Site): plugins.SimplePlugin.__init__(self,bus) self.dbpool=dbpool self.site=site # block: get configuration variables from env self.imap_server=getenv("ACIT_IMAP_SERVER") self.imap_user=getenv("ACIT_IMAP_USER") self.imap_pass=getenv("ACIT_IMAP_PASS") self.imap_port=int(getenv("ACIT_IMAP_PORT",993)) if not ( self.imap_server and self.imap_user and self.imap_pass): raise ValueError("Missing ACIT_IMAP_SERVER, ACIT_IMAP_USER, or ACIT_IMAP_PASS") # end block self.mlog("IMAP config: %s @ %s : %d"%(self.imap_user,self.imap_server,self.imap_port)) # block: make storage attributes self.mailin_thread=None self.stopping=threading.Event() self.mailout_queue=Queue() self.mailbox_per_thread={} # end block self.mbpool=MailBoxPool( host=self.imap_server,port=self.imap_port,username=self.imap_user,password=self.imap_pass, connection_n=int(getenv("ACIT_IMAP_POOL_SIZE",4)) ) self.smtp=SmtpPlugin() self.index_lock=threading.Lock() def start(self): #self.mlog("Starting email-related loops, doing setup") # no need to log for quick stuff like this # block: prepare threads self.stopping.clear() self.mailin_thread=threading.Thread(target=self.imap_loop_controller) # end block # block: fetch configuration around email address usage from env self.uses_aliases=getenv("ACIT_MAIL_USES_ALIASES",False) name,domain=self.imap_user.rsplit("@",1) self.emaildomain=getenv("ACIT_MAIL_DOMAIN",domain) self.emailname=getenv("ACIT_MAIL_NAME",name) if self.uses_aliases: # if we use aliases, we match project#bug@example.com self.addr_format="{proj}#{bug}@"+self.emaildomain self.addr_regex="[^@#]*(#[0-9]*)?@"+ self.emaildomain.replace(".","\\.") else: # if we use plus addresses, we match name+project#bug@example.com self.addr_format=self.emailname+"+{proj}#{bug}@"+self.emaildomain self.addr_regex=self.emailname+r"+[^@#]*(#[0-9]*)?@"+self.emaildomain.replace(".","\\.") # end block self.smtp.domain=self.emaildomain self.mlog("Starting mailbox pool. This can take a while.") self.mbpool.open() pool=self.mbpool.get_pool_size() if not pool: self.mlog("Failed to put anything in the pool (%d)."%pool) self.mlog("Errors:") import traceback for e in self.mbpool.errors: tb=traceback.format_exception(e) self.mlog(repr(e),tb) self.mlog("\n\nNOTE!!! THIS WILL MAKE ACIT UNUSABLE, BECAUSE IT WON'T BE ABLE TO INTERACT WITH EMAIL.\n" " Please check your IMAP configuration.\n") if not self.dbpool.poolStartedEvent.is_set(): cherrypy.engine.subscribe("db-started",self.update_index) else: self.update_index() self.mlog("Starting threads") self.mailin_thread.start() self.smtp.start() self.mlog("Done") def update_index(self,folders:list[str,int]=None): "Note folder is a list of tuples consisting of trackername, bugid" with self.dbpool.get_connection() as conn, conn.cursor() as cur, self.get_MailBox() as mb, self.index_lock: cur.execute("CREATE TABLE IF NOT EXISTS msgindex (" "tracker VARCHAR(80)," "bugid INT," "messageid TINYTEXT UNIQUE" ")" ) if not folders: self.mlog("Full-updating message index...") cur.execute("SELECT tracker,bugid FROM bugs") folders=[ (tracker, bugid) for tracker,bugid in cur ] for tracker,bugid in folders: try: #self.mlog("Updating index for %s/%d"%(tracker,bugid)) mb.folder.set(self.get_bug_folder(mb,tracker,bugid)) for msg in mb.fetch(): if "message-id" in msg.headers: cur.execute("REPLACE INTO msgindex VALUES (?,?,?)",(tracker,bugid,msg.headers["message-id"][:255])) except Exception as e: self.mlog("Error while indexing mailbox of %s/%d: %s"%(tracker,bugid,e)) conn.commit() self.mlog("Updated message index") def find_in_reply_to(self,messageid): with self.dbpool.get_connection() as conn, conn.cursor() as cur: cur.execute("SELECT tracker,bugid FROM msgindex WHERE messageid=? LIMIT 1",(messageid,)) return cur.fetchone() def format_emailaddr(self,project=None,bugid=None,subject=None,headers={}): from urllib.parse import quote as quote email=self.addr_format.format(proj=project, bug=bugid) email=email.replace("#None",'') email=email.replace("+None",'') if subject: email+='?subject=' email+=quote(subject, safe='') for key,value in headers.items(): email+='?%s=%s'%(key,quote(value,safe='')) return email def get_full_projectname(self,proj): "returns results from ``self.site.findtrackers()`` without modification" return self.site.findtrackers(proj) def stripInfoFromMailAddr(self,address:str): "matches bugnumber and projectname from email address" addr=address.removesuffix("@"+self.emaildomain) if not self.uses_aliases: if not '+' in addr: return (None,None) addr=addr.removeprefix(self.emailname) addr=addr.removeprefix("+") if '#' in addr: proj,bug=addr.rsplit("#",1) else: proj=addr bug=None return (proj,bug) def ensurefolder(self,mailbox:MailBox,*path): "makes sure a folder exists on the mailserver" # assuming from rfc3501 section 7.2.2: # > All children of a top-level hierarchy node MUST use # > the same separator character. # hoping the whole mailserver uses the same delimiter delim=mailbox.folder.list('')[0].delim fname=delim.join([ str(i) for i in path]) if not mailbox.folder.exists(fname): mailbox.folder.create(fname) mailbox.folder.subscribe(fname,True) return fname def get_bug_folder(self,mailbox:MailBox,proj,bug=None): "helper to format the path to a bug's folder and ensure it's existence" path=["bugs"] path.extend(proj.split('/')) if bug: path.append(str(bug)) return self.ensurefolder(mailbox,*path) def get_MailBox(self): "get a new mailbox connection from the pool" return self.mbpool.get_box() def imap_magic(self): if self.stopping.wait(5): # if not stopping, this just times out after x seconds, so this is a nice timer return refreshable={} with self.get_MailBox() as mailbox: mailbox.folder.set("INBOX") if mailbox.folder.status()["MESSAGES"]>0: for msg in mailbox.fetch(): try: target=self.handle_email(mailbox,msg) if target: proj,bug=target refreshable.setdefault(proj,[]).append(bug) except Exception: import traceback self.mlog("Error processing email: ",traceback.format_exc()) self.mlog("Message-ID:",msg.headers["message-id"] if "message-id" in msg.headers else "None") self.mlog("Moving mail to INBOX/Errors.") if "message-id" in msg.headers: self.mail_error(msg=msg,notice="There was an error processing your email with message id:\n "+msg.headers["message-id"]+"\nPlease contact the instance administrator.") self.move_errored_mail(mailbox,msg) # block: update all webpages that received new mail for proj,bugs in refreshable.items(): # project page needs to be regenerated too (counters) cherrypy.engine.publish("regen",proj,None) for bug in bugs: cherrypy.engine.publish("regen",proj,bug) # end block if refreshable: self.update_index([proj,bug] for bug in bugs for proj,bugs in refreshable.items()) def get_newest_from_mailbox(self,mailbox,proj,bug): folder=mailbox.folder.get() mailbox.folder.set(self.get_bug_folder(mailbox,proj,bug),True) for m in mailbox.fetch(limit=1,reverse=True): break if folder: mailbox.folder.set(folder) return m def handle_email(self,mailbox:MailBox,msg:MailMessage): self.mlog("Processing email with subject '%s'"%msg.subject) if "x-acit-delete-when-sender" in msg.headers and "sender" in msg.headers: if msg.headers["x-acit-delete-when-sender"]==msg.headers["sender"]: self.mlog("Header requested deletion, deleting email") mailbox.delete([msg.uid]) return if "x-acit-is-outgoing" in msg.headers and "sender" in msg.headers: if msg.headers["x-acit-is-outgoing"]==msg.headers["sender"]: self.mlog("Header indicated this is Sent mail, moving") mailbox.move([msg.uid],self.ensurefolder(mailbox,"Sent")) return for addr in msg.to + msg.cc + msg.bcc + msg.reply_to: if re.fullmatch(self.addr_regex,addr): proj,bug=self.stripInfoFromMailAddr(addr) break else: proj=None bug=None if msg.subject.startswith("SUBSCRIBE"): from_=self.format_emailaddr(project=proj,bugid=bug) if not proj: self.smtp.sendmail( replyto=msg, from_=from_, body="Could not subscribe you, please specify a project and optionally a bug in the target address" ) self.mlog("") mailbox.delete([msg.uid]) return self.mlog("Subscribing",msg.from_,"to",proj,bug) with self.dbpool.get_connection() as conn, conn.cursor() as cur: cur.execute("INSERT INTO subscribers (tracker,bugid,email) VALUES (?,?,?)",(proj,bug,msg.from_)) if not bug: bug="all bugs" else: bug="bug "+bug self.smtp.sendmail( replyto=msg, from_=from_, body="Subscribed to "+bug+" "+proj+"\nTo unsubscribe, send an email with subjectline `UNSUBSCRIBE` to "+from_ ) mailbox.delete([msg.uid]) return if msg.subject.startswith("UNSUBSCRIBE"): self.mlog("Unsubscribing",msg.from_,"from",proj,bug) with self.dbpool.get_connection() as conn, conn.cursor() as cur: cur.execute("DELETE FROM subscribers WHERE tracker=? AND bugid=? AND email=?)",(proj,bug,msg.from_)) mailbox.delete([msg.uid]) return # block: handle secure email which uses $token$ as projectname if proj.startswith('$token$='): token=proj.removeprefix('$token$=') self.mlog("This email is sent to a secure token, resolving (matching against %s)."%token) with self.dbpool.get_connection() as conn, conn.cursor() as cur: cur.execute("SELECT email,tracker,bugid FROM tokens WHERE token=? AND usedin IS NULL",(token,)) data=cur.fetchone() if data and data[0]==msg.from_: self.mlog("Resolved to",data) proj=data[1] bug=data[2] try: self.secure_process_commands(mailbox,msg,conn,cur,proj,bug) except Exception as e: self.mlog("Error. IDK.",traceback=True) self.mail_error("Error processing commands:\n "+repr(e)+"\nPlease contact a site admin.") cur.execute("UPDATE tokens SET usedin=? WHERE token=?",(msg.headers["message-id"] if "message-id" else "nomessageid"),token) conn.commit() else: self.mlog("Couldn't resolve.") # end block # block: handle in-reply-to headers if "in-reply-to" in msg.headers: self.mlog("Using In-Reply-To header to figure out meta") replyid=msg.headers["in-reply-to"] data=self.find_in_reply_to(replyid) if data: proj,bug=data elif proj and bug: pass # just ignore this email if we already found a project and bug elif "RETRYING" in msg.flags: self.mail_error(msg,"No such email: "+msg.headers["in-reply-to"]) self.move_errored_mail(mailbox,msg) else: # try again later, maybe we need to index first or handle some other email first mailbox.flag([msg.uid],"RETRYING") self.mlog("Message-ID not in index, trying again next round") return # end block # block: make sure a project was specified if not proj: self.mlog("No project specified.") self.mail_error(msg,"Please specify a project by mailing to:\n "+\ ("" if self.uses_aliases else self.emailname+"+")+"PROJECT@"+self.emaildomain+\ "\nwhere PROJECT is the name of your target project") self.move_errored_mail(mailbox,msg) return # end block # block: make sure project exists proj_matches=self.get_full_projectname(proj) if not proj_matches: self.mlog("Received email for nonexistent project %s"%proj) self.mail_error(msg,notice="Project '%s' doesn't exist"%proj) self.move_errored_mail(mailbox,msg) return # end block # block: make sure only 1 project matches if len(proj_matches)>1: self.mlog("Conficting projectname. Sending projectlist.") self.mail_error(msg,notice="Multiple projects found to match your query. Please specify. Options:\n%s"%"\n".join(proj_matches)) self.move_errored_mail(mailbox,msg) return proj=proj_matches[0] # end block # block: make new bug if needed if not bug: if re.match(r"^\[PATCH.*\]",msg.subject): bugtype="PATCH" elif re.match(r"^\[DISCUSSION.*\]",msg.subject): bugtype="DISCUS" else: bugtype="BUG" bugobj=self.site.newbug(proj,bugtype=bugtype) bugobj.subject=msg.subject[:1024] bugobj.description=\ 'No description written.\nFirst email in thread:\n\n'+msg.text[:65535] # TODO: don't thruncate silently, send error to user. self.mlog("Assigned new bugnr %d to '%s'"%(bugobj.bugid,msg.subject)) bug=bugobj.bugid else: bugobj=self.site.getbug(proj,bug) # end block # parse bug id to make sure it's an int try: bug=int(bug) except ValueError as e: self.mlog("Error decoding value to int:",e,traceback=True) self.mail_error(msg,notice="Exception while trying to convert bug number to integer",exception=e) self.move_errored_mail(mailbox,msg) return # end block # block: move mail to folder specific for this project/bug self.mlog("Email '%s' into %s/%d"%(msg.subject,proj,bug)) try: path=self.get_bug_folder(mailbox,proj,bug) mailbox.move([msg.uid], path) if not msg.from_ in bugobj.subscribers: bugobj.addsubscriber(msg.from_) if msg.subject.startswith("SECURE"): self.secure_generate_token(mailbox,msg,proj,bug) mailbox.delete([msg.uid]) return (proj,bug) if not "in-reply-to" in msg.headers: replyto=self.get_newest_from_mailbox(mailbox,proj,bug) replyto=self.get_newest_from_mailbox(mailbox,proj,bug) self.fwd_to_list(msg,bugobj,replyto=replyto) except Exception: self.mlog("Error processing email '%s' for %s/%d"%(msg.subject,proj,bug),traceback=True) self.move_errored_mail(mailbox,msg) # end block return (proj,bug) def imap_loop_controller(self): "Responsible for running imap_magic() repeatedly, and handling its errors." threading.current_thread().setName("IMAPrunner") self.mlog("IMAP monitor thread started.") while not self.stopping.is_set(): try: self.imap_magic() except PoolEmpty: self.mlog("IMAP pool empty, unable to continue.") break except Exception: import traceback exc=traceback.format_exc() self.mlog("Exception occured:\n%s"%exc) self.mlog("!! this may lead to emails not appearing or appearing later !!") self.mlog("IMAP monitor thread stopped.") def mail_error(self,msg:MailMessage,notice:str=None,exception:Exception=None): from_=self.format_emailaddr() self.mlog("Sending error report.") body="An error occured while processing your email with subject:\n" body+="> "+msg.subject if notice: body+="\n\nThe mail worker reports:\n> "+notice.replace("\n","\n> ") if exception: body+="\n\nThe following error was included:\n> "+repr(exception).replace("\n","\n> ") body+="\n\nIf you think this is a bug, please report it (mail to acit@bugs.vosjedev.net).\n" body+="Make sure to attach both this email and the email that triggered this bug." self.smtp.sendmail(replyto=msg,from_=from_,body=body) def move_errored_mail(self,mailbox:MailBox,msg:MailMessage,): target=self.ensurefolder(mailbox,"INBOX","Errors") return mailbox.move(msg.uid,target) def fwd_to_list(self,msg:MailMessage,bug:Bug,replyto:MailMessage=None): tracker=self.site.gettracker(bug.tracker) bcc=bug.subscribers + tracker.subscribers self.mlog("Bcc:",bcc) from_=self.format_emailaddr(bug.tracker,bug.bugid) self.smtp.sendmail( from_=from_, bcc=bcc, tosend=msg, replyto=replyto, extraheaders={"Reply-To":from_} ) def secure_generate_token(self,mailbox:MailBox,msg:MailMessage,proj:str,bugid:int): self.mlog("Generating a secure token for",msg.from_) import random, string token=''.join(( random.choice(string.ascii_letters) for i in range(40))) with self.dbpool.get_connection() as conn, conn.cursor() as cur: cur.execute("SELECT email FROM permissiontable WHERE email=? AND (tracker=? OR tracker IS NULL) AND (bugid=? OR bugid IS NULL)",(msg.from_,proj,bugid)) if not cur.fetchall(): self.mail_error(msg,notice="You do not have permission to do that here.") return cur.execute("INSERT INTO tokens (email,token,tracker,bugid) VALUES (?,?,?,?)",(msg.from_,token,proj,bugid)) conn.commit() from_=self.format_emailaddr(project='$token$='+token) headers={} self.smtp.format_reply_headers(replyto=self.get_newest_from_mailbox(mailbox,proj,bugid), tosend=headers) self.smtp.sendmail( from_=from_, to=msg.from_, subject="Secure token generated for %s/%d"%(proj,bugid), save=False, body="Please reply to this email to use your token, or use the email address:\n "+from_, extraheaders=headers ) def secure_process_commands(self,mailbox:MailBox,msg:MailMessage,conn:Connection,cur:Cursor,proj:str,bugid:int): from .emailcommands import findcommands from .types import BUGSTATUS, BUGTYPES commandmatches=findcommands(msg.text) bug=self.site.getbug(tracker=proj,bugid=bugid) for match in commandmatches: text=match.group() text=text.replace("\r\n","\n") # dos2unix so we can forget about \r text=text.lstrip('\n') text=text.rstrip('\n') self.mlog(text) if text.startswith('SUBJECT '): bug.subject=text.removeprefix("SUBJECT ") self.mlog("Subject changed to:",bug.subject) elif text.startswith("STATUS ") or text.startswith("TYPE "): key,data=text.split(sep=' ',maxsplit=1) possibles={ "STATUS":BUGSTATUS, "TYPE":BUGTYPES } if not data in possibles[key]: # no need to errorcheck this, the check above implies key can only be STATUS or TYPE continue self.mlog("Set",key,"to",data) if key=="STATUS": bug.status=data elif key=="TYPE": bug.type=data elif text.startswith("DESCRIPTION"): self.mlog("Description command") cmdline,text=text.split('\n',1) text=text.removesuffix("\nDESCRIPTION END") m=re.match(r'DESCRIPTION @([0-9]*)(:[0-9])?',cmdline) start=None end=None self.mlog(m) if m: try: start=int(m[1]) if m[1]: end=int(m[2].lstrip(":")) else: end=start+1 except ValueError as e: self.mlog("Error converting description range to int:",repr(e)) continue replacement=text.split("\n") lines=bug.description.split("\n") self.mlog("Replacing: ",repr("\n".join(lines[start:end])),"with",repr("\n".join(replacement))) lines[start:end]=replacement bug.description="\n".join(lines) elif text.startswith("PERMIT "): account=text.removeprefix("PERMIT ") cur.execute("INSERT INTO permissiontable VALUES (?,?,?)",(account,proj,bugid)) conn.commit() elif text.startswith("UNPERMIT "): account=text.removeprefix("UNPERMIT ") cur.execute("DELETE FROM permissiontable WHERE email=? AND tracker=? AND bugid=?)",(account,proj,bugid)) conn.commit() else: self.mlog("Not yet implemented command: '%s'"%text) def stop(self): "Sets stopping signal, waits for all threads to stop" self.mlog("Stopping. This can take a while.") self.stopping.set() for thread in ( self.mailin_thread, ): if thread.is_alive() and not thread==threading.current_thread(): self.mlog("Waiting for thread: %s"%thread.name) thread.join() self.mlog("Stopping SMTP worker") self.smtp.stop() self.mlog("Closing IMAP pool") self.mbpool.close() self.mlog("Stopped") def mlog(self,*msg,**kwargs): import traceback function=traceback.extract_stack(limit=2)[0].name cherrypy.log(context="%s>>MAIL:%s"%(threading.current_thread().name, function), msg=" ".join([str(i) for i in msg]),**kwargs)