housekeep(fastapi): simplify package_base_comaintainers_post

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-11-20 13:19:34 -08:00
parent f897411ddf
commit 008a8824ce
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
2 changed files with 88 additions and 73 deletions

View file

@ -4,13 +4,14 @@ from typing import Dict, List, Union
import orjson
from fastapi import HTTPException
from fastapi import HTTPException, Request
from sqlalchemy import and_, orm
from aurweb import db, models
from aurweb import db, l10n, models, util
from aurweb.models.official_provider import OFFICIAL_BASE
from aurweb.models.relation_type import PROVIDES_ID
from aurweb.redis import redis_connection
from aurweb.scripts import notify
from aurweb.templates import register_filter
@ -223,6 +224,85 @@ def query_notified(query: List[models.Package],
).filter(
models.PackageNotification.UserID == user.ID
)
for notify in notified:
output[notify.PackageBase.ID] = True
for notif in notified:
output[notif.PackageBase.ID] = True
return output
def remove_comaintainers(pkgbase: models.PackageBase,
usernames: List[str]) -> None:
"""
Remove comaintainers from `pkgbase`.
:param pkgbase: PackageBase instance
:param usernames: Iterable of username strings
:return: None
"""
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
# We know that the users we passed here are in the DB.
# No need to check for their existence.
comaintainer = pkgbase.comaintainers.join(models.User).filter(
models.User.Username == username
).first()
notifications.append(
notify.ComaintainerRemoveNotification(
conn, comaintainer.User.ID, pkgbase.ID
)
)
db.delete(comaintainer)
# Send out notifications if need be.
util.apply_all(notifications, lambda n: n.send())
def add_comaintainers(request: Request, pkgbase: models.PackageBase,
priority: int, usernames: List[str]) -> None:
"""
Add comaintainers to `pkgbase`.
:param request: FastAPI request
:param pkgbase: PackageBase instance
:param priority: Initial priority value
:param usernames: Iterable of username strings
:return: None on success, an error string on failure
"""
# First, perform a check against all usernames given; for each
# username, add its related User object to memo.
_ = l10n.get_translator_for_request(request)
memo = {}
for username in usernames:
user = db.query(models.User).filter(
models.User.Username == username).first()
if not user:
return _("Invalid user name: %s") % username
memo[username] = user
# Alright, now that we got past the check, add them all to the DB.
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
user = memo.get(username)
if pkgbase.Maintainer == user:
# Already a maintainer. Move along.
continue
# If we get here, our user model object is in the memo.
comaintainer = db.create(
models.PackageComaintainer,
PackageBase=pkgbase,
User=user,
Priority=priority)
priority += 1
notifications.append(
notify.ComaintainerAddNotification(
conn, comaintainer.User.ID, pkgbase.ID)
)
# Send out notifications.
util.apply_all(notifications, lambda n: n.send())

View file

@ -15,6 +15,7 @@ from aurweb.exceptions import ValidationError
from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID
from aurweb.models.request_type import DELETION_ID, MERGE, MERGE_ID
from aurweb.packages import util as pkgutil
from aurweb.packages import validate
from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, get_pkgreq_by_id, query_notified, query_voted
@ -531,28 +532,6 @@ async def package_base_comaintainers(request: Request, name: str) -> Response:
return render_template(request, "pkgbase/comaintainers.html", context)
def remove_users(pkgbase, usernames):
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
# We know that the users we passed here are in the DB.
# No need to check for their existence.
comaintainer = pkgbase.comaintainers.join(models.User).filter(
models.User.Username == username
).first()
notifications.append(
notify.ComaintainerRemoveNotification(
conn, comaintainer.User.ID, pkgbase.ID
)
)
db.delete(comaintainer)
# Send out notifications if need be.
for notify_ in notifications:
notify_.send()
@router.post("/pkgbase/{name}/comaintainers")
@auth_required(True, redirect="/pkgbase/{name}/comaintainers")
async def package_base_comaintainers_post(
@ -573,7 +552,7 @@ async def package_base_comaintainers_post(
users.remove(str()) # Remove any empty strings from the set.
records = {c.User.Username for c in pkgbase.comaintainers}
remove_users(pkgbase, records.difference(users))
pkgutil.remove_comaintainers(pkgbase, records.difference(users))
# Default priority (lowest value; most preferred).
priority = 1
@ -590,52 +569,8 @@ async def package_base_comaintainers_post(
if last_priority:
priority = last_priority.Priority + 1
def add_users(usernames):
""" Add users as comaintainers to pkgbase.
:param usernames: An iterable of username strings
:return: None on success, an error string on failure. """
nonlocal request, pkgbase, priority
# First, perform a check against all usernames given; for each
# username, add its related User object to memo.
_ = l10n.get_translator_for_request(request)
memo = {}
for username in usernames:
user = db.query(models.User).filter(
models.User.Username == username).first()
if not user:
return _("Invalid user name: %s") % username
memo[username] = user
# Alright, now that we got past the check, add them all to the DB.
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
user = memo.get(username)
if pkgbase.Maintainer == user:
# Already a maintainer. Move along.
continue
# If we get here, our user model object is in the memo.
comaintainer = db.create(
models.PackageComaintainer,
PackageBase=pkgbase,
User=user,
Priority=priority)
priority += 1
notifications.append(
notify.ComaintainerAddNotification(
conn, comaintainer.User.ID, pkgbase.ID)
)
# Send out notifications.
for notify_ in notifications:
notify_.send()
error = add_users(users.difference(records))
error = pkgutil.add_comaintainers(request, pkgbase, priority,
users.difference(records))
if error:
context = make_context(request, "Manage Co-maintainers")
context["pkgbase"] = pkgbase