From bd2ad9b616ba62c4d9e876b3a11c5f127b39fb99 Mon Sep 17 00:00:00 2001 From: Kevin Morris Date: Sat, 1 Jan 2022 20:09:22 -0800 Subject: [PATCH] change(python): put pkgbase routes & impl into their own modules Introduces new router: - `aurweb.routers.pkgbase` Introduces new package: - `aurweb.pkgbase` Introduces new modules: - `aurweb.pkgbase.actions` - `aurweb.pkgbase.util` Changes: - `pkgbase_{action}_instance` functions are now located in `aurweb.pkgbase.actions`. - `pkgbase`-wise routes have been moved to `aurweb.routers.pkgbase`. - `make_single_context` was moved to `aurweb.pkgbase.util.make_context`. Signed-off-by: Kevin Morris --- aurweb/asgi.py | 3 +- aurweb/pkgbase/__init__.py | 0 aurweb/pkgbase/actions.py | 142 +++++++ aurweb/pkgbase/util.py | 47 +++ aurweb/routers/packages.py | 794 +------------------------------------ aurweb/routers/pkgbase.py | 701 ++++++++++++++++++++++++++++++++ 6 files changed, 910 insertions(+), 777 deletions(-) create mode 100644 aurweb/pkgbase/__init__.py create mode 100644 aurweb/pkgbase/actions.py create mode 100644 aurweb/pkgbase/util.py create mode 100644 aurweb/routers/pkgbase.py diff --git a/aurweb/asgi.py b/aurweb/asgi.py index ef8d5933..88e72052 100644 --- a/aurweb/asgi.py +++ b/aurweb/asgi.py @@ -21,7 +21,7 @@ from aurweb.auth import BasicAuthBackend from aurweb.db import get_engine, query from aurweb.models import AcceptedTerm, Term from aurweb.prometheus import http_api_requests_total, http_requests_total, instrumentator -from aurweb.routers import accounts, auth, html, packages, rpc, rss, sso, trusted_user +from aurweb.routers import accounts, auth, html, packages, pkgbase, rpc, rss, sso, trusted_user from aurweb.templates import make_context, render_template # Setup the FastAPI app. @@ -81,6 +81,7 @@ async def app_startup(): app.include_router(trusted_user.router) app.include_router(rss.router) app.include_router(packages.router) + app.include_router(pkgbase.router) app.include_router(rpc.router) # Initialize the database engine and ORM. diff --git a/aurweb/pkgbase/__init__.py b/aurweb/pkgbase/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/aurweb/pkgbase/actions.py b/aurweb/pkgbase/actions.py new file mode 100644 index 00000000..00098d2e --- /dev/null +++ b/aurweb/pkgbase/actions.py @@ -0,0 +1,142 @@ +from typing import List + +from fastapi import Request + +from aurweb import db, logging, util +from aurweb.auth import creds +from aurweb.models import PackageBase +from aurweb.models.package_comaintainer import PackageComaintainer +from aurweb.models.package_notification import PackageNotification +from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID +from aurweb.packages.requests import handle_request, update_closure_comment +from aurweb.scripts import notify, popupdate + +logger = logging.get_logger(__name__) + + +def pkgbase_notify_instance(request: Request, pkgbase: PackageBase) -> None: + notif = db.query(pkgbase.notifications.filter( + PackageNotification.UserID == request.user.ID + ).exists()).scalar() + has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY) + if has_cred and not notif: + with db.begin(): + db.create(PackageNotification, + PackageBase=pkgbase, + User=request.user) + + +def pkgbase_unnotify_instance(request: Request, pkgbase: PackageBase) -> None: + notif = pkgbase.notifications.filter( + PackageNotification.UserID == request.user.ID + ).first() + has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY) + if has_cred and notif: + with db.begin(): + db.delete(notif) + + +def pkgbase_unflag_instance(request: Request, pkgbase: PackageBase) -> None: + has_cred = request.user.has_credential( + creds.PKGBASE_UNFLAG, approved=[pkgbase.Flagger, pkgbase.Maintainer]) + if has_cred: + with db.begin(): + pkgbase.OutOfDateTS = None + pkgbase.Flagger = None + pkgbase.FlaggerComment = str() + + +def pkgbase_disown_instance(request: Request, pkgbase: PackageBase) -> None: + import aurweb.packages.util as pkgutil + + disowner = request.user + notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)] + + is_maint = disowner == pkgbase.Maintainer + if is_maint: + with db.begin(): + # Comaintainer with the lowest Priority value; next-in-line. + prio_comaint = pkgbase.comaintainers.order_by( + PackageComaintainer.Priority.asc() + ).first() + if prio_comaint: + # If there is such a comaintainer, promote them to maint. + pkgbase.Maintainer = prio_comaint.User + notifs.append(pkgutil.remove_comaintainer(prio_comaint)) + else: + # Otherwise, just orphan the package completely. + pkgbase.Maintainer = None + elif request.user.has_credential(creds.PKGBASE_DISOWN): + # Otherwise, the request user performing this disownage is a + # Trusted User and we treat it like a standard orphan request. + notifs += handle_request(request, ORPHAN_ID, pkgbase) + with db.begin(): + pkgbase.Maintainer = None + + util.apply_all(notifs, lambda n: n.send()) + + +def pkgbase_adopt_instance(request: Request, pkgbase: PackageBase) -> None: + with db.begin(): + pkgbase.Maintainer = request.user + + notif = notify.AdoptNotification(request.user.ID, pkgbase.ID) + notif.send() + + +def pkgbase_delete_instance(request: Request, pkgbase: PackageBase, + comments: str = str()) \ + -> List[notify.Notification]: + notifs = handle_request(request, DELETION_ID, pkgbase) + [ + notify.DeleteNotification(request.user.ID, pkgbase.ID) + ] + + with db.begin(): + update_closure_comment(pkgbase, DELETION_ID, comments) + db.delete(pkgbase) + + return notifs + + +def pkgbase_merge_instance(request: Request, pkgbase: PackageBase, + target: PackageBase, comments: str = str()) -> None: + pkgbasename = str(pkgbase.Name) + + # Create notifications. + notifs = handle_request(request, MERGE_ID, pkgbase, target) + + # Target votes and notifications sets of user IDs that are + # looking to be migrated. + target_votes = set(v.UsersID for v in target.package_votes) + target_notifs = set(n.UserID for n in target.notifications) + + with db.begin(): + # Merge pkgbase's comments. + for comment in pkgbase.comments: + comment.PackageBase = target + + # Merge notifications that don't yet exist in the target. + for notif in pkgbase.notifications: + if notif.UserID not in target_notifs: + notif.PackageBase = target + + # Merge votes that don't yet exist in the target. + for vote in pkgbase.package_votes: + if vote.UsersID not in target_votes: + vote.PackageBase = target + + # Run popupdate. + popupdate.run_single(target) + + with db.begin(): + # Delete pkgbase and its packages now that everything's merged. + for pkg in pkgbase.packages: + db.delete(pkg) + db.delete(pkgbase) + + # Log this out for accountability purposes. + logger.info(f"Trusted User '{request.user.Username}' merged " + f"'{pkgbasename}' into '{target.Name}'.") + + # Send notifications. + util.apply_all(notifs, lambda n: n.send()) diff --git a/aurweb/pkgbase/util.py b/aurweb/pkgbase/util.py new file mode 100644 index 00000000..e348d5f0 --- /dev/null +++ b/aurweb/pkgbase/util.py @@ -0,0 +1,47 @@ +from typing import Any, Dict + +from fastapi import Request + +from aurweb import config +from aurweb.models import PackageBase +from aurweb.models.package_comment import PackageComment +from aurweb.models.package_request import PackageRequest +from aurweb.models.package_vote import PackageVote +from aurweb.templates import make_context as _make_context + + +def make_context(request: Request, pkgbase: PackageBase) -> Dict[str, Any]: + """ Make a basic context for package or pkgbase. + + :param request: FastAPI request + :param pkgbase: PackageBase instance + :return: A pkgbase context without specific differences + """ + context = _make_context(request, pkgbase.Name) + + context["git_clone_uri_anon"] = config.get("options", "git_clone_uri_anon") + context["git_clone_uri_priv"] = config.get("options", "git_clone_uri_priv") + context["pkgbase"] = pkgbase + context["packages_count"] = pkgbase.packages.count() + context["keywords"] = pkgbase.keywords + context["comments"] = pkgbase.comments.order_by( + PackageComment.CommentTS.desc() + ) + context["pinned_comments"] = pkgbase.comments.filter( + PackageComment.PinnedTS != 0 + ).order_by(PackageComment.CommentTS.desc()) + + context["is_maintainer"] = bool(request.user == pkgbase.Maintainer) + context["notified"] = request.user.notified(pkgbase) + + context["out_of_date"] = bool(pkgbase.OutOfDateTS) + + context["voted"] = request.user.package_votes.filter( + PackageVote.PackageBaseID == pkgbase.ID + ).scalar() + + context["requests"] = pkgbase.requests.filter( + PackageRequest.ClosedTS.is_(None) + ).count() + + return context diff --git a/aurweb/routers/packages.py b/aurweb/routers/packages.py index 585c62ee..eda83c3f 100644 --- a/aurweb/routers/packages.py +++ b/aurweb/routers/packages.py @@ -3,27 +3,26 @@ from datetime import datetime from http import HTTPStatus from typing import Any, Dict, List -from fastapi import APIRouter, Form, HTTPException, Query, Request, Response -from fastapi.responses import JSONResponse, RedirectResponse -from sqlalchemy import and_, case +from fastapi import APIRouter, Form, Query, Request, Response +from fastapi.responses import RedirectResponse +from sqlalchemy import case import aurweb.filters import aurweb.packages.util -from aurweb import config, db, defaults, l10n, logging, models, util +from aurweb import config, db, defaults, logging, models, util from aurweb.auth import auth_required, creds from aurweb.exceptions import InvariantError, ValidationError from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID -from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID from aurweb.packages import util as pkgutil from aurweb.packages import validate -from aurweb.packages.requests import handle_request, update_closure_comment from aurweb.packages.search import PackageSearch -from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, get_pkgreq_by_id -from aurweb.scripts import notify, popupdate -from aurweb.scripts.rendercomment import update_comment_render_fastapi -from aurweb.templates import make_context, make_variable_context, render_raw_template, render_template +from aurweb.packages.util import get_pkg_or_base, get_pkgreq_by_id +from aurweb.pkgbase import actions as pkgbase_actions +from aurweb.pkgbase import util as pkgbaseutil +from aurweb.scripts import notify +from aurweb.templates import make_context, make_variable_context, render_template logger = logging.get_logger(__name__) router = APIRouter() @@ -130,59 +129,6 @@ async def packages(request: Request) -> Response: return await packages_get(request, context) -def pkgbase_delete_instance(request: Request, pkgbase: models.PackageBase, - comments: str = str()) \ - -> List[notify.Notification]: - notifs = handle_request(request, DELETION_ID, pkgbase) + [ - notify.DeleteNotification(request.user.ID, pkgbase.ID) - ] - - with db.begin(): - update_closure_comment(pkgbase, DELETION_ID, comments) - db.delete(pkgbase) - - return notifs - - -async def make_single_context(request: Request, - pkgbase: models.PackageBase) -> Dict[str, Any]: - """ Make a basic context for package or pkgbase. - - :param request: FastAPI request - :param pkgbase: PackageBase instance - :return: A pkgbase context without specific differences - """ - context = make_context(request, pkgbase.Name) - context["git_clone_uri_anon"] = aurweb.config.get("options", - "git_clone_uri_anon") - context["git_clone_uri_priv"] = aurweb.config.get("options", - "git_clone_uri_priv") - context["pkgbase"] = pkgbase - context["packages_count"] = pkgbase.packages.count() - context["keywords"] = pkgbase.keywords - context["comments"] = pkgbase.comments.order_by( - models.PackageComment.CommentTS.desc() - ) - context["pinned_comments"] = pkgbase.comments.filter( - models.PackageComment.PinnedTS != 0 - ).order_by(models.PackageComment.CommentTS.desc()) - - context["is_maintainer"] = (request.user.is_authenticated() - and request.user.ID == pkgbase.MaintainerUID) - context["notified"] = request.user.notified(pkgbase) - - context["out_of_date"] = bool(pkgbase.OutOfDateTS) - - context["voted"] = request.user.package_votes.filter( - models.PackageVote.PackageBaseID == pkgbase.ID).scalar() - - context["requests"] = pkgbase.requests.filter( - models.PackageRequest.ClosedTS.is_(None) - ).count() - - return context - - @router.get("/packages/{name}") async def package(request: Request, name: str) -> Response: # Get the Package. @@ -200,7 +146,7 @@ async def package(request: Request, name: str) -> Response: rels_data["r"].append(rel) # Add our base information. - context = await make_single_context(request, pkgbase) + context = pkgbaseutil.make_context(request, pkgbase) context["package"] = pkg # Package sources. @@ -235,249 +181,6 @@ async def package(request: Request, name: str) -> Response: return render_template(request, "packages/show.html", context) -@router.get("/pkgbase/{name}") -async def package_base(request: Request, name: str) -> Response: - # Get the PackageBase. - pkgbase = get_pkg_or_base(name, models.PackageBase) - - # If this is not a split package, redirect to /packages/{name}. - if pkgbase.packages.count() == 1: - return RedirectResponse(f"/packages/{name}", - status_code=int(HTTPStatus.SEE_OTHER)) - - # Add our base information. - context = await make_single_context(request, pkgbase) - context["packages"] = pkgbase.packages.all() - - return render_template(request, "pkgbase.html", context) - - -@router.get("/pkgbase/{name}/voters") -async def package_base_voters(request: Request, name: str) -> Response: - # Get the PackageBase. - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if not request.user.has_credential(creds.PKGBASE_LIST_VOTERS): - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - context = make_context(request, "Voters") - context["pkgbase"] = pkgbase - return render_template(request, "pkgbase/voters.html", context) - - -@router.post("/pkgbase/{name}/comments") -@auth_required() -async def pkgbase_comments_post( - request: Request, name: str, - comment: str = Form(default=str()), - enable_notifications: bool = Form(default=False)): - """ Add a new comment. """ - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if not comment: - raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) - - # If the provided comment is different than the record's version, - # update the db record. - now = int(datetime.utcnow().timestamp()) - with db.begin(): - comment = db.create(models.PackageComment, User=request.user, - PackageBase=pkgbase, - Comments=comment, RenderedComment=str(), - CommentTS=now) - - if enable_notifications and not request.user.notified(pkgbase): - db.create(models.PackageNotification, - User=request.user, - PackageBase=pkgbase) - update_comment_render_fastapi(comment) - - # Redirect to the pkgbase page. - return RedirectResponse(f"/pkgbase/{pkgbase.Name}#comment-{comment.ID}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.get("/pkgbase/{name}/comments/{id}/form") -@auth_required() -async def pkgbase_comment_form(request: Request, name: str, id: int, - next: str = Query(default=None)): - """ Produce a comment form for comment {id}. """ - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = pkgbase.comments.filter(models.PackageComment.ID == id).first() - if not comment: - return JSONResponse({}, status_code=HTTPStatus.NOT_FOUND) - - if not request.user.is_elevated() and request.user != comment.User: - return JSONResponse({}, status_code=HTTPStatus.UNAUTHORIZED) - - context = await make_single_context(request, pkgbase) - context["comment"] = comment - - if not next: - next = f"/pkgbase/{name}" - - context["next"] = next - - form = render_raw_template( - request, "partials/packages/comment_form.html", context) - return JSONResponse({"form": form}) - - -@router.post("/pkgbase/{name}/comments/{id}") -@auth_required() -async def pkgbase_comment_post( - request: Request, name: str, id: int, - comment: str = Form(default=str()), - enable_notifications: bool = Form(default=False), - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - db_comment = get_pkgbase_comment(pkgbase, id) - - if not comment: - raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) - - # If the provided comment is different than the record's version, - # update the db record. - now = int(datetime.utcnow().timestamp()) - if db_comment.Comments != comment: - with db.begin(): - db_comment.Comments = comment - db_comment.Editor = request.user - db_comment.EditedTS = now - - db_notif = request.user.notifications.filter( - models.PackageNotification.PackageBaseID == pkgbase.ID - ).first() - if enable_notifications and not db_notif: - db.create(models.PackageNotification, - User=request.user, - PackageBase=pkgbase) - update_comment_render_fastapi(db_comment) - - if not next: - next = f"/pkgbase/{pkgbase.Name}" - - # Redirect to the pkgbase page anchored to the updated comment. - return RedirectResponse(f"{next}#comment-{db_comment.ID}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.get("/pkgbase/{name}/comments/{id}/edit") -@auth_required() -async def pkgbase_comment_edit(request: Request, name: str, id: int, - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = get_pkgbase_comment(pkgbase, id) - - if not next: - next = f"/pkgbase/{name}" - - context = await make_variable_context(request, "Edit comment", next=next) - context["comment"] = comment - return render_template(request, "packages/comments/edit.html", context) - - -@router.post("/pkgbase/{name}/comments/{id}/delete") -@auth_required() -async def pkgbase_comment_delete(request: Request, name: str, id: int, - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = get_pkgbase_comment(pkgbase, id) - - authorized = request.user.has_credential(creds.COMMENT_DELETE, - [comment.User]) - if not authorized: - _ = l10n.get_translator_for_request(request) - raise HTTPException( - status_code=HTTPStatus.UNAUTHORIZED, - detail=_("You are not allowed to delete this comment.")) - - now = int(datetime.utcnow().timestamp()) - with db.begin(): - comment.Deleter = request.user - comment.DelTS = now - - if not next: - next = f"/pkgbase/{name}" - - return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) - - -@router.post("/pkgbase/{name}/comments/{id}/undelete") -@auth_required() -async def pkgbase_comment_undelete(request: Request, name: str, id: int, - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = get_pkgbase_comment(pkgbase, id) - - has_cred = request.user.has_credential(creds.COMMENT_UNDELETE, - approved=[comment.User]) - if not has_cred: - _ = l10n.get_translator_for_request(request) - raise HTTPException( - status_code=HTTPStatus.UNAUTHORIZED, - detail=_("You are not allowed to undelete this comment.")) - - with db.begin(): - comment.Deleter = None - comment.DelTS = None - - if not next: - next = f"/pkgbase/{name}" - - return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) - - -@router.post("/pkgbase/{name}/comments/{id}/pin") -@auth_required() -async def pkgbase_comment_pin(request: Request, name: str, id: int, - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = get_pkgbase_comment(pkgbase, id) - - has_cred = request.user.has_credential(creds.COMMENT_PIN, - approved=[pkgbase.Maintainer]) - if not has_cred: - _ = l10n.get_translator_for_request(request) - raise HTTPException( - status_code=HTTPStatus.UNAUTHORIZED, - detail=_("You are not allowed to pin this comment.")) - - now = int(datetime.utcnow().timestamp()) - with db.begin(): - comment.PinnedTS = now - - if not next: - next = f"/pkgbase/{name}" - - return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) - - -@router.post("/pkgbase/{name}/comments/{id}/unpin") -@auth_required() -async def pkgbase_comment_unpin(request: Request, name: str, id: int, - next: str = Form(default=None)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - comment = get_pkgbase_comment(pkgbase, id) - - has_cred = request.user.has_credential(creds.COMMENT_PIN, - approved=[pkgbase.Maintainer]) - if not has_cred: - _ = l10n.get_translator_for_request(request) - raise HTTPException( - status_code=HTTPStatus.UNAUTHORIZED, - detail=_("You are not allowed to unpin this comment.")) - - with db.begin(): - comment.PinnedTS = 0 - - if not next: - next = f"/pkgbase/{name}" - - return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) - - @router.get("/pkgbase/{name}/comaintainers") @auth_required() async def package_base_comaintainers(request: Request, name: str) -> Response: @@ -654,7 +357,8 @@ async def pkgbase_request_post(request: Request, name: str, logger.debug(f"New request #{pkgreq.ID} is marked for auto-orphan.") elif type == "deletion" and is_maintainer and outdated: # This request should be auto-accepted. - notifs = pkgbase_delete_instance(request, pkgbase, comments=comments) + notifs = pkgbase_actions.pkgbase_delete_instance( + request, pkgbase, comments=comments) util.apply_all(notifs, lambda n: n.send()) logger.debug(f"New request #{pkgreq.ID} is marked for auto-deletion.") @@ -704,339 +408,6 @@ async def requests_close_post(request: Request, id: int, return RedirectResponse("/requests", status_code=HTTPStatus.SEE_OTHER) -@router.post("/pkgbase/{name}/keywords") -async def pkgbase_keywords(request: Request, name: str, - keywords: str = Form(default=str())): - pkgbase = get_pkg_or_base(name, models.PackageBase) - keywords = set(keywords.split(" ")) - - # Delete all keywords which are not supplied by the user. - other_keywords = pkgbase.keywords.filter( - ~models.PackageKeyword.Keyword.in_(keywords)) - other_keyword_strings = [kwd.Keyword for kwd in other_keywords] - - existing_keywords = set( - kwd.Keyword for kwd in - pkgbase.keywords.filter( - ~models.PackageKeyword.Keyword.in_(other_keyword_strings)) - ) - with db.begin(): - db.delete_all(other_keywords) - for keyword in keywords.difference(existing_keywords): - db.create(models.PackageKeyword, - PackageBase=pkgbase, - Keyword=keyword) - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.get("/pkgbase/{name}/flag") -@auth_required() -async def pkgbase_flag_get(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - has_cred = request.user.has_credential(creds.PKGBASE_FLAG) - if not has_cred or pkgbase.Flagger is not None: - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - context = make_context(request, "Flag Package Out-Of-Date") - context["pkgbase"] = pkgbase - return render_template(request, "packages/flag.html", context) - - -@router.post("/pkgbase/{name}/flag") -@auth_required() -async def pkgbase_flag_post(request: Request, name: str, - comments: str = Form(default=str())): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if not comments: - context = make_context(request, "Flag Package Out-Of-Date") - context["pkgbase"] = pkgbase - context["errors"] = ["The selected packages have not been flagged, " - "please enter a comment."] - return render_template(request, "packages/flag.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - has_cred = request.user.has_credential(creds.PKGBASE_FLAG) - if has_cred and not pkgbase.Flagger: - now = int(datetime.utcnow().timestamp()) - with db.begin(): - pkgbase.OutOfDateTS = now - pkgbase.Flagger = request.user - pkgbase.FlaggerComment = comments - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.get("/pkgbase/{name}/flag-comment") -async def pkgbase_flag_comment(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if pkgbase.Flagger is None: - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - context = make_context(request, "Flag Comment") - context["pkgbase"] = pkgbase - return render_template(request, "packages/flag-comment.html", context) - - -def pkgbase_unflag_instance(request: Request, pkgbase: models.PackageBase): - has_cred = request.user.has_credential( - creds.PKGBASE_UNFLAG, approved=[pkgbase.Flagger, pkgbase.Maintainer]) - if has_cred: - with db.begin(): - pkgbase.OutOfDateTS = None - pkgbase.Flagger = None - pkgbase.FlaggerComment = str() - - -@router.post("/pkgbase/{name}/unflag") -@auth_required() -async def pkgbase_unflag(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - pkgbase_unflag_instance(request, pkgbase) - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -def pkgbase_notify_instance(request: Request, pkgbase: models.PackageBase): - notif = db.query(pkgbase.notifications.filter( - models.PackageNotification.UserID == request.user.ID - ).exists()).scalar() - has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY) - if has_cred and not notif: - with db.begin(): - db.create(models.PackageNotification, - PackageBase=pkgbase, - User=request.user) - - -@router.post("/pkgbase/{name}/notify") -@auth_required() -async def pkgbase_notify(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - pkgbase_notify_instance(request, pkgbase) - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -def pkgbase_unnotify_instance(request: Request, pkgbase: models.PackageBase): - notif = pkgbase.notifications.filter( - models.PackageNotification.UserID == request.user.ID - ).first() - has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY) - if has_cred and notif: - with db.begin(): - db.delete(notif) - - -@router.post("/pkgbase/{name}/unnotify") -@auth_required() -async def pkgbase_unnotify(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - pkgbase_unnotify_instance(request, pkgbase) - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.post("/pkgbase/{name}/vote") -@auth_required() -async def pkgbase_vote(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - vote = pkgbase.package_votes.filter( - models.PackageVote.UsersID == request.user.ID - ).first() - has_cred = request.user.has_credential(creds.PKGBASE_VOTE) - if has_cred and not vote: - now = int(datetime.utcnow().timestamp()) - with db.begin(): - db.create(models.PackageVote, - User=request.user, - PackageBase=pkgbase, - VoteTS=now) - - # Update NumVotes/Popularity. - popupdate.run_single(pkgbase) - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.post("/pkgbase/{name}/unvote") -@auth_required() -async def pkgbase_unvote(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - vote = pkgbase.package_votes.filter( - models.PackageVote.UsersID == request.user.ID - ).first() - has_cred = request.user.has_credential(creds.PKGBASE_VOTE) - if has_cred and vote: - with db.begin(): - db.delete(vote) - - # Update NumVotes/Popularity. - popupdate.run_single(pkgbase) - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -def pkgbase_disown_instance(request: Request, pkgbase: models.PackageBase): - disowner = request.user - notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)] - - is_maint = disowner == pkgbase.Maintainer - if is_maint: - with db.begin(): - # Comaintainer with the lowest Priority value; next-in-line. - prio_comaint = pkgbase.comaintainers.order_by( - models.PackageComaintainer.Priority.asc() - ).first() - if prio_comaint: - # If there is such a comaintainer, promote them to maint. - pkgbase.Maintainer = prio_comaint.User - notifs.append(pkgutil.remove_comaintainer(prio_comaint)) - else: - # Otherwise, just orphan the package completely. - pkgbase.Maintainer = None - elif request.user.has_credential(creds.PKGBASE_DISOWN): - # Otherwise, the request user performing this disownage is a - # Trusted User and we treat it like a standard orphan request. - notifs += handle_request(request, ORPHAN_ID, pkgbase) - with db.begin(): - pkgbase.Maintainer = None - - util.apply_all(notifs, lambda n: n.send()) - - -@router.get("/pkgbase/{name}/disown") -@auth_required() -async def pkgbase_disown_get(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - has_cred = request.user.has_credential(creds.PKGBASE_DISOWN, - approved=[pkgbase.Maintainer]) - if not has_cred: - return RedirectResponse(f"/pkgbase/{name}", - HTTPStatus.SEE_OTHER) - - context = make_context(request, "Disown Package") - context["pkgbase"] = pkgbase - return render_template(request, "packages/disown.html", context) - - -@router.post("/pkgbase/{name}/disown") -@auth_required() -async def pkgbase_disown_post(request: Request, name: str, - comments: str = Form(default=str()), - confirm: bool = Form(default=False)): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - has_cred = request.user.has_credential(creds.PKGBASE_DISOWN, - approved=[pkgbase.Maintainer]) - if not has_cred: - return RedirectResponse(f"/pkgbase/{name}", - HTTPStatus.SEE_OTHER) - - context = make_context(request, "Disown Package") - context["pkgbase"] = pkgbase - if not confirm: - context["errors"] = [("The selected packages have not been disowned, " - "check the confirmation checkbox.")] - return render_template(request, "packages/disown.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - with db.begin(): - update_closure_comment(pkgbase, ORPHAN_ID, comments) - - try: - pkgbase_disown_instance(request, pkgbase) - except InvariantError as exc: - context["errors"] = [str(exc)] - return render_template(request, "packages/disown.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -def pkgbase_adopt_instance(request: Request, pkgbase: models.PackageBase): - with db.begin(): - pkgbase.Maintainer = request.user - - notif = notify.AdoptNotification(request.user.ID, pkgbase.ID) - notif.send() - - -@router.post("/pkgbase/{name}/adopt") -@auth_required() -async def pkgbase_adopt_post(request: Request, name: str): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - has_cred = request.user.has_credential(creds.PKGBASE_ADOPT) - if has_cred or not pkgbase.Maintainer: - # If the user has credentials, they'll adopt the package regardless - # of maintainership. Otherwise, we'll promote the user to maintainer - # if no maintainer currently exists. - pkgbase_adopt_instance(request, pkgbase) - - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - -@router.get("/pkgbase/{name}/delete") -@auth_required() -async def pkgbase_delete_get(request: Request, name: str): - if not request.user.has_credential(creds.PKGBASE_DELETE): - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - context = make_context(request, "Package Deletion") - context["pkgbase"] = get_pkg_or_base(name, models.PackageBase) - return render_template(request, "packages/delete.html", context) - - -@router.post("/pkgbase/{name}/delete") -@auth_required() -async def pkgbase_delete_post(request: Request, name: str, - confirm: bool = Form(default=False), - comments: str = Form(default=str())): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if not request.user.has_credential(creds.PKGBASE_DELETE): - return RedirectResponse(f"/pkgbase/{name}", - status_code=HTTPStatus.SEE_OTHER) - - if not confirm: - context = make_context(request, "Package Deletion") - context["pkgbase"] = pkgbase - context["errors"] = [("The selected packages have not been deleted, " - "check the confirmation checkbox.")] - return render_template(request, "packages/delete.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - if comments: - # Update any existing deletion requests' ClosureComment. - with db.begin(): - requests = pkgbase.requests.filter( - and_(models.PackageRequest.Status == PENDING_ID, - models.PackageRequest.ReqTypeID == DELETION_ID) - ) - for pkgreq in requests: - pkgreq.ClosureComment = comments - - notifs = pkgbase_delete_instance(request, pkgbase, comments=comments) - util.apply_all(notifs, lambda n: n.send()) - return RedirectResponse("/packages", status_code=HTTPStatus.SEE_OTHER) - - async def packages_unflag(request: Request, package_ids: List[int] = [], **kwargs): if not package_ids: @@ -1059,7 +430,7 @@ async def packages_unflag(request: Request, package_ids: List[int] = [], bases.update({pkg.PackageBase}) for pkgbase in bases: - pkgbase_unflag_instance(request, pkgbase) + pkgbase_actions.pkgbase_unflag_instance(request, pkgbase) return (True, ["The selected packages have been unflagged."]) @@ -1096,7 +467,7 @@ async def packages_notify(request: Request, package_ids: List[int] = [], # If we get here, user input is good. for pkgbase in bases: - pkgbase_notify_instance(request, pkgbase) + pkgbase_actions.pkgbase_notify_instance(request, pkgbase) # TODO: This message does not yet have a translation. return (True, ["The selected packages' notifications have been enabled."]) @@ -1133,7 +504,7 @@ async def packages_unnotify(request: Request, package_ids: List[int] = [], return error_tuple for pkgbase in bases: - pkgbase_unnotify_instance(request, pkgbase) + pkgbase_actions.pkgbase_unnotify_instance(request, pkgbase) # TODO: This message does not yet have a translation. return (True, ["The selected packages' notifications have been removed."]) @@ -1167,7 +538,7 @@ async def packages_adopt(request: Request, package_ids: List[int] = [], # Now, really adopt the bases. for pkgbase in bases: - pkgbase_adopt_instance(request, pkgbase) + pkgbase_actions.pkgbase_adopt_instance(request, pkgbase) return (True, ["The selected packages have been adopted."]) @@ -1177,7 +548,7 @@ def disown_all(request: Request, pkgbases: List[models.PackageBase]) \ errors = [] for pkgbase in pkgbases: try: - pkgbase_disown_instance(request, pkgbase) + pkgbase_actions.pkgbase_disown_instance(request, pkgbase) except InvariantError as exc: errors.append(str(exc)) return errors @@ -1246,7 +617,7 @@ async def packages_delete(request: Request, package_ids: List[int] = [], deleted_bases, notifs = [], [] for pkgbase in bases: deleted_bases.append(pkgbase.Name) - notifs += pkgbase_delete_instance(request, pkgbase) + notifs += pkgbase_actions.pkgbase_delete_instance(request, pkgbase) # Log out the fact that this happened for accountability. logger.info(f"Privileged user '{request.user.Username}' deleted the " @@ -1298,132 +669,3 @@ async def packages_post(request: Request, context["success"] = messages return await packages_get(request, context) - - -@router.get("/pkgbase/{name}/merge") -@auth_required() -async def pkgbase_merge_get(request: Request, name: str, - into: str = Query(default=str()), - next: str = Query(default=str())): - pkgbase = get_pkg_or_base(name, models.PackageBase) - - if not next: - next = f"/pkgbase/{pkgbase.Name}" - - context = make_context(request, "Package Merging") - context.update({ - "pkgbase": pkgbase, - "into": into, - "next": next - }) - - status_code = HTTPStatus.OK - # TODO: Lookup errors from credential instead of hardcoding them. - # Idea: Something like credential_errors(creds.PKGBASE_MERGE). - # Perhaps additionally: bad_credential_status_code(creds.PKGBASE_MERGE). - # Don't take these examples verbatim. We should find good naming. - if not request.user.has_credential(creds.PKGBASE_MERGE): - context["errors"] = [ - "Only Trusted Users and Developers can merge packages."] - status_code = HTTPStatus.UNAUTHORIZED - - return render_template(request, "pkgbase/merge.html", context, - status_code=status_code) - - -def pkgbase_merge_instance(request: Request, pkgbase: models.PackageBase, - target: models.PackageBase, comments: str = str()): - pkgbasename = str(pkgbase.Name) - - # Create notifications. - notifs = handle_request(request, MERGE_ID, pkgbase, target) - - # Target votes and notifications sets of user IDs that are - # looking to be migrated. - target_votes = set(v.UsersID for v in target.package_votes) - target_notifs = set(n.UserID for n in target.notifications) - - with db.begin(): - # Merge pkgbase's comments. - for comment in pkgbase.comments: - comment.PackageBase = target - - # Merge notifications that don't yet exist in the target. - for notif in pkgbase.notifications: - if notif.UserID not in target_notifs: - notif.PackageBase = target - - # Merge votes that don't yet exist in the target. - for vote in pkgbase.package_votes: - if vote.UsersID not in target_votes: - vote.PackageBase = target - - # Run popupdate. - popupdate.run_single(target) - - with db.begin(): - # Delete pkgbase and its packages now that everything's merged. - for pkg in pkgbase.packages: - db.delete(pkg) - db.delete(pkgbase) - - # Log this out for accountability purposes. - logger.info(f"Trusted User '{request.user.Username}' merged " - f"'{pkgbasename}' into '{target.Name}'.") - - # Send notifications. - util.apply_all(notifs, lambda n: n.send()) - - -@router.post("/pkgbase/{name}/merge") -@auth_required() -async def pkgbase_merge_post(request: Request, name: str, - into: str = Form(default=str()), - comments: str = Form(default=str()), - confirm: bool = Form(default=False), - next: str = Form(default=str())): - - pkgbase = get_pkg_or_base(name, models.PackageBase) - context = await make_variable_context(request, "Package Merging") - context["pkgbase"] = pkgbase - - # TODO: Lookup errors from credential instead of hardcoding them. - if not request.user.has_credential(creds.PKGBASE_MERGE): - context["errors"] = [ - "Only Trusted Users and Developers can merge packages."] - return render_template(request, "pkgbase/merge.html", context, - status_code=HTTPStatus.UNAUTHORIZED) - - if not confirm: - context["errors"] = ["The selected packages have not been deleted, " - "check the confirmation checkbox."] - return render_template(request, "pkgbase/merge.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - try: - target = get_pkg_or_base(into, models.PackageBase) - except HTTPException: - context["errors"] = [ - "Cannot find package to merge votes and comments into."] - return render_template(request, "pkgbase/merge.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - if pkgbase == target: - context["errors"] = ["Cannot merge a package base with itself."] - return render_template(request, "pkgbase/merge.html", context, - status_code=HTTPStatus.BAD_REQUEST) - - with db.begin(): - update_closure_comment(pkgbase, MERGE_ID, comments, target=target) - - # Merge pkgbase into target. - pkgbase_merge_instance(request, pkgbase, target, comments=comments) - - # Run popupdate on the target. - popupdate.run_single(target) - - if not next: - next = f"/pkgbase/{target.Name}" - - # Redirect to the newly merged into package. - return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) diff --git a/aurweb/routers/pkgbase.py b/aurweb/routers/pkgbase.py new file mode 100644 index 00000000..a6c02118 --- /dev/null +++ b/aurweb/routers/pkgbase.py @@ -0,0 +1,701 @@ +from datetime import datetime +from http import HTTPStatus + +from fastapi import APIRouter, Form, HTTPException, Query, Request, Response +from fastapi.responses import JSONResponse, RedirectResponse +from sqlalchemy import and_ + +from aurweb import db, l10n, templates, util +from aurweb.auth import auth_required, creds +from aurweb.exceptions import InvariantError +from aurweb.models import PackageBase +from aurweb.models.package_comment import PackageComment +from aurweb.models.package_keyword import PackageKeyword +from aurweb.models.package_notification import PackageNotification +from aurweb.models.package_request import PENDING_ID, PackageRequest +from aurweb.models.package_vote import PackageVote +from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID +from aurweb.packages.requests import update_closure_comment +from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment +from aurweb.pkgbase import actions +from aurweb.pkgbase import util as pkgbaseutil +from aurweb.scripts import popupdate +from aurweb.scripts.rendercomment import update_comment_render_fastapi +from aurweb.templates import make_variable_context, render_template + +router = APIRouter() + + +@router.get("/pkgbase/{name}") +async def pkgbase(request: Request, name: str) -> Response: + """ + Single package base view. + + :param request: FastAPI Request + :param name: PackageBase.Name + :return: HTMLResponse + """ + # Get the PackageBase. + pkgbase = get_pkg_or_base(name, PackageBase) + + # If this is not a split package, redirect to /packages/{name}. + if pkgbase.packages.count() == 1: + return RedirectResponse(f"/packages/{name}", + status_code=int(HTTPStatus.SEE_OTHER)) + + # Add our base information. + context = pkgbaseutil.make_context(request, pkgbase) + context["packages"] = pkgbase.packages.all() + + return render_template(request, "pkgbase.html", context) + + +@router.get("/pkgbase/{name}/voters") +async def pkgbase_voters(request: Request, name: str) -> Response: + """ + View of package base voters. + + Requires `request.user` has creds.PKGBASE_LIST_VOTERS credential. + + :param request: FastAPI Request + :param name: PackageBase.Name + :return: HTMLResponse + """ + # Get the PackageBase. + pkgbase = get_pkg_or_base(name, PackageBase) + + if not request.user.has_credential(creds.PKGBASE_LIST_VOTERS): + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Voters") + context["pkgbase"] = pkgbase + return render_template(request, "pkgbase/voters.html", context) + + +@router.get("/pkgbase/{name}/flag-comment") +async def pkgbase_flag_comment(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + if pkgbase.Flagger is None: + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Flag Comment") + context["pkgbase"] = pkgbase + return render_template(request, "packages/flag-comment.html", context) + + +@router.post("/pkgbase/{name}/keywords") +async def pkgbase_keywords(request: Request, name: str, + keywords: str = Form(default=str())): + pkgbase = get_pkg_or_base(name, PackageBase) + keywords = set(keywords.split(" ")) + + # Delete all keywords which are not supplied by the user. + other_keywords = pkgbase.keywords.filter( + ~PackageKeyword.Keyword.in_(keywords)) + other_keyword_strings = [kwd.Keyword for kwd in other_keywords] + + existing_keywords = set( + kwd.Keyword for kwd in + pkgbase.keywords.filter( + ~PackageKeyword.Keyword.in_(other_keyword_strings)) + ) + with db.begin(): + db.delete_all(other_keywords) + for keyword in keywords.difference(existing_keywords): + db.create(PackageKeyword, + PackageBase=pkgbase, + Keyword=keyword) + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.get("/pkgbase/{name}/flag") +@auth_required() +async def pkgbase_flag_get(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + has_cred = request.user.has_credential(creds.PKGBASE_FLAG) + if not has_cred or pkgbase.Flagger is not None: + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Flag Package Out-Of-Date") + context["pkgbase"] = pkgbase + return render_template(request, "packages/flag.html", context) + + +@router.post("/pkgbase/{name}/flag") +@auth_required() +async def pkgbase_flag_post(request: Request, name: str, + comments: str = Form(default=str())): + pkgbase = get_pkg_or_base(name, PackageBase) + + if not comments: + context = templates.make_context(request, "Flag Package Out-Of-Date") + context["pkgbase"] = pkgbase + context["errors"] = ["The selected packages have not been flagged, " + "please enter a comment."] + return render_template(request, "packages/flag.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + has_cred = request.user.has_credential(creds.PKGBASE_FLAG) + if has_cred and not pkgbase.Flagger: + now = int(datetime.utcnow().timestamp()) + with db.begin(): + pkgbase.OutOfDateTS = now + pkgbase.Flagger = request.user + pkgbase.FlaggerComment = comments + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/comments") +@auth_required() +async def pkgbase_comments_post( + request: Request, name: str, + comment: str = Form(default=str()), + enable_notifications: bool = Form(default=False)): + """ Add a new comment via POST request. """ + pkgbase = get_pkg_or_base(name, PackageBase) + + if not comment: + raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) + + # If the provided comment is different than the record's version, + # update the db record. + now = int(datetime.utcnow().timestamp()) + with db.begin(): + comment = db.create(PackageComment, User=request.user, + PackageBase=pkgbase, + Comments=comment, RenderedComment=str(), + CommentTS=now) + + if enable_notifications and not request.user.notified(pkgbase): + db.create(PackageNotification, + User=request.user, + PackageBase=pkgbase) + update_comment_render_fastapi(comment) + + # Redirect to the pkgbase page. + return RedirectResponse(f"/pkgbase/{pkgbase.Name}#comment-{comment.ID}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.get("/pkgbase/{name}/comments/{id}/form") +@auth_required() +async def pkgbase_comment_form(request: Request, name: str, id: int, + next: str = Query(default=None)): + """ + Produce a comment form for comment {id}. + + This route is used as a partial HTML endpoint when editing + package comments via Javascript. This endpoint used to be + part of the RPC as type=get-comment-form and has been + relocated here because the form returned cannot be used + externally and requires a POST request by the user. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` value used for the comment form + :return: JSONResponse + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = pkgbase.comments.filter(PackageComment.ID == id).first() + if not comment: + return JSONResponse({}, status_code=HTTPStatus.NOT_FOUND) + + if not request.user.is_elevated() and request.user != comment.User: + return JSONResponse({}, status_code=HTTPStatus.UNAUTHORIZED) + + context = pkgbaseutil.make_context(request, pkgbase) + context["comment"] = comment + + if not next: + next = f"/pkgbase/{name}" + + context["next"] = next + + form = templates.render_raw_template( + request, "partials/packages/comment_form.html", context) + return JSONResponse({"form": form}) + + +@router.get("/pkgbase/{name}/comments/{id}/edit") +@auth_required() +async def pkgbase_comment_edit(request: Request, name: str, id: int, + next: str = Form(default=None)): + """ + Render the non-javascript edit form. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` parameter used in the POST request + :return: HTMLResponse + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = get_pkgbase_comment(pkgbase, id) + + if not next: + next = f"/pkgbase/{name}" + + context = await make_variable_context(request, "Edit comment", next=next) + context["comment"] = comment + return render_template(request, "packages/comments/edit.html", context) + + +@router.post("/pkgbase/{name}/comments/{id}") +@auth_required() +async def pkgbase_comment_post( + request: Request, name: str, id: int, + comment: str = Form(default=str()), + enable_notifications: bool = Form(default=False), + next: str = Form(default=None)): + """ Edit an existing comment. """ + pkgbase = get_pkg_or_base(name, PackageBase) + db_comment = get_pkgbase_comment(pkgbase, id) + + if not comment: + raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) + + # If the provided comment is different than the record's version, + # update the db record. + now = int(datetime.utcnow().timestamp()) + if db_comment.Comments != comment: + with db.begin(): + db_comment.Comments = comment + db_comment.Editor = request.user + db_comment.EditedTS = now + + db_notif = request.user.notifications.filter( + PackageNotification.PackageBaseID == pkgbase.ID + ).first() + if enable_notifications and not db_notif: + db.create(PackageNotification, + User=request.user, + PackageBase=pkgbase) + update_comment_render_fastapi(db_comment) + + if not next: + next = f"/pkgbase/{pkgbase.Name}" + + # Redirect to the pkgbase page anchored to the updated comment. + return RedirectResponse(f"{next}#comment-{db_comment.ID}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/comments/{id}/pin") +@auth_required() +async def pkgbase_comment_pin(request: Request, name: str, id: int, + next: str = Form(default=None)): + """ + Pin a comment. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` parameter used in the POST request + :return: RedirectResponse to `next` + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = get_pkgbase_comment(pkgbase, id) + + has_cred = request.user.has_credential(creds.COMMENT_PIN, + approved=[pkgbase.Maintainer]) + if not has_cred: + _ = l10n.get_translator_for_request(request) + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail=_("You are not allowed to pin this comment.")) + + now = int(datetime.utcnow().timestamp()) + with db.begin(): + comment.PinnedTS = now + + if not next: + next = f"/pkgbase/{name}" + + return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/comments/{id}/unpin") +@auth_required() +async def pkgbase_comment_unpin(request: Request, name: str, id: int, + next: str = Form(default=None)): + """ + Unpin a comment. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` parameter used in the POST request + :return: RedirectResponse to `next` + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = get_pkgbase_comment(pkgbase, id) + + has_cred = request.user.has_credential(creds.COMMENT_PIN, + approved=[pkgbase.Maintainer]) + if not has_cred: + _ = l10n.get_translator_for_request(request) + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail=_("You are not allowed to unpin this comment.")) + + with db.begin(): + comment.PinnedTS = 0 + + if not next: + next = f"/pkgbase/{name}" + + return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/comments/{id}/delete") +@auth_required() +async def pkgbase_comment_delete(request: Request, name: str, id: int, + next: str = Form(default=None)): + """ + Delete a comment. + + This action does **not** delete the comment from the database, but + sets PackageBase.DelTS and PackageBase.DeleterUID, which is used to + decide who gets to view the comment and what utilities it gets. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` parameter used in the POST request + :return: RedirectResposne to `next` + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = get_pkgbase_comment(pkgbase, id) + + authorized = request.user.has_credential(creds.COMMENT_DELETE, + [comment.User]) + if not authorized: + _ = l10n.get_translator_for_request(request) + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail=_("You are not allowed to delete this comment.")) + + now = int(datetime.utcnow().timestamp()) + with db.begin(): + comment.Deleter = request.user + comment.DelTS = now + + if not next: + next = f"/pkgbase/{name}" + + return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/comments/{id}/undelete") +@auth_required() +async def pkgbase_comment_undelete(request: Request, name: str, id: int, + next: str = Form(default=None)): + """ + Undelete a comment. + + This action does **not** undelete any comment from the database, but + unsets PackageBase.DelTS and PackageBase.DeleterUID which restores + the comment to a standard state. + + :param request: FastAPI Request + :param name: PackageBase.Name + :param id: PackageComment.ID + :param next: Optional `next` parameter used in the POST request + :return: RedirectResponse to `next` + """ + pkgbase = get_pkg_or_base(name, PackageBase) + comment = get_pkgbase_comment(pkgbase, id) + + has_cred = request.user.has_credential(creds.COMMENT_UNDELETE, + approved=[comment.User]) + if not has_cred: + _ = l10n.get_translator_for_request(request) + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail=_("You are not allowed to undelete this comment.")) + + with db.begin(): + comment.Deleter = None + comment.DelTS = None + + if not next: + next = f"/pkgbase/{name}" + + return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/vote") +@auth_required() +async def pkgbase_vote(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + vote = pkgbase.package_votes.filter( + PackageVote.UsersID == request.user.ID + ).first() + has_cred = request.user.has_credential(creds.PKGBASE_VOTE) + if has_cred and not vote: + now = int(datetime.utcnow().timestamp()) + with db.begin(): + db.create(PackageVote, + User=request.user, + PackageBase=pkgbase, + VoteTS=now) + + # Update NumVotes/Popularity. + popupdate.run_single(pkgbase) + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/unvote") +@auth_required() +async def pkgbase_unvote(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + vote = pkgbase.package_votes.filter( + PackageVote.UsersID == request.user.ID + ).first() + has_cred = request.user.has_credential(creds.PKGBASE_VOTE) + if has_cred and vote: + with db.begin(): + db.delete(vote) + + # Update NumVotes/Popularity. + popupdate.run_single(pkgbase) + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/notify") +@auth_required() +async def pkgbase_notify(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + actions.pkgbase_notify_instance(request, pkgbase) + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/unnotify") +@auth_required() +async def pkgbase_unnotify(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + actions.pkgbase_unnotify_instance(request, pkgbase) + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/unflag") +@auth_required() +async def pkgbase_unflag(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + actions.pkgbase_unflag_instance(request, pkgbase) + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.get("/pkgbase/{name}/disown") +@auth_required() +async def pkgbase_disown_get(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + has_cred = request.user.has_credential(creds.PKGBASE_DISOWN, + approved=[pkgbase.Maintainer]) + if not has_cred: + return RedirectResponse(f"/pkgbase/{name}", + HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Disown Package") + context["pkgbase"] = pkgbase + return render_template(request, "packages/disown.html", context) + + +@router.post("/pkgbase/{name}/disown") +@auth_required() +async def pkgbase_disown_post(request: Request, name: str, + comments: str = Form(default=str()), + confirm: bool = Form(default=False)): + pkgbase = get_pkg_or_base(name, PackageBase) + + has_cred = request.user.has_credential(creds.PKGBASE_DISOWN, + approved=[pkgbase.Maintainer]) + if not has_cred: + return RedirectResponse(f"/pkgbase/{name}", + HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Disown Package") + context["pkgbase"] = pkgbase + if not confirm: + context["errors"] = [("The selected packages have not been disowned, " + "check the confirmation checkbox.")] + return render_template(request, "packages/disown.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + with db.begin(): + update_closure_comment(pkgbase, ORPHAN_ID, comments) + + try: + actions.pkgbase_disown_instance(request, pkgbase) + except InvariantError as exc: + context["errors"] = [str(exc)] + return render_template(request, "packages/disown.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.post("/pkgbase/{name}/adopt") +@auth_required() +async def pkgbase_adopt_post(request: Request, name: str): + pkgbase = get_pkg_or_base(name, PackageBase) + + has_cred = request.user.has_credential(creds.PKGBASE_ADOPT) + if has_cred or not pkgbase.Maintainer: + # If the user has credentials, they'll adopt the package regardless + # of maintainership. Otherwise, we'll promote the user to maintainer + # if no maintainer currently exists. + actions.pkgbase_adopt_instance(request, pkgbase) + + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + +@router.get("/pkgbase/{name}/delete") +@auth_required() +async def pkgbase_delete_get(request: Request, name: str): + if not request.user.has_credential(creds.PKGBASE_DELETE): + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + context = templates.make_context(request, "Package Deletion") + context["pkgbase"] = get_pkg_or_base(name, PackageBase) + return render_template(request, "packages/delete.html", context) + + +@router.post("/pkgbase/{name}/delete") +@auth_required() +async def pkgbase_delete_post(request: Request, name: str, + confirm: bool = Form(default=False), + comments: str = Form(default=str())): + pkgbase = get_pkg_or_base(name, PackageBase) + + if not request.user.has_credential(creds.PKGBASE_DELETE): + return RedirectResponse(f"/pkgbase/{name}", + status_code=HTTPStatus.SEE_OTHER) + + if not confirm: + context = templates.make_context(request, "Package Deletion") + context["pkgbase"] = pkgbase + context["errors"] = [("The selected packages have not been deleted, " + "check the confirmation checkbox.")] + return render_template(request, "packages/delete.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + if comments: + # Update any existing deletion requests' ClosureComment. + with db.begin(): + requests = pkgbase.requests.filter( + and_(PackageRequest.Status == PENDING_ID, + PackageRequest.ReqTypeID == DELETION_ID) + ) + for pkgreq in requests: + pkgreq.ClosureComment = comments + + notifs = actions.pkgbase_delete_instance( + request, pkgbase, comments=comments) + util.apply_all(notifs, lambda n: n.send()) + return RedirectResponse("/packages", status_code=HTTPStatus.SEE_OTHER) + + +@router.get("/pkgbase/{name}/merge") +@auth_required() +async def pkgbase_merge_get(request: Request, name: str, + into: str = Query(default=str()), + next: str = Query(default=str())): + pkgbase = get_pkg_or_base(name, PackageBase) + + if not next: + next = f"/pkgbase/{pkgbase.Name}" + + context = templates.make_context(request, "Package Merging") + context.update({ + "pkgbase": pkgbase, + "into": into, + "next": next + }) + + status_code = HTTPStatus.OK + # TODO: Lookup errors from credential instead of hardcoding them. + # Idea: Something like credential_errors(creds.PKGBASE_MERGE). + # Perhaps additionally: bad_credential_status_code(creds.PKGBASE_MERGE). + # Don't take these examples verbatim. We should find good naming. + if not request.user.has_credential(creds.PKGBASE_MERGE): + context["errors"] = [ + "Only Trusted Users and Developers can merge packages."] + status_code = HTTPStatus.UNAUTHORIZED + + return render_template(request, "pkgbase/merge.html", context, + status_code=status_code) + + +@router.post("/pkgbase/{name}/merge") +@auth_required() +async def pkgbase_merge_post(request: Request, name: str, + into: str = Form(default=str()), + comments: str = Form(default=str()), + confirm: bool = Form(default=False), + next: str = Form(default=str())): + + pkgbase = get_pkg_or_base(name, PackageBase) + context = await make_variable_context(request, "Package Merging") + context["pkgbase"] = pkgbase + + # TODO: Lookup errors from credential instead of hardcoding them. + if not request.user.has_credential(creds.PKGBASE_MERGE): + context["errors"] = [ + "Only Trusted Users and Developers can merge packages."] + return render_template(request, "pkgbase/merge.html", context, + status_code=HTTPStatus.UNAUTHORIZED) + + if not confirm: + context["errors"] = ["The selected packages have not been deleted, " + "check the confirmation checkbox."] + return render_template(request, "pkgbase/merge.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + try: + target = get_pkg_or_base(into, PackageBase) + except HTTPException: + context["errors"] = [ + "Cannot find package to merge votes and comments into."] + return render_template(request, "pkgbase/merge.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + if pkgbase == target: + context["errors"] = ["Cannot merge a package base with itself."] + return render_template(request, "pkgbase/merge.html", context, + status_code=HTTPStatus.BAD_REQUEST) + + with db.begin(): + update_closure_comment(pkgbase, MERGE_ID, comments, target=target) + + # Merge pkgbase into target. + actions.pkgbase_merge_instance(request, pkgbase, target, comments=comments) + + if not next: + next = f"/pkgbase/{target.Name}" + + # Redirect to the newly merged into package. + return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)