change(notify): converted to use aurweb.db ORM

- Removed notify sharness test

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-11-22 12:03:53 -08:00
parent 9fb1fbe32c
commit d8e3ca1abb
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
7 changed files with 934 additions and 627 deletions

View file

@ -239,7 +239,6 @@ def remove_comaintainers(pkgbase: models.PackageBase,
:param usernames: Iterable of username strings :param usernames: Iterable of username strings
:return: None :return: None
""" """
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = [] notifications = []
with db.begin(): with db.begin():
for username in usernames: for username in usernames:
@ -250,8 +249,7 @@ def remove_comaintainers(pkgbase: models.PackageBase,
).first() ).first()
notifications.append( notifications.append(
notify.ComaintainerRemoveNotification( notify.ComaintainerRemoveNotification(
conn, comaintainer.User.ID, pkgbase.ID comaintainer.User.ID, pkgbase.ID)
)
) )
db.delete(comaintainer) db.delete(comaintainer)
@ -283,7 +281,6 @@ def add_comaintainers(request: Request, pkgbase: models.PackageBase,
memo[username] = user memo[username] = user
# Alright, now that we got past the check, add them all to the DB. # Alright, now that we got past the check, add them all to the DB.
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = [] notifications = []
with db.begin(): with db.begin():
for username in usernames: for username in usernames:
@ -302,7 +299,7 @@ def add_comaintainers(request: Request, pkgbase: models.PackageBase,
notifications.append( notifications.append(
notify.ComaintainerAddNotification( notify.ComaintainerAddNotification(
conn, comaintainer.User.ID, pkgbase.ID) comaintainer.User.ID, pkgbase.ID)
) )
# Send out notifications. # Send out notifications.

View file

@ -97,8 +97,7 @@ async def passreset_post(request: Request,
with db.begin(): with db.begin():
user.ResetKey = resetkey user.ResetKey = resetkey
executor = db.ConnectionExecutor(db.get_engine().raw_connection()) ResetKeyNotification(user.ID).send()
ResetKeyNotification(executor, user.ID).send()
# Render ?step=confirm. # Render ?step=confirm.
return RedirectResponse(url="/passreset?step=confirm", return RedirectResponse(url="/passreset?step=confirm",
@ -323,8 +322,7 @@ async def account_register_post(request: Request,
Fingerprint=fingerprint) Fingerprint=fingerprint)
# Send a reset key notification to the new user. # Send a reset key notification to the new user.
executor = db.ConnectionExecutor(db.get_engine().raw_connection()) WelcomeNotification(user.ID).send()
WelcomeNotification(executor, user.ID).send()
context["complete"] = True context["complete"] = True
context["user"] = user context["user"] = user

View file

@ -146,7 +146,6 @@ def delete_package(deleter: models.User, package: models.Package):
requests = [] requests = []
bases_to_delete = [] bases_to_delete = []
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
# In all cases, though, just delete the Package in question. # In all cases, though, just delete the Package in question.
if package.PackageBase.packages.count() == 1: if package.PackageBase.packages.count() == 1:
reqtype = db.query(models.RequestType).filter( reqtype = db.query(models.RequestType).filter(
@ -162,7 +161,7 @@ def delete_package(deleter: models.User, package: models.Package):
# Prepare DeleteNotification. # Prepare DeleteNotification.
notifications.append( notifications.append(
notify.DeleteNotification(conn, deleter.ID, package.PackageBase.ID) notify.DeleteNotification(deleter.ID, package.PackageBase.ID)
) )
# For each PackageRequest created, mock up an open and close notification. # For each PackageRequest created, mock up an open and close notification.
@ -170,12 +169,12 @@ def delete_package(deleter: models.User, package: models.Package):
for pkgreq in requests: for pkgreq in requests:
notifications.append( notifications.append(
notify.RequestOpenNotification( notify.RequestOpenNotification(
conn, deleter.ID, pkgreq.ID, reqtype.Name, deleter.ID, pkgreq.ID, reqtype.Name,
pkgreq.PackageBase.ID, merge_into=basename or None) pkgreq.PackageBase.ID, merge_into=basename or None)
) )
notifications.append( notifications.append(
notify.RequestCloseNotification( notify.RequestCloseNotification(
conn, deleter.ID, pkgreq.ID, pkgreq.status_display()) deleter.ID, pkgreq.ID, pkgreq.status_display())
) )
# Perform all the deletions. # Perform all the deletions.
@ -666,10 +665,9 @@ async def pkgbase_request_post(request: Request, name: str,
Comments=comments, Comments=comments,
ClosureComment=str()) ClosureComment=str())
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
# Prepare notification object. # Prepare notification object.
notif = notify.RequestOpenNotification( notif = notify.RequestOpenNotification(
conn, request.user.ID, pkgreq.ID, reqtype.Name, request.user.ID, pkgreq.ID, reqtype.Name,
pkgreq.PackageBase.ID, merge_into=merge_into or None) pkgreq.PackageBase.ID, merge_into=merge_into or None)
# Send the notification now that we're out of the DB scope. # Send the notification now that we're out of the DB scope.
@ -688,7 +686,7 @@ async def pkgbase_request_post(request: Request, name: str,
pkgreq.Status = ACCEPTED_ID pkgreq.Status = ACCEPTED_ID
db.refresh(pkgreq) db.refresh(pkgreq)
notif = notify.RequestCloseNotification( notif = notify.RequestCloseNotification(
conn, request.user.ID, pkgreq.ID, pkgreq.status_display()) request.user.ID, pkgreq.ID, pkgreq.status_display())
notif.send() notif.send()
elif type == "deletion" and is_maintainer and outdated: elif type == "deletion" and is_maintainer and outdated:
packages = pkgbase.packages.all() packages = pkgbase.packages.all()
@ -742,9 +740,8 @@ async def requests_close_post(request: Request, id: int,
pkgreq.Status = reason pkgreq.Status = reason
pkgreq.ClosureComment = comments pkgreq.ClosureComment = comments
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notify_ = notify.RequestCloseNotification( notify_ = notify.RequestCloseNotification(
conn, request.user.ID, pkgreq.ID, pkgreq.status_display()) request.user.ID, pkgreq.ID, pkgreq.status_display())
notify_.send() notify_.send()
return RedirectResponse("/requests", status_code=HTTPStatus.SEE_OTHER) return RedirectResponse("/requests", status_code=HTTPStatus.SEE_OTHER)
@ -936,9 +933,7 @@ async def pkgbase_unvote(request: Request, name: str):
def pkgbase_disown_instance(request: Request, pkgbase: models.PackageBase): def pkgbase_disown_instance(request: Request, pkgbase: models.PackageBase):
disowner = request.user disowner = request.user
notif = notify.DisownNotification(disowner.ID, pkgbase.ID)
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notif = notify.DisownNotification(conn, disowner.ID, pkgbase.ID)
if disowner != pkgbase.Maintainer: if disowner != pkgbase.Maintainer:
with db.begin(): with db.begin():
@ -1003,8 +998,7 @@ def pkgbase_adopt_instance(request: Request, pkgbase: models.PackageBase):
with db.begin(): with db.begin():
pkgbase.Maintainer = request.user pkgbase.Maintainer = request.user
conn = db.ConnectionExecutor(db.get_engine().raw_connection()) notif = notify.AdoptNotification(request.user.ID, pkgbase.ID)
notif = notify.AdoptNotification(conn, request.user.ID, pkgbase.ID)
notif.send() notif.send()
@ -1366,7 +1360,7 @@ def pkgbase_merge_instance(request: Request, pkgbase: models.PackageBase,
f"{request.user.Username}.") f"{request.user.Username}.")
rejected_closure_comment = ("Rejected because another merge request " rejected_closure_comment = ("Rejected because another merge request "
"for the same package base was accepted.") "for the same package base was accepted.")
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
if not requests: if not requests:
# If there are no requests, create one owned by request.user. # If there are no requests, create one owned by request.user.
with db.begin(): with db.begin():
@ -1383,7 +1377,7 @@ def pkgbase_merge_instance(request: Request, pkgbase: models.PackageBase,
# Add a notification about the opening to our notifs array. # Add a notification about the opening to our notifs array.
notif = notify.RequestOpenNotification( notif = notify.RequestOpenNotification(
conn, request.user.ID, pkgreq.ID, MERGE, request.user.ID, pkgreq.ID, MERGE,
pkgbase.ID, merge_into=target.Name) pkgbase.ID, merge_into=target.Name)
notifs.append(notif) notifs.append(notif)
@ -1417,11 +1411,9 @@ def pkgbase_merge_instance(request: Request, pkgbase: models.PackageBase,
for pkgreq in all_requests: for pkgreq in all_requests:
# Create notifications for request closure. # Create notifications for request closure.
notif = notify.RequestCloseNotification( notif = notify.RequestCloseNotification(
conn, request.user.ID, pkgreq.ID, pkgreq.status_display()) request.user.ID, pkgreq.ID, pkgreq.status_display())
notifs.append(notif) notifs.append(notif)
conn.close()
# Log this out for accountability purposes. # Log this out for accountability purposes.
logger.info(f"Trusted User '{request.user.Username}' merged " logger.info(f"Trusted User '{request.user.Username}' merged "
f"'{pkgbasename}' into '{target.Name}'.") f"'{pkgbasename}' into '{target.Name}'.")

View file

@ -7,10 +7,16 @@ import subprocess
import sys import sys
import textwrap import textwrap
from sqlalchemy import and_, or_
import aurweb.config import aurweb.config
import aurweb.db import aurweb.db
import aurweb.l10n import aurweb.l10n
from aurweb import db
from aurweb.models import (PackageBase, PackageComaintainer, PackageComment, PackageNotification, PackageRequest, RequestType,
TUVote, User)
aur_location = aurweb.config.get('options', 'aur_location') aur_location = aurweb.config.get('options', 'aur_location')
@ -22,23 +28,6 @@ def headers_reply(thread_id):
return {'In-Reply-To': thread_id, 'References': thread_id} return {'In-Reply-To': thread_id, 'References': thread_id}
def username_from_id(conn, uid):
cur = conn.execute('SELECT UserName FROM Users WHERE ID = ?', [uid])
return cur.fetchone()[0]
def pkgbase_from_id(conn, pkgbase_id):
cur = conn.execute('SELECT Name FROM PackageBases WHERE ID = ?',
[pkgbase_id])
return cur.fetchone()[0]
def pkgbase_from_pkgreq(conn, reqid):
cur = conn.execute('SELECT PackageBaseID FROM PackageRequests ' +
'WHERE ID = ?', [reqid])
return cur.fetchone()[0]
class Notification: class Notification:
def get_refs(self): def get_refs(self):
return () return ()
@ -52,8 +41,8 @@ class Notification:
def get_body_fmt(self, lang): def get_body_fmt(self, lang):
body = '' body = ''
for line in self.get_body(lang).splitlines(): for line in self.get_body(lang).splitlines():
if line == '-- ': if line == '--':
body += '-- \n' body += '--\n'
continue continue
body += textwrap.fill(line, break_long_words=False) + '\n' body += textwrap.fill(line, break_long_words=False) + '\n'
for i, ref in enumerate(self.get_refs()): for i, ref in enumerate(self.get_refs()):
@ -103,10 +92,11 @@ class Notification:
user = aurweb.config.get('notifications', 'smtp-user') user = aurweb.config.get('notifications', 'smtp-user')
passwd = aurweb.config.get('notifications', 'smtp-password') passwd = aurweb.config.get('notifications', 'smtp-password')
if use_ssl: classes = {
server = smtplib.SMTP_SSL(server_addr, server_port) False: smtplib.SMTP,
else: True: smtplib.SMTP_SSL,
server = smtplib.SMTP(server_addr, server_port) }
server = classes[use_ssl](server_addr, server_port)
if use_starttls: if use_starttls:
server.ehlo() server.ehlo()
@ -123,12 +113,24 @@ class Notification:
class ResetKeyNotification(Notification): class ResetKeyNotification(Notification):
def __init__(self, conn, uid): def __init__(self, uid):
cur = conn.execute('SELECT UserName, Email, BackupEmail, ' +
'LangPreference, ResetKey ' + user = db.query(User).filter(
'FROM Users WHERE ID = ? AND Suspended = 0', [uid]) and_(User.ID == uid, User.Suspended == 0)
self._username, self._to, self._backup, self._lang, self._resetkey = \ ).with_entities(
cur.fetchone() User.Username,
User.Email,
User.BackupEmail,
User.LangPreference,
User.ResetKey
).order_by(User.Username.asc()).first()
self._username = user.Username
self._to = user.Email
self._backup = user.BackupEmail
self._lang = user.LangPreference
self._resetkey = user.ResetKey
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -167,21 +169,28 @@ class WelcomeNotification(ResetKeyNotification):
class CommentNotification(Notification): class CommentNotification(Notification):
def __init__(self, conn, uid, pkgbase_id, comment_id): def __init__(self, uid, pkgbase_id, comment_id):
self._user = username_from_id(conn, uid)
self._pkgbase = pkgbase_from_id(conn, pkgbase_id) self._user = db.query(User.Username).filter(
cur = conn.execute('SELECT DISTINCT Users.Email, Users.LangPreference ' User.ID == uid).first().Username
'FROM Users INNER JOIN PackageNotifications ' + self._pkgbase = db.query(PackageBase.Name).filter(
'ON PackageNotifications.UserID = Users.ID WHERE ' + PackageBase.ID == pkgbase_id).first().Name
'Users.CommentNotify = 1 AND ' +
'PackageNotifications.UserID != ? AND ' + query = db.query(User).join(PackageNotification).filter(
'PackageNotifications.PackageBaseID = ? AND ' + and_(User.CommentNotify == 1,
'Users.Suspended = 0', PackageNotification.UserID != uid,
[uid, pkgbase_id]) PackageNotification.PackageBaseID == pkgbase_id,
self._recipients = cur.fetchall() User.Suspended == 0)
cur = conn.execute('SELECT Comments FROM PackageComments WHERE ID = ?', ).with_entities(
[comment_id]) User.Email,
self._text = cur.fetchone()[0] User.LangPreference
).distinct()
self._recipients = [(u.Email, u.LangPreference) for u in query]
pkgcomment = db.query(PackageComment.Comments).filter(
PackageComment.ID == comment_id).first()
self._text = pkgcomment.Comments
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -196,7 +205,7 @@ class CommentNotification(Notification):
body = aurweb.l10n.translator.translate( body = aurweb.l10n.translator.translate(
'{user} [1] added the following comment to {pkgbase} [2]:', '{user} [1] added the following comment to {pkgbase} [2]:',
lang).format(user=self._user, pkgbase=self._pkgbase) lang).format(user=self._user, pkgbase=self._pkgbase)
body += '\n\n' + self._text + '\n\n-- \n' body += '\n\n' + self._text + '\n\n--\n'
dnlabel = aurweb.l10n.translator.translate( dnlabel = aurweb.l10n.translator.translate(
'Disable notifications', lang) 'Disable notifications', lang)
body += aurweb.l10n.translator.translate( body += aurweb.l10n.translator.translate(
@ -216,19 +225,24 @@ class CommentNotification(Notification):
class UpdateNotification(Notification): class UpdateNotification(Notification):
def __init__(self, conn, uid, pkgbase_id): def __init__(self, uid, pkgbase_id):
self._user = username_from_id(conn, uid)
self._pkgbase = pkgbase_from_id(conn, pkgbase_id) self._user = db.query(User.Username).filter(
cur = conn.execute('SELECT DISTINCT Users.Email, ' + User.ID == uid).first().Username
'Users.LangPreference FROM Users ' + self._pkgbase = db.query(PackageBase.Name).filter(
'INNER JOIN PackageNotifications ' + PackageBase.ID == pkgbase_id).first().Name
'ON PackageNotifications.UserID = Users.ID WHERE ' +
'Users.UpdateNotify = 1 AND ' + query = db.query(User).join(PackageNotification).filter(
'PackageNotifications.UserID != ? AND ' + and_(User.UpdateNotify == 1,
'PackageNotifications.PackageBaseID = ? AND ' + PackageNotification.UserID != uid,
'Users.Suspended = 0', PackageNotification.PackageBaseID == pkgbase_id,
[uid, pkgbase_id]) User.Suspended == 0)
self._recipients = cur.fetchall() ).with_entities(
User.Email,
User.LangPreference
).distinct()
self._recipients = [(u.Email, u.LangPreference) for u in query]
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -243,7 +257,7 @@ class UpdateNotification(Notification):
body = aurweb.l10n.translator.translate( body = aurweb.l10n.translator.translate(
'{user} [1] pushed a new commit to {pkgbase} [2].', '{user} [1] pushed a new commit to {pkgbase} [2].',
lang).format(user=self._user, pkgbase=self._pkgbase) lang).format(user=self._user, pkgbase=self._pkgbase)
body += '\n\n-- \n' body += '\n\n--\n'
dnlabel = aurweb.l10n.translator.translate( dnlabel = aurweb.l10n.translator.translate(
'Disable notifications', lang) 'Disable notifications', lang)
body += aurweb.l10n.translator.translate( body += aurweb.l10n.translator.translate(
@ -263,23 +277,30 @@ class UpdateNotification(Notification):
class FlagNotification(Notification): class FlagNotification(Notification):
def __init__(self, conn, uid, pkgbase_id): def __init__(self, uid, pkgbase_id):
self._user = username_from_id(conn, uid)
self._pkgbase = pkgbase_from_id(conn, pkgbase_id) self._user = db.query(User.Username).filter(
cur = conn.execute( User.ID == uid).first().Username
'SELECT DISTINCT Users.Email, ' + self._pkgbase = db.query(PackageBase.Name).filter(
'Users.LangPreference FROM Users ' + PackageBase.ID == pkgbase_id).first().Name
'LEFT JOIN PackageComaintainers ' +
'ON PackageComaintainers.UsersID = Users.ID ' + query = db.query(User).join(PackageComaintainer, isouter=True).join(
'INNER JOIN PackageBases ' + PackageBase,
'ON PackageBases.MaintainerUID = Users.ID OR ' + or_(PackageBase.MaintainerUID == User.ID,
'PackageBases.ID = PackageComaintainers.PackageBaseID ' + PackageBase.ID == PackageComaintainer.PackageBaseID)
'WHERE PackageBases.ID = ? AND ' + ).filter(
'Users.Suspended = 0', [pkgbase_id]) and_(PackageBase.ID == pkgbase_id,
self._recipients = cur.fetchall() User.Suspended == 0)
cur = conn.execute('SELECT FlaggerComment FROM PackageBases WHERE ' + ).with_entities(
'ID = ?', [pkgbase_id]) User.Email,
self._text = cur.fetchone()[0] User.LangPreference
).distinct()
self._recipients = [(u.Email, u.LangPreference) for u in query]
pkgbase = db.query(PackageBase.FlaggerComment).filter(
PackageBase.ID == pkgbase_id).first()
self._text = pkgbase.FlaggerComment
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -304,22 +325,28 @@ class FlagNotification(Notification):
class OwnershipEventNotification(Notification): class OwnershipEventNotification(Notification):
def __init__(self, conn, uid, pkgbase_id): def __init__(self, uid, pkgbase_id):
self._user = username_from_id(conn, uid)
self._pkgbase = pkgbase_from_id(conn, pkgbase_id) self._user = db.query(User.Username).filter(
cur = conn.execute('SELECT DISTINCT Users.Email, ' + User.ID == uid).first().Username
'Users.LangPreference FROM Users ' + self._pkgbase = db.query(PackageBase.Name).filter(
'INNER JOIN PackageNotifications ' + PackageBase.ID == pkgbase_id).first().Name
'ON PackageNotifications.UserID = Users.ID WHERE ' +
'Users.OwnershipNotify = 1 AND ' + query = db.query(User).join(PackageNotification).filter(
'PackageNotifications.UserID != ? AND ' + and_(User.OwnershipNotify == 1,
'PackageNotifications.PackageBaseID = ? AND ' + PackageNotification.UserID != uid,
'Users.Suspended = 0', PackageNotification.PackageBaseID == pkgbase_id,
[uid, pkgbase_id]) User.Suspended == 0)
self._recipients = cur.fetchall() ).with_entities(
cur = conn.execute('SELECT FlaggerComment FROM PackageBases WHERE ' + User.Email,
'ID = ?', [pkgbase_id]) User.LangPreference
self._text = cur.fetchone()[0] ).distinct()
self._recipients = [(u.Email, u.LangPreference) for u in query]
pkgbase = db.query(PackageBase.FlaggerComment).filter(
PackageBase.ID == pkgbase_id).first()
self._text = pkgbase.FlaggerComment
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -351,11 +378,22 @@ class DisownNotification(OwnershipEventNotification):
class ComaintainershipEventNotification(Notification): class ComaintainershipEventNotification(Notification):
def __init__(self, conn, uid, pkgbase_id): def __init__(self, uid, pkgbase_id):
self._pkgbase = pkgbase_from_id(conn, pkgbase_id)
cur = conn.execute('SELECT Email, LangPreference FROM Users ' + self._pkgbase = db.query(PackageBase.Name).filter(
'WHERE ID = ? AND Suspended = 0', [uid]) PackageBase.ID == pkgbase_id).first().Name
self._to, self._lang = cur.fetchone()
user = db.query(User).filter(
and_(User.ID == uid,
User.Suspended == 0)
).with_entities(
User.Email,
User.LangPreference
).first()
self._to = user.Email
self._lang = user.LangPreference
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -385,22 +423,28 @@ class ComaintainerRemoveNotification(ComaintainershipEventNotification):
class DeleteNotification(Notification): class DeleteNotification(Notification):
def __init__(self, conn, uid, old_pkgbase_id, new_pkgbase_id=None): def __init__(self, uid, old_pkgbase_id, new_pkgbase_id=None):
self._user = username_from_id(conn, uid)
self._old_pkgbase = pkgbase_from_id(conn, old_pkgbase_id) self._user = db.query(User.Username).filter(
if new_pkgbase_id: User.ID == uid).first().Username
self._new_pkgbase = pkgbase_from_id(conn, new_pkgbase_id) self._old_pkgbase = db.query(PackageBase.Name).filter(
else: PackageBase.ID == old_pkgbase_id).first().Name
self._new_pkgbase = None self._new_pkgbase = None
cur = conn.execute('SELECT DISTINCT Users.Email, ' + if new_pkgbase_id:
'Users.LangPreference FROM Users ' + self._new_pkgbase = db.query(PackageBase.Name).filter(
'INNER JOIN PackageNotifications ' + PackageBase.ID == new_pkgbase_id).first().Name
'ON PackageNotifications.UserID = Users.ID WHERE ' +
'PackageNotifications.UserID != ? AND ' + query = db.query(User).join(PackageNotification).filter(
'PackageNotifications.PackageBaseID = ? AND ' + and_(PackageNotification.UserID != uid,
'Users.Suspended = 0', PackageNotification.PackageBaseID == old_pkgbase_id,
[uid, old_pkgbase_id]) User.Suspended == 0)
self._recipients = cur.fetchall() ).with_entities(
User.Email,
User.LangPreference
).distinct()
self._recipients = [(u.Email, u.LangPreference) for u in query]
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -417,7 +461,7 @@ class DeleteNotification(Notification):
'Disable notifications', lang) 'Disable notifications', lang)
return aurweb.l10n.translator.translate( return aurweb.l10n.translator.translate(
'{user} [1] merged {old} [2] into {new} [3].\n\n' '{user} [1] merged {old} [2] into {new} [3].\n\n'
'-- \n' '--\n'
'If you no longer wish receive notifications about the ' 'If you no longer wish receive notifications about the '
'new package, please go to [3] and click "{label}".', 'new package, please go to [3] and click "{label}".',
lang).format(user=self._user, old=self._old_pkgbase, lang).format(user=self._user, old=self._old_pkgbase,
@ -438,26 +482,36 @@ class DeleteNotification(Notification):
class RequestOpenNotification(Notification): class RequestOpenNotification(Notification):
def __init__(self, conn, uid, reqid, reqtype, pkgbase_id, merge_into=None): def __init__(self, uid, reqid, reqtype, pkgbase_id, merge_into=None):
self._user = username_from_id(conn, uid)
self._pkgbase = pkgbase_from_id(conn, pkgbase_id) self._user = db.query(User.Username).filter(
cur = conn.execute( User.ID == uid).first().Username
'SELECT DISTINCT Users.Email FROM PackageRequests ' + self._pkgbase = db.query(PackageBase.Name).filter(
'INNER JOIN PackageBases ' + PackageBase.ID == pkgbase_id).first().Name
'ON PackageBases.ID = PackageRequests.PackageBaseID ' +
'LEFT JOIN PackageComaintainers ' +
'ON PackageComaintainers.PackageBaseID = PackageRequests.PackageBaseID ' +
'INNER JOIN Users ' +
'ON Users.ID = PackageRequests.UsersID ' +
'OR Users.ID = PackageBases.MaintainerUID ' +
'OR Users.ID = PackageComaintainers.UsersID ' +
'WHERE PackageRequests.ID = ? AND ' +
'Users.Suspended = 0', [reqid])
self._to = aurweb.config.get('options', 'aur_request_ml') self._to = aurweb.config.get('options', 'aur_request_ml')
self._cc = [row[0] for row in cur.fetchall()]
cur = conn.execute('SELECT Comments FROM PackageRequests WHERE ID = ?', query = db.query(PackageRequest).join(PackageBase).join(
[reqid]) PackageComaintainer,
self._text = cur.fetchone()[0] PackageComaintainer.PackageBaseID == PackageRequest.PackageBaseID,
isouter=True
).join(
User,
or_(User.ID == PackageRequest.UsersID,
User.ID == PackageBase.MaintainerUID,
User.ID == PackageComaintainer.UsersID)
).filter(
and_(PackageRequest.ID == reqid,
User.Suspended == 0)
).with_entities(
User.Email
).distinct()
self._cc = [u.Email for u in query]
pkgreq = db.query(PackageRequest.Comments).filter(
PackageRequest.ID == reqid).first()
self._text = pkgreq.Comments
self._reqid = int(reqid) self._reqid = int(reqid)
self._reqtype = reqtype self._reqtype = reqtype
self._merge_into = merge_into self._merge_into = merge_into
@ -500,31 +554,41 @@ class RequestOpenNotification(Notification):
class RequestCloseNotification(Notification): class RequestCloseNotification(Notification):
def __init__(self, conn, uid, reqid, reason): def __init__(self, uid, reqid, reason):
self._user = username_from_id(conn, uid) if int(uid) else None user = db.query(User.Username).filter(User.ID == uid).first()
self._user = user.Username if user else None
cur = conn.execute(
'SELECT DISTINCT Users.Email FROM PackageRequests ' +
'INNER JOIN PackageBases ' +
'ON PackageBases.ID = PackageRequests.PackageBaseID ' +
'LEFT JOIN PackageComaintainers ' +
'ON PackageComaintainers.PackageBaseID = PackageRequests.PackageBaseID ' +
'INNER JOIN Users ' +
'ON Users.ID = PackageRequests.UsersID ' +
'OR Users.ID = PackageBases.MaintainerUID ' +
'OR Users.ID = PackageComaintainers.UsersID ' +
'WHERE PackageRequests.ID = ? AND ' +
'Users.Suspended = 0', [reqid])
self._to = aurweb.config.get('options', 'aur_request_ml') self._to = aurweb.config.get('options', 'aur_request_ml')
self._cc = [row[0] for row in cur.fetchall()]
cur = conn.execute('SELECT PackageRequests.ClosureComment, ' + query = db.query(PackageRequest).join(PackageBase).join(
'RequestTypes.Name, ' + PackageComaintainer,
'PackageRequests.PackageBaseName ' + PackageComaintainer.PackageBaseID == PackageRequest.PackageBaseID,
'FROM PackageRequests ' + isouter=True
'INNER JOIN RequestTypes ' + ).join(
'ON RequestTypes.ID = PackageRequests.ReqTypeID ' + User,
'WHERE PackageRequests.ID = ?', [reqid]) or_(User.ID == PackageRequest.UsersID,
self._text, self._reqtype, self._pkgbase = cur.fetchone() User.ID == PackageBase.MaintainerUID,
User.ID == PackageComaintainer.UsersID)
).filter(
and_(PackageRequest.ID == reqid,
User.Suspended == 0)
).with_entities(
User.Email
).distinct()
self._cc = [u.Email for u in query]
pkgreq = db.query(PackageRequest).join(RequestType).filter(
PackageRequest.ID == reqid
).with_entities(
PackageRequest.ClosureComment,
RequestType.Name,
PackageRequest.PackageBaseName
).first()
self._text = pkgreq.ClosureComment
self._reqtype = pkgreq.Name
self._pkgbase = pkgreq.PackageBaseName
self._reqid = int(reqid) self._reqid = int(reqid)
self._reason = reason self._reason = reason
@ -567,14 +631,19 @@ class RequestCloseNotification(Notification):
class TUVoteReminderNotification(Notification): class TUVoteReminderNotification(Notification):
def __init__(self, conn, vote_id): def __init__(self, vote_id):
self._vote_id = int(vote_id) self._vote_id = int(vote_id)
cur = conn.execute('SELECT Email, LangPreference FROM Users ' +
'WHERE AccountTypeID IN (2, 4) AND ID NOT IN ' + subquery = db.query(TUVote.UserID).filter(TUVote.VoteID == vote_id)
'(SELECT UserID FROM TU_Votes ' + query = db.query(User).filter(
'WHERE TU_Votes.VoteID = ?) AND ' + and_(User.AccountTypeID.in_((2, 4)),
'Users.Suspended = 0', [vote_id]) ~User.ID.in_(subquery),
self._recipients = cur.fetchall() User.Suspended == 0)
).with_entities(
User.Email, User.LangPreference
)
self._recipients = [(u.Email, u.LangPreference) for u in query]
super().__init__() super().__init__()
def get_recipients(self): def get_recipients(self):
@ -596,6 +665,7 @@ class TUVoteReminderNotification(Notification):
def main(): def main():
db.get_engine()
action = sys.argv[1] action = sys.argv[1]
action_map = { action_map = {
'send-resetkey': ResetKeyNotification, 'send-resetkey': ResetKeyNotification,
@ -613,14 +683,10 @@ def main():
'tu-vote-reminder': TUVoteReminderNotification, 'tu-vote-reminder': TUVoteReminderNotification,
} }
conn = aurweb.db.Connection() with db.begin():
notification = action_map[action](*sys.argv[2:])
notification = action_map[action](conn, *sys.argv[2:])
notification.send() notification.send()
conn.commit()
conn.close()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

42
aurweb/testing/smtp.py Normal file
View file

@ -0,0 +1,42 @@
""" Fake SMTP clients that can be used for testing. """
class FakeSMTP:
""" A fake version of smtplib.SMTP used for testing. """
starttls_enabled = False
use_ssl = False
def __init__(self):
self.emails = []
self.count = 0
self.ehlo_count = 0
self.quit_count = 0
self.set_debuglevel_count = 0
self.user = None
self.passwd = None
def ehlo(self) -> None:
self.ehlo_count += 1
def starttls(self) -> None:
self.starttls_enabled = True
def set_debuglevel(self, level: int = 0) -> None:
self.set_debuglevel_count += 1
def login(self, user: str, passwd: str) -> None:
self.user = user
self.passwd = passwd
def sendmail(self, sender: str, to: str, msg: bytes) -> None:
self.emails.append((sender, to, msg.decode()))
self.count += 1
def quit(self) -> None:
self.quit_count += 1
class FakeSMTP_SSL(FakeSMTP):
""" A fake version of smtplib.SMTP_SSL used for testing. """
use_ssl = True

View file

@ -1,431 +0,0 @@
#!/bin/sh
test_description='notify tests'
. "$(dirname "$0")/setup.sh"
test_expect_success 'Test out-of-date notifications.' '
cat <<-EOD | sqlite3 aur.db &&
/* Use package base IDs which can be distinguished from user IDs. */
INSERT INTO PackageBases (ID, Name, MaintainerUID, SubmittedTS, ModifiedTS, FlaggerComment) VALUES (1001, "foobar", 1, 0, 0, "This is a test OOD comment.");
INSERT INTO PackageBases (ID, Name, MaintainerUID, SubmittedTS, ModifiedTS, FlaggerComment) VALUES (1002, "foobar2", 2, 0, 0, "");
INSERT INTO PackageBases (ID, Name, MaintainerUID, SubmittedTS, ModifiedTS, FlaggerComment) VALUES (1003, "foobar3", NULL, 0, 0, "");
INSERT INTO PackageBases (ID, Name, MaintainerUID, SubmittedTS, ModifiedTS, FlaggerComment) VALUES (1004, "foobar4", 1, 0, 0, "");
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1001, 2, 1);
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1001, 4, 2);
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1002, 3, 1);
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1002, 5, 2);
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1003, 4, 1);
EOD
>sendmail.out &&
cover "$NOTIFY" flag 1 1001 &&
cat <<-EOD >expected &&
Subject: AUR Out-of-date Notification for foobar
To: tu@localhost
Subject: AUR Out-of-date Notification for foobar
To: user2@localhost
Subject: AUR Out-of-date Notification for foobar
To: user@localhost
EOD
grep "^\(Subject\|To\)" sendmail.out >sendmail.parts &&
test_cmp sendmail.parts expected &&
cat <<-EOD | sqlite3 aur.db
DELETE FROM PackageComaintainers;
EOD
'
test_expect_success 'Test subject and body of reset key notifications.' '
cat <<-EOD | sqlite3 aur.db &&
UPDATE Users SET ResetKey = "12345678901234567890123456789012" WHERE ID = 1;
EOD
>sendmail.out &&
cover "$NOTIFY" send-resetkey 1 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Password Reset
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
A password reset request was submitted for the account user associated
with your email address. If you wish to reset your password follow the
link [1] below, otherwise ignore this message and nothing will happen.
[1] https://aur.archlinux.org/passreset/?resetkey=12345678901234567890123456789012
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of welcome notifications.' '
cat <<-EOD | sqlite3 aur.db &&
UPDATE Users SET ResetKey = "12345678901234567890123456789012" WHERE ID = 1;
EOD
>sendmail.out &&
cover "$NOTIFY" welcome 1 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: Welcome to the Arch User Repository
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Welcome to the Arch User Repository! In order to set an initial
password for your new account, please click the link [1] below. If the
link does not work, try copying and pasting it into your browser.
[1] https://aur.archlinux.org/passreset/?resetkey=12345678901234567890123456789012
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of comment notifications.' '
cat <<-EOD | sqlite3 aur.db &&
/* Use package comments IDs which can be distinguished from other IDs. */
INSERT INTO PackageComments (ID, PackageBaseID, UsersID, Comments, RenderedComment) VALUES (2001, 1001, 1, "This is a test comment.", "This is a test comment.");
INSERT INTO PackageNotifications (PackageBaseID, UserID) VALUES (1001, 2);
UPDATE Users SET CommentNotify = 1 WHERE ID = 2;
EOD
>sendmail.out &&
cover "$NOTIFY" comment 1 1001 2001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Comment for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] added the following comment to foobar [2]:
This is a test comment.
--
If you no longer wish to receive notifications about this package,
please go to the package page [2] and select "Disable notifications".
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of update notifications.' '
cat <<-EOD | sqlite3 aur.db &&
UPDATE Users SET UpdateNotify = 1 WHERE ID = 2;
EOD
>sendmail.out &&
cover "$NOTIFY" update 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Package Update: foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] pushed a new commit to foobar [2].
--
If you no longer wish to receive notifications about this package,
please go to the package page [2] and select "Disable notifications".
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of out-of-date notifications.' '
>sendmail.out &&
cover "$NOTIFY" flag 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Out-of-date Notification for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Your package foobar [1] has been flagged out-of-date by user [2]:
This is a test OOD comment.
[1] https://aur.archlinux.org/pkgbase/foobar/
[2] https://aur.archlinux.org/account/user/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of adopt notifications.' '
>sendmail.out &&
cover "$NOTIFY" adopt 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Ownership Notification for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
The package foobar [1] was adopted by user [2].
[1] https://aur.archlinux.org/pkgbase/foobar/
[2] https://aur.archlinux.org/account/user/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of disown notifications.' '
>sendmail.out &&
cover "$NOTIFY" disown 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Ownership Notification for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
The package foobar [1] was disowned by user [2].
[1] https://aur.archlinux.org/pkgbase/foobar/
[2] https://aur.archlinux.org/account/user/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of co-maintainer addition notifications.' '
>sendmail.out &&
cover "$NOTIFY" comaintainer-add 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Co-Maintainer Notification for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
You were added to the co-maintainer list of foobar [1].
[1] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of co-maintainer removal notifications.' '
>sendmail.out &&
cover "$NOTIFY" comaintainer-remove 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Co-Maintainer Notification for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
You were removed from the co-maintainer list of foobar [1].
[1] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of delete notifications.' '
>sendmail.out &&
cover "$NOTIFY" delete 1 1001 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Package deleted: foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] deleted foobar [2].
You will no longer receive notifications about this package.
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of merge notifications.' '
>sendmail.out &&
cover "$NOTIFY" delete 1 1001 1002 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: AUR Package deleted: foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] merged foobar [2] into foobar2 [3].
--
If you no longer wish receive notifications about the new package,
please go to [3] and click "Disable notifications".
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
[3] https://aur.archlinux.org/pkgbase/foobar2/
EOD
test_cmp actual expected
'
test_expect_success 'Test Cc, subject and body of request open notifications.' '
cat <<-EOD | sqlite3 aur.db &&
/* Use package request IDs which can be distinguished from other IDs. */
INSERT INTO PackageRequests (ID, PackageBaseID, PackageBaseName, UsersID, ReqTypeID, Comments, ClosureComment) VALUES (3001, 1001, "foobar", 2, 1, "This is a request test comment.", "");
EOD
>sendmail.out &&
cover "$NOTIFY" request-open 1 3001 orphan 1001 &&
grep ^Cc: sendmail.out >actual &&
cat <<-EOD >expected &&
Cc: user@localhost, tu@localhost
EOD
test_cmp actual expected &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: [PRQ#3001] Orphan Request for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] filed an orphan request for foobar [2]:
This is a request test comment.
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of request open notifications for merge requests.' '
>sendmail.out &&
cover "$NOTIFY" request-open 1 3001 merge 1001 foobar2 &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: [PRQ#3001] Merge Request for foobar
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
user [1] filed a request to merge foobar [2] into foobar2 [3]:
This is a request test comment.
[1] https://aur.archlinux.org/account/user/
[2] https://aur.archlinux.org/pkgbase/foobar/
[3] https://aur.archlinux.org/pkgbase/foobar2/
EOD
test_cmp actual expected
'
test_expect_success 'Test Cc, subject and body of request close notifications.' '
>sendmail.out &&
cover "$NOTIFY" request-close 1 3001 accepted &&
grep ^Cc: sendmail.out >actual &&
cat <<-EOD >expected &&
Cc: user@localhost, tu@localhost
EOD
test_cmp actual expected &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: [PRQ#3001] Deletion Request for foobar Accepted
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Request #3001 has been accepted by user [1].
[1] https://aur.archlinux.org/account/user/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of request close notifications (auto-accept).' '
>sendmail.out &&
cover "$NOTIFY" request-close 0 3001 accepted &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: [PRQ#3001] Deletion Request for foobar Accepted
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Request #3001 has been accepted automatically by the Arch User
Repository package request system.
EOD
test_cmp actual expected
'
test_expect_success 'Test Cc of request close notification with co-maintainer.' '
cat <<-EOD | sqlite3 aur.db &&
/* Use package base IDs which can be distinguished from user IDs. */
INSERT INTO PackageComaintainers (PackageBaseID, UsersID, Priority) VALUES (1001, 3, 1);
EOD
>sendmail.out &&
"$NOTIFY" request-close 0 3001 accepted &&
grep ^Cc: sendmail.out >actual &&
cat <<-EOD >expected &&
Cc: user@localhost, tu@localhost, dev@localhost
EOD
test_cmp actual expected &&
cat <<-EOD | sqlite3 aur.db
DELETE FROM PackageComaintainers;
EOD
'
test_expect_success 'Test subject and body of request close notifications with closure comment.' '
cat <<-EOD | sqlite3 aur.db &&
UPDATE PackageRequests SET ClosureComment = "This is a test closure comment." WHERE ID = 3001;
EOD
>sendmail.out &&
cover "$NOTIFY" request-close 1 3001 accepted &&
grep ^Subject: sendmail.out >actual &&
cat <<-EOD >expected &&
Subject: [PRQ#3001] Deletion Request for foobar Accepted
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Request #3001 has been accepted by user [1]:
This is a test closure comment.
[1] https://aur.archlinux.org/account/user/
EOD
test_cmp actual expected
'
test_expect_success 'Test subject and body of TU vote reminders.' '
>sendmail.out &&
cover "$NOTIFY" tu-vote-reminder 1 &&
grep ^Subject: sendmail.out | head -1 >actual &&
cat <<-EOD >expected &&
Subject: TU Vote Reminder: Proposal 1
EOD
test_cmp actual expected &&
sed -n "/^\$/,\$p" sendmail.out | head -4 | base64 -d >actual &&
echo >>actual &&
cat <<-EOD >expected &&
Please remember to cast your vote on proposal 1 [1]. The voting period
ends in less than 48 hours.
[1] https://aur.archlinux.org/tu/?id=1
EOD
test_cmp actual expected
'
test_done

643
test/test_notify.py Normal file
View file

@ -0,0 +1,643 @@
from datetime import datetime
from typing import List
from unittest import mock
import pytest
from aurweb import config, db, models
from aurweb.models import Package, PackageBase, PackageRequest, User
from aurweb.models.account_type import TRUSTED_USER_ID, USER_ID
from aurweb.models.request_type import ORPHAN_ID
from aurweb.scripts import notify, rendercomment
from aurweb.testing.email import Email
from aurweb.testing.smtp import FakeSMTP, FakeSMTP_SSL
aur_location = config.get("options", "aur_location")
aur_request_ml = config.get("options", "aur_request_ml")
@pytest.fixture(autouse=True)
def setup(db_test):
return
@pytest.fixture
def user() -> User:
with db.begin():
user = db.create(User, Username="test", Email="test@example.org",
Passwd=str(), AccountTypeID=USER_ID)
yield user
@pytest.fixture
def user1() -> User:
with db.begin():
user1 = db.create(User, Username="user1", Email="user1@example.org",
Passwd=str(), AccountTypeID=USER_ID)
yield user1
@pytest.fixture
def user2() -> User:
with db.begin():
user2 = db.create(User, Username="user2", Email="user2@example.org",
Passwd=str(), AccountTypeID=USER_ID)
yield user2
@pytest.fixture
def pkgbases(user: User) -> List[PackageBase]:
now = int(datetime.utcnow().timestamp())
output = []
with db.begin():
for i in range(5):
output.append(
db.create(PackageBase, Name=f"pkgbase_{i}",
Maintainer=user, SubmittedTS=now,
ModifiedTS=now))
db.create(models.PackageNotification, PackageBase=output[-1],
User=user)
yield output
@pytest.fixture
def pkgreq(user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
with db.begin():
pkgreq_ = db.create(PackageRequest, PackageBase=pkgbase,
PackageBaseName=pkgbase.Name, User=user2,
ReqTypeID=ORPHAN_ID,
Comments="This is a request test comment.",
ClosureComment=str())
yield pkgreq_
@pytest.fixture
def packages(pkgbases: List[PackageBase]) -> List[Package]:
output = []
with db.begin():
for i, pkgbase in enumerate(pkgbases):
output.append(
db.create(Package, PackageBase=pkgbase,
Name=f"pkg_{i}", Version=f"{i}.0"))
yield output
def test_out_of_date(user: User, user1: User, user2: User,
pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
# Create two comaintainers. We'll pass the maintainer uid to
# FlagNotification, so we should expect to get two emails.
with db.begin():
db.create(models.PackageComaintainer,
PackageBase=pkgbase, User=user1, Priority=1)
db.create(models.PackageComaintainer,
PackageBase=pkgbase, User=user2, Priority=2)
# Send the notification for pkgbases[0].
notif = notify.FlagNotification(user.ID, pkgbases[0].ID)
notif.send()
# Should've gotten three emails: maintainer + the two comaintainers.
assert Email.count() == 3
# Comaintainer 1.
first = Email(1).parse()
assert first.headers.get("To") == user1.Email
expected = f"AUR Out-of-date Notification for {pkgbase.Name}"
assert first.headers.get("Subject") == expected
# Comaintainer 2.
second = Email(2).parse()
assert second.headers.get("To") == user2.Email
# Maintainer.
third = Email(3).parse()
assert third.headers.get("To") == user.Email
def test_reset(user: User):
with db.begin():
user.ResetKey = "12345678901234567890123456789012"
notif = notify.ResetKeyNotification(user.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
expected = "AUR Password Reset"
assert email.headers.get("Subject") == expected
expected = f"""\
A password reset request was submitted for the account test associated
with your email address. If you wish to reset your password follow the
link [1] below, otherwise ignore this message and nothing will happen.
[1] {aur_location}/passreset/?resetkey=12345678901234567890123456789012\
"""
assert email.body == expected
def test_welcome(user: User):
with db.begin():
user.ResetKey = "12345678901234567890123456789012"
notif = notify.WelcomeNotification(user.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
expected = "Welcome to the Arch User Repository"
assert email.headers.get("Subject") == expected
expected = f"""\
Welcome to the Arch User Repository! In order to set an initial
password for your new account, please click the link [1] below. If the
link does not work, try copying and pasting it into your browser.
[1] {aur_location}/passreset/?resetkey=12345678901234567890123456789012\
"""
assert email.body == expected
def test_comment(user: User, user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
with db.begin():
comment = db.create(models.PackageComment, PackageBase=pkgbase,
User=user2, Comments="This is a test comment.")
rendercomment.update_comment_render_fastapi(comment)
notif = notify.CommentNotification(user2.ID, pkgbase.ID, comment.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Comment for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
{user2.Username} [1] added the following comment to {pkgbase.Name} [2]:
This is a test comment.
--
If you no longer wish to receive notifications about this package,
please go to the package page [2] and select "Disable notifications".
[1] {aur_location}/account/{user2.Username}/
[2] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert expected == email.body
def test_update(user: User, user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
with db.begin():
user.UpdateNotify = 1
notif = notify.UpdateNotification(user2.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Package Update: {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
{user2.Username} [1] pushed a new commit to {pkgbase.Name} [2].
--
If you no longer wish to receive notifications about this package,
please go to the package page [2] and select "Disable notifications".
[1] {aur_location}/account/{user2.Username}/
[2] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert expected == email.body
def test_adopt(user: User, user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
notif = notify.AdoptNotification(user2.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Ownership Notification for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
The package {pkgbase.Name} [1] was adopted by {user2.Username} [2].
[1] {aur_location}/pkgbase/{pkgbase.Name}/
[2] {aur_location}/account/{user2.Username}/\
"""
assert email.body == expected
def test_disown(user: User, user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
notif = notify.DisownNotification(user2.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Ownership Notification for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
The package {pkgbase.Name} [1] was disowned by {user2.Username} [2].
[1] {aur_location}/pkgbase/{pkgbase.Name}/
[2] {aur_location}/account/{user2.Username}/\
"""
assert email.body == expected
def test_comaintainer_addition(user: User, pkgbases: List[PackageBase]):
# TODO: Add this in fastapi code!
pkgbase = pkgbases[0]
notif = notify.ComaintainerAddNotification(user.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Co-Maintainer Notification for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
You were added to the co-maintainer list of {pkgbase.Name} [1].
[1] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert email.body == expected
def test_comaintainer_removal(user: User, pkgbases: List[PackageBase]):
# TODO: Add this in fastapi code!
pkgbase = pkgbases[0]
notif = notify.ComaintainerRemoveNotification(user.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Co-Maintainer Notification for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
You were removed from the co-maintainer list of {pkgbase.Name} [1].
[1] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert email.body == expected
def test_delete(user: User, user2: User, pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
notif = notify.DeleteNotification(user2.ID, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Package deleted: {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
{user2.Username} [1] deleted {pkgbase.Name} [2].
You will no longer receive notifications about this package.
[1] {aur_location}/account/{user2.Username}/
[2] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert email.body == expected
def test_merge(user: User, user2: User, pkgbases: List[PackageBase]):
source, target = pkgbases[:2]
notif = notify.DeleteNotification(user2.ID, source.ID, target.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"AUR Package deleted: {source.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
{user2.Username} [1] merged {source.Name} [2] into {target.Name} [3].
--
If you no longer wish receive notifications about the new package,
please go to [3] and click "Disable notifications".
[1] {aur_location}/account/{user2.Username}/
[2] {aur_location}/pkgbase/{source.Name}/
[3] {aur_location}/pkgbase/{target.Name}/\
"""
assert email.body == expected
def set_tu(users: List[User]) -> User:
with db.begin():
for user in users:
user.AccountTypeID = TRUSTED_USER_ID
def test_open_close_request(user: User, user2: User,
pkgreq: PackageRequest,
pkgbases: List[PackageBase]):
set_tu([user])
pkgbase = pkgbases[0]
# Send an open request notification.
notif = notify.RequestOpenNotification(
user2.ID, pkgreq.ID, pkgreq.RequestType.Name, pkgbase.ID)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == aur_request_ml
assert email.headers.get("Cc") == ", ".join([user.Email, user2.Email])
expected = f"[PRQ#{pkgreq.ID}] Orphan Request for {pkgbase.Name}"
assert email.headers.get("Subject") == expected
expected = f"""\
{user2.Username} [1] filed an orphan request for {pkgbase.Name} [2]:
This is a request test comment.
[1] {aur_location}/account/{user2.Username}/
[2] {aur_location}/pkgbase/{pkgbase.Name}/\
"""
assert email.body == expected
# Now send a closure notification on the pkgbase we just opened.
notif = notify.RequestCloseNotification(user2.ID, pkgreq.ID, "rejected")
notif.send()
assert Email.count() == 2
email = Email(2).parse()
assert email.headers.get("To") == aur_request_ml
assert email.headers.get("Cc") == ", ".join([user.Email, user2.Email])
expected = f"[PRQ#{pkgreq.ID}] Orphan Request for {pkgbase.Name} Rejected"
assert email.headers.get("Subject") == expected
expected = f"""\
Request #{pkgreq.ID} has been rejected by {user2.Username} [1].
[1] {aur_location}/account/{user2.Username}/\
"""
assert email.body == expected
# Test auto-accept.
notif = notify.RequestCloseNotification(0, pkgreq.ID, "accepted")
notif.send()
assert Email.count() == 3
email = Email(3).parse()
assert email.headers.get("To") == aur_request_ml
assert email.headers.get("Cc") == ", ".join([user.Email, user2.Email])
expected = (f"[PRQ#{pkgreq.ID}] Orphan Request for "
f"{pkgbase.Name} Accepted")
assert email.headers.get("Subject") == expected
expected = (f"Request #{pkgreq.ID} has been accepted automatically "
"by the Arch User Repository\npackage request system.")
assert email.body == expected
def test_close_request_auto_accept():
pass
def test_close_request_comaintainer_cc(user: User, user2: User,
pkgreq: PackageRequest,
pkgbases: List[PackageBase]):
# TODO: Check this in fastapi code!
pkgbase = pkgbases[0]
with db.begin():
db.create(models.PackageComaintainer, PackageBase=pkgbase,
User=user2, Priority=1)
notif = notify.RequestCloseNotification(0, pkgreq.ID, "accepted")
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == aur_request_ml
assert email.headers.get("Cc") == ", ".join([user.Email, user2.Email])
def test_close_request_closure_comment(user: User, user2: User,
pkgreq: PackageRequest,
pkgbases: List[PackageBase]):
pkgbase = pkgbases[0]
with db.begin():
pkgreq.ClosureComment = "This is a test closure comment."
notif = notify.RequestCloseNotification(user2.ID, pkgreq.ID, "accepted")
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == aur_request_ml
assert email.headers.get("Cc") == ", ".join([user.Email, user2.Email])
expected = f"[PRQ#{pkgreq.ID}] Orphan Request for {pkgbase.Name} Accepted"
assert email.headers.get("Subject") == expected
expected = f"""\
Request #{pkgreq.ID} has been accepted by {user2.Username} [1]:
This is a test closure comment.
[1] {aur_location}/account/{user2.Username}/\
"""
assert email.body == expected
def test_tu_vote_reminders(user: User):
set_tu([user])
vote_id = 1
notif = notify.TUVoteReminderNotification(vote_id)
notif.send()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"TU Vote Reminder: Proposal {vote_id}"
assert email.headers.get("Subject") == expected
expected = f"""\
Please remember to cast your vote on proposal {vote_id} [1]. The voting period
ends in less than 48 hours.
[1] {aur_location}/tu/?id={vote_id}\
"""
assert email.body == expected
def test_notify_main(user: User):
""" Test TU vote reminder through aurweb.notify.main(). """
set_tu([user])
vote_id = 1
args = ["aurweb-notify", "tu-vote-reminder", str(vote_id)]
with mock.patch("sys.argv", args):
notify.main()
assert Email.count() == 1
email = Email(1).parse()
assert email.headers.get("To") == user.Email
expected = f"TU Vote Reminder: Proposal {vote_id}"
assert email.headers.get("Subject") == expected
expected = f"""\
Please remember to cast your vote on proposal {vote_id} [1]. The voting period
ends in less than 48 hours.
[1] {aur_location}/tu/?id={vote_id}\
"""
assert email.body == expected
# Save original config.get; we're going to mock it and need
# to be able to fallback when we are not overriding.
config_get = config.get
def mock_smtp_config(cls):
def _mock_smtp_config(section: str, key: str):
if section == "notifications":
if key == "sendmail":
return cls()
elif key == "smtp-use-ssl":
return cls(0)
elif key == "smtp-use-starttls":
return cls(0)
elif key == "smtp-user":
return cls()
elif key == "smtp-password":
return cls()
return cls(config_get(section, key))
return _mock_smtp_config
def test_smtp(user: User):
with db.begin():
user.ResetKey = "12345678901234567890123456789012"
SMTP = FakeSMTP()
get = "aurweb.config.get"
getboolean = "aurweb.config.getboolean"
with mock.patch(get, side_effect=mock_smtp_config(str)):
with mock.patch(getboolean, side_effect=mock_smtp_config(bool)):
with mock.patch("smtplib.SMTP", side_effect=lambda a, b: SMTP):
config.rehash()
notif = notify.WelcomeNotification(user.ID)
notif.send()
config.rehash()
assert len(SMTP.emails) == 1
def mock_smtp_starttls_config(cls):
def _mock_smtp_starttls_config(section: str, key: str):
if section == "notifications":
if key == "sendmail":
return cls()
elif key == "smtp-use-ssl":
return cls(0)
elif key == "smtp-use-starttls":
return cls(1)
elif key == "smtp-user":
return cls("test")
elif key == "smtp-password":
return cls("password")
return cls(config_get(section, key))
return _mock_smtp_starttls_config
def test_smtp_starttls(user: User):
# This test does two things: test starttls path and test
# path where we have a backup email.
with db.begin():
user.ResetKey = "12345678901234567890123456789012"
user.BackupEmail = "backup@example.org"
SMTP = FakeSMTP()
get = "aurweb.config.get"
getboolean = "aurweb.config.getboolean"
with mock.patch(get, side_effect=mock_smtp_starttls_config(str)):
with mock.patch(
getboolean, side_effect=mock_smtp_starttls_config(bool)):
with mock.patch("smtplib.SMTP", side_effect=lambda a, b: SMTP):
notif = notify.WelcomeNotification(user.ID)
notif.send()
assert SMTP.starttls_enabled
assert SMTP.user
assert SMTP.passwd
assert len(SMTP.emails) == 2
to = SMTP.emails[0][1]
assert to == [user.Email]
to = SMTP.emails[1][1]
assert to == [user.BackupEmail]
def mock_smtp_ssl_config(cls):
def _mock_smtp_ssl_config(section: str, key: str):
if section == "notifications":
if key == "sendmail":
return cls()
elif key == "smtp-use-ssl":
return cls(1)
elif key == "smtp-use-starttls":
return cls(0)
elif key == "smtp-user":
return cls("test")
elif key == "smtp-password":
return cls("password")
return cls(config_get(section, key))
return _mock_smtp_ssl_config
def test_smtp_ssl(user: User):
with db.begin():
user.ResetKey = "12345678901234567890123456789012"
SMTP = FakeSMTP_SSL()
get = "aurweb.config.get"
getboolean = "aurweb.config.getboolean"
with mock.patch(get, side_effect=mock_smtp_ssl_config(str)):
with mock.patch(getboolean, side_effect=mock_smtp_ssl_config(bool)):
with mock.patch("smtplib.SMTP_SSL", side_effect=lambda a, b: SMTP):
notif = notify.WelcomeNotification(user.ID)
notif.send()
assert len(SMTP.emails) == 1
assert SMTP.use_ssl
assert SMTP.user
assert SMTP.passwd
def test_notification_defaults():
notif = notify.Notification()
assert notif.get_refs() == tuple()
assert notif.get_headers() == dict()
assert notif.get_cc() == list()