change(python): refactor & centralize comaintainer management

This commit centralizes comaintainer management with a few new
functions and uses them more appropriately within routes:

- aurweb.packages.util.latest_priority
- aurweb.packages.util.remove_comaintainer
- aurweb.packages.util.remove_comaintainers
- aurweb.packages.util.add_comaintainer
- aurweb.packages.util.add_comaintainers
- aurweb.packages.util.rotate_comaintainers

Closes #117

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-12-29 01:28:56 -08:00
parent 9d3e77bab1
commit fc229d755b
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
3 changed files with 151 additions and 82 deletions

View file

@ -8,8 +8,9 @@ from fastapi import HTTPException, Request
from sqlalchemy import and_, orm from sqlalchemy import and_, orm
from aurweb import db, l10n, models, util from aurweb import db, l10n, models, util
from aurweb.models import Package, PackageBase, User
from aurweb.models.official_provider import OFFICIAL_BASE from aurweb.models.official_provider import OFFICIAL_BASE
from aurweb.models.package import Package from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_dependency import PackageDependency from aurweb.models.package_dependency import PackageDependency
from aurweb.models.relation_type import PROVIDES_ID from aurweb.models.relation_type import PROVIDES_ID
from aurweb.redis import redis_connection from aurweb.redis import redis_connection
@ -235,82 +236,151 @@ def query_notified(query: List[models.Package],
return output return output
def remove_comaintainers(pkgbase: models.PackageBase, def remove_comaintainer(comaint: PackageComaintainer) \
usernames: List[str]) -> None: -> notify.ComaintainerRemoveNotification:
"""
Remove a PackageComaintainer.
This function does *not* begin any database transaction and
must be used **within** a database transaction, e.g.:
with db.begin():
remove_comaintainer(comaint)
:param comaint: Target PackageComaintainer to be deleted
:return: ComaintainerRemoveNotification
"""
pkgbase = comaint.PackageBase
notif = notify.ComaintainerRemoveNotification(comaint.User.ID, pkgbase.ID)
db.delete(comaint)
rotate_comaintainers(pkgbase)
return notif
def remove_comaintainers(pkgbase: PackageBase, usernames: List[str]) -> None:
""" """
Remove comaintainers from `pkgbase`. Remove comaintainers from `pkgbase`.
:param pkgbase: PackageBase instance :param pkgbase: PackageBase instance
:param usernames: Iterable of username strings :param usernames: Iterable of username strings
:return: None
""" """
notifications = [] notifications = []
with db.begin(): with db.begin():
for username in usernames: comaintainers = pkgbase.comaintainers.join(User).filter(
# We know that the users we passed here are in the DB. User.Username.in_(usernames)
# No need to check for their existence. ).all()
comaintainer = pkgbase.comaintainers.join(models.User).filter( notifications = [
models.User.Username == username notify.ComaintainerRemoveNotification(co.User.ID, pkgbase.ID)
).first() for co in comaintainers
notifications.append( ]
notify.ComaintainerRemoveNotification( db.delete_all(comaintainers)
comaintainer.User.ID, pkgbase.ID)
)
db.delete(comaintainer)
# Send out notifications if need be. # Rotate comaintainer priority values.
with db.begin():
rotate_comaintainers(pkgbase)
# Send out notifications.
util.apply_all(notifications, lambda n: n.send()) util.apply_all(notifications, lambda n: n.send())
def latest_priority(pkgbase: PackageBase) -> int:
"""
Return the highest Priority column related to `pkgbase`.
:param pkgbase: PackageBase instance
:return: Highest Priority found or 0 if no records exist
"""
# Order comaintainers related to pkgbase by Priority DESC.
record = pkgbase.comaintainers.order_by(
PackageComaintainer.Priority.desc()).first()
# Use Priority column if record exists, otherwise 0.
return record.Priority if record else 0
class NoopComaintainerNotification:
""" A noop notification stub used as an error-state return value. """
def send(self) -> None:
""" noop """
return
def add_comaintainer(pkgbase: PackageBase, comaintainer: User) \
-> notify.ComaintainerAddNotification:
"""
Add a new comaintainer to `pkgbase`.
:param pkgbase: PackageBase instance
:param comaintainer: User instance used for new comaintainer record
:return: ComaintainerAddNotification
"""
# Skip given `comaintainers` who are already maintainer.
if pkgbase.Maintainer == comaintainer:
return NoopComaintainerNotification()
# Priority for the new comaintainer is +1 more than the highest.
new_prio = latest_priority(pkgbase) + 1
with db.begin():
db.create(PackageComaintainer, PackageBase=pkgbase,
User=comaintainer, Priority=new_prio)
return notify.ComaintainerAddNotification(comaintainer.ID, pkgbase.ID)
def add_comaintainers(request: Request, pkgbase: models.PackageBase, def add_comaintainers(request: Request, pkgbase: models.PackageBase,
priority: int, usernames: List[str]) -> None: usernames: List[str]) -> None:
""" """
Add comaintainers to `pkgbase`. Add comaintainers to `pkgbase`.
:param request: FastAPI request :param request: FastAPI request
:param pkgbase: PackageBase instance :param pkgbase: PackageBase instance
:param priority: Initial priority value
:param usernames: Iterable of username strings :param usernames: Iterable of username strings
:return: None on success, an error string on failure :return: Error string on failure else None
""" """
# For each username in usernames, perform validation of the username
# First, perform a check against all usernames given; for each # and append the User record to `users` if no errors occur.
# username, add its related User object to memo. users = []
_ = l10n.get_translator_for_request(request)
memo = {}
for username in usernames: for username in usernames:
user = db.query(models.User).filter( user = db.query(User).filter(User.Username == username).first()
models.User.Username == username).first()
if not user: if not user:
_ = l10n.get_translator_for_request(request)
return _("Invalid user name: %s") % username return _("Invalid user name: %s") % username
memo[username] = user users.append(user)
# Alright, now that we got past the check, add them all to the DB.
notifications = [] notifications = []
with db.begin():
for username in usernames:
user = memo.get(username)
if pkgbase.Maintainer == user:
# Already a maintainer. Move along.
continue
# If we get here, our user model object is in the memo. def add_comaint(user: User):
comaintainer = db.create( nonlocal notifications
models.PackageComaintainer, # Populate `notifications` with add_comaintainer's return value,
PackageBase=pkgbase, # which is a ComaintainerAddNotification.
User=user, notifications.append(add_comaintainer(pkgbase, user))
Priority=priority)
priority += 1
notifications.append( # Move along: add all `users` as new `pkgbase` comaintainers.
notify.ComaintainerAddNotification( util.apply_all(users, add_comaint)
comaintainer.User.ID, pkgbase.ID)
)
# Send out notifications. # Send out notifications.
util.apply_all(notifications, lambda n: n.send()) util.apply_all(notifications, lambda n: n.send())
def rotate_comaintainers(pkgbase: PackageBase) -> None:
"""
Rotate `pkgbase` comaintainers.
This function resets the Priority column of all PackageComaintainer
instances related to `pkgbase` to seqential 1 .. n values with
persisted order.
:param pkgbase: PackageBase instance
"""
comaintainers = pkgbase.comaintainers.order_by(
models.PackageComaintainer.Priority.asc())
for i, comaint in enumerate(comaintainers):
comaint.Priority = i + 1
def pkg_required(pkgname: str, provides: List[str], limit: int) \ def pkg_required(pkgname: str, provides: List[str], limit: int) \
-> List[PackageDependency]: -> List[PackageDependency]:
""" """

View file

@ -543,27 +543,13 @@ async def package_base_comaintainers_post(
users = {e.strip() for e in users.split("\n") if bool(e.strip())} users = {e.strip() for e in users.split("\n") if bool(e.strip())}
records = {c.User.Username for c in pkgbase.comaintainers} records = {c.User.Username for c in pkgbase.comaintainers}
logger.debug(f"RemoveComaintainers: {records.difference(users)}") users_to_rm = records.difference(users)
pkgutil.remove_comaintainers(pkgbase, records.difference(users)) pkgutil.remove_comaintainers(pkgbase, users_to_rm)
logger.debug(f"{request.user} removed comaintainers from "
f"{pkgbase.Name}: {users_to_rm}")
# Default priority (lowest value; most preferred). users_to_add = users.difference(records)
priority = 1 error = pkgutil.add_comaintainers(request, pkgbase, users_to_add)
# Get the highest priority in the comaintainer set.
last_priority = pkgbase.comaintainers.order_by(
models.PackageComaintainer.Priority.desc()
).limit(1).first()
# If that record exists, we use a priority which is 1 higher.
# TODO: This needs to ensure that it wraps around and preserves
# ordering in the case where we hit the max number allowed by
# the Priority column type.
if last_priority:
priority = last_priority.Priority + 1
logger.debug(f"AddComaintainers: {users.difference(records)}")
error = pkgutil.add_comaintainers(request, pkgbase, priority,
users.difference(records))
if error: if error:
context = make_context(request, "Manage Co-maintainers") context = make_context(request, "Manage Co-maintainers")
context["pkgbase"] = pkgbase context["pkgbase"] = pkgbase
@ -573,6 +559,9 @@ async def package_base_comaintainers_post(
context["errors"] = [error] context["errors"] = [error]
return render_template(request, "pkgbase/comaintainers.html", context) return render_template(request, "pkgbase/comaintainers.html", context)
logger.debug(f"{request.user} added comaintainers to "
f"{pkgbase.Name}: {users_to_add}")
return RedirectResponse(f"/pkgbase/{pkgbase.Name}", return RedirectResponse(f"/pkgbase/{pkgbase.Name}",
status_code=HTTPStatus.SEE_OTHER) status_code=HTTPStatus.SEE_OTHER)
@ -925,21 +914,26 @@ def pkgbase_disown_instance(request: Request, pkgbase: models.PackageBase):
disowner = request.user disowner = request.user
notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)] notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)]
if disowner != pkgbase.Maintainer: is_maint = disowner == pkgbase.Maintainer
if is_maint:
with db.begin():
# Comaintainer with the lowest Priority value; next-in-line.
prio_comaint = pkgbase.comaintainers.order_by(
models.PackageComaintainer.Priority.asc()
).first()
if prio_comaint:
# If there is such a comaintainer, promote them to maint.
pkgbase.Maintainer = prio_comaint.User
notifs.append(pkgutil.remove_comaintainer(prio_comaint))
else:
# Otherwise, just orphan the package completely.
pkgbase.Maintainer = None
elif request.user.has_credential(creds.PKGBASE_DISOWN):
# Otherwise, the request user performing this disownage is a
# Trusted User and we treat it like a standard orphan request.
notifs += handle_request(request, ORPHAN_ID, pkgbase) notifs += handle_request(request, ORPHAN_ID, pkgbase)
with db.begin(): with db.begin():
pkgbase.Maintainer = None pkgbase.Maintainer = None
else:
co = pkgbase.comaintainers.order_by(
models.PackageComaintainer.Priority.asc()
).limit(1).first()
with db.begin():
if co:
pkgbase.Maintainer = co.User
db.delete(co)
else:
pkgbase.Maintainer = None
util.apply_all(notifs, lambda n: n.send()) util.apply_all(notifs, lambda n: n.send())

View file

@ -75,14 +75,19 @@ def client() -> TestClient:
yield TestClient(app=asgi.app) yield TestClient(app=asgi.app)
def create_user(username: str) -> User:
with db.begin():
user = db.create(User, Username=username,
Email=f"{username}@example.org",
Passwd="testPassword",
AccountTypeID=USER_ID)
return user
@pytest.fixture @pytest.fixture
def user() -> User: def user() -> User:
""" Yield a user. """ """ Yield a user. """
with db.begin(): user = create_user("test")
user = db.create(User, Username="test",
Email="test@example.org",
Passwd="testPassword",
AccountTypeID=USER_ID)
yield user yield user