change(python): put pkgbase routes & impl into their own modules

Introduces new router:
- `aurweb.routers.pkgbase`

Introduces new package:
- `aurweb.pkgbase`

Introduces new modules:
- `aurweb.pkgbase.actions`
- `aurweb.pkgbase.util`

Changes:
- `pkgbase_{action}_instance` functions are now located in
  `aurweb.pkgbase.actions`.
- `pkgbase`-wise routes have been moved to
  `aurweb.routers.pkgbase`.
- `make_single_context` was moved to
  `aurweb.pkgbase.util.make_context`.

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2022-01-01 20:09:22 -08:00
parent c735f9868b
commit bd2ad9b616
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
6 changed files with 910 additions and 777 deletions

View file

@ -21,7 +21,7 @@ from aurweb.auth import BasicAuthBackend
from aurweb.db import get_engine, query from aurweb.db import get_engine, query
from aurweb.models import AcceptedTerm, Term from aurweb.models import AcceptedTerm, Term
from aurweb.prometheus import http_api_requests_total, http_requests_total, instrumentator from aurweb.prometheus import http_api_requests_total, http_requests_total, instrumentator
from aurweb.routers import accounts, auth, html, packages, rpc, rss, sso, trusted_user from aurweb.routers import accounts, auth, html, packages, pkgbase, rpc, rss, sso, trusted_user
from aurweb.templates import make_context, render_template from aurweb.templates import make_context, render_template
# Setup the FastAPI app. # Setup the FastAPI app.
@ -81,6 +81,7 @@ async def app_startup():
app.include_router(trusted_user.router) app.include_router(trusted_user.router)
app.include_router(rss.router) app.include_router(rss.router)
app.include_router(packages.router) app.include_router(packages.router)
app.include_router(pkgbase.router)
app.include_router(rpc.router) app.include_router(rpc.router)
# Initialize the database engine and ORM. # Initialize the database engine and ORM.

View file

142
aurweb/pkgbase/actions.py Normal file
View file

@ -0,0 +1,142 @@
from typing import List
from fastapi import Request
from aurweb import db, logging, util
from aurweb.auth import creds
from aurweb.models import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_notification import PackageNotification
from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID
from aurweb.packages.requests import handle_request, update_closure_comment
from aurweb.scripts import notify, popupdate
logger = logging.get_logger(__name__)
def pkgbase_notify_instance(request: Request, pkgbase: PackageBase) -> None:
notif = db.query(pkgbase.notifications.filter(
PackageNotification.UserID == request.user.ID
).exists()).scalar()
has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY)
if has_cred and not notif:
with db.begin():
db.create(PackageNotification,
PackageBase=pkgbase,
User=request.user)
def pkgbase_unnotify_instance(request: Request, pkgbase: PackageBase) -> None:
notif = pkgbase.notifications.filter(
PackageNotification.UserID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY)
if has_cred and notif:
with db.begin():
db.delete(notif)
def pkgbase_unflag_instance(request: Request, pkgbase: PackageBase) -> None:
has_cred = request.user.has_credential(
creds.PKGBASE_UNFLAG, approved=[pkgbase.Flagger, pkgbase.Maintainer])
if has_cred:
with db.begin():
pkgbase.OutOfDateTS = None
pkgbase.Flagger = None
pkgbase.FlaggerComment = str()
def pkgbase_disown_instance(request: Request, pkgbase: PackageBase) -> None:
import aurweb.packages.util as pkgutil
disowner = request.user
notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)]
is_maint = disowner == pkgbase.Maintainer
if is_maint:
with db.begin():
# Comaintainer with the lowest Priority value; next-in-line.
prio_comaint = pkgbase.comaintainers.order_by(
PackageComaintainer.Priority.asc()
).first()
if prio_comaint:
# If there is such a comaintainer, promote them to maint.
pkgbase.Maintainer = prio_comaint.User
notifs.append(pkgutil.remove_comaintainer(prio_comaint))
else:
# Otherwise, just orphan the package completely.
pkgbase.Maintainer = None
elif request.user.has_credential(creds.PKGBASE_DISOWN):
# Otherwise, the request user performing this disownage is a
# Trusted User and we treat it like a standard orphan request.
notifs += handle_request(request, ORPHAN_ID, pkgbase)
with db.begin():
pkgbase.Maintainer = None
util.apply_all(notifs, lambda n: n.send())
def pkgbase_adopt_instance(request: Request, pkgbase: PackageBase) -> None:
with db.begin():
pkgbase.Maintainer = request.user
notif = notify.AdoptNotification(request.user.ID, pkgbase.ID)
notif.send()
def pkgbase_delete_instance(request: Request, pkgbase: PackageBase,
comments: str = str()) \
-> List[notify.Notification]:
notifs = handle_request(request, DELETION_ID, pkgbase) + [
notify.DeleteNotification(request.user.ID, pkgbase.ID)
]
with db.begin():
update_closure_comment(pkgbase, DELETION_ID, comments)
db.delete(pkgbase)
return notifs
def pkgbase_merge_instance(request: Request, pkgbase: PackageBase,
target: PackageBase, comments: str = str()) -> None:
pkgbasename = str(pkgbase.Name)
# Create notifications.
notifs = handle_request(request, MERGE_ID, pkgbase, target)
# Target votes and notifications sets of user IDs that are
# looking to be migrated.
target_votes = set(v.UsersID for v in target.package_votes)
target_notifs = set(n.UserID for n in target.notifications)
with db.begin():
# Merge pkgbase's comments.
for comment in pkgbase.comments:
comment.PackageBase = target
# Merge notifications that don't yet exist in the target.
for notif in pkgbase.notifications:
if notif.UserID not in target_notifs:
notif.PackageBase = target
# Merge votes that don't yet exist in the target.
for vote in pkgbase.package_votes:
if vote.UsersID not in target_votes:
vote.PackageBase = target
# Run popupdate.
popupdate.run_single(target)
with db.begin():
# Delete pkgbase and its packages now that everything's merged.
for pkg in pkgbase.packages:
db.delete(pkg)
db.delete(pkgbase)
# Log this out for accountability purposes.
logger.info(f"Trusted User '{request.user.Username}' merged "
f"'{pkgbasename}' into '{target.Name}'.")
# Send notifications.
util.apply_all(notifs, lambda n: n.send())

47
aurweb/pkgbase/util.py Normal file
View file

@ -0,0 +1,47 @@
from typing import Any, Dict
from fastapi import Request
from aurweb import config
from aurweb.models import PackageBase
from aurweb.models.package_comment import PackageComment
from aurweb.models.package_request import PackageRequest
from aurweb.models.package_vote import PackageVote
from aurweb.templates import make_context as _make_context
def make_context(request: Request, pkgbase: PackageBase) -> Dict[str, Any]:
""" Make a basic context for package or pkgbase.
:param request: FastAPI request
:param pkgbase: PackageBase instance
:return: A pkgbase context without specific differences
"""
context = _make_context(request, pkgbase.Name)
context["git_clone_uri_anon"] = config.get("options", "git_clone_uri_anon")
context["git_clone_uri_priv"] = config.get("options", "git_clone_uri_priv")
context["pkgbase"] = pkgbase
context["packages_count"] = pkgbase.packages.count()
context["keywords"] = pkgbase.keywords
context["comments"] = pkgbase.comments.order_by(
PackageComment.CommentTS.desc()
)
context["pinned_comments"] = pkgbase.comments.filter(
PackageComment.PinnedTS != 0
).order_by(PackageComment.CommentTS.desc())
context["is_maintainer"] = bool(request.user == pkgbase.Maintainer)
context["notified"] = request.user.notified(pkgbase)
context["out_of_date"] = bool(pkgbase.OutOfDateTS)
context["voted"] = request.user.package_votes.filter(
PackageVote.PackageBaseID == pkgbase.ID
).scalar()
context["requests"] = pkgbase.requests.filter(
PackageRequest.ClosedTS.is_(None)
).count()
return context

View file

@ -3,27 +3,26 @@ from datetime import datetime
from http import HTTPStatus from http import HTTPStatus
from typing import Any, Dict, List from typing import Any, Dict, List
from fastapi import APIRouter, Form, HTTPException, Query, Request, Response from fastapi import APIRouter, Form, Query, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse from fastapi.responses import RedirectResponse
from sqlalchemy import and_, case from sqlalchemy import case
import aurweb.filters import aurweb.filters
import aurweb.packages.util import aurweb.packages.util
from aurweb import config, db, defaults, l10n, logging, models, util from aurweb import config, db, defaults, logging, models, util
from aurweb.auth import auth_required, creds from aurweb.auth import auth_required, creds
from aurweb.exceptions import InvariantError, ValidationError from aurweb.exceptions import InvariantError, ValidationError
from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID
from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID
from aurweb.packages import util as pkgutil from aurweb.packages import util as pkgutil
from aurweb.packages import validate from aurweb.packages import validate
from aurweb.packages.requests import handle_request, update_closure_comment
from aurweb.packages.search import PackageSearch from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, get_pkgreq_by_id from aurweb.packages.util import get_pkg_or_base, get_pkgreq_by_id
from aurweb.scripts import notify, popupdate from aurweb.pkgbase import actions as pkgbase_actions
from aurweb.scripts.rendercomment import update_comment_render_fastapi from aurweb.pkgbase import util as pkgbaseutil
from aurweb.templates import make_context, make_variable_context, render_raw_template, render_template from aurweb.scripts import notify
from aurweb.templates import make_context, make_variable_context, render_template
logger = logging.get_logger(__name__) logger = logging.get_logger(__name__)
router = APIRouter() router = APIRouter()
@ -130,59 +129,6 @@ async def packages(request: Request) -> Response:
return await packages_get(request, context) return await packages_get(request, context)
def pkgbase_delete_instance(request: Request, pkgbase: models.PackageBase,
comments: str = str()) \
-> List[notify.Notification]:
notifs = handle_request(request, DELETION_ID, pkgbase) + [
notify.DeleteNotification(request.user.ID, pkgbase.ID)
]
with db.begin():
update_closure_comment(pkgbase, DELETION_ID, comments)
db.delete(pkgbase)
return notifs
async def make_single_context(request: Request,
pkgbase: models.PackageBase) -> Dict[str, Any]:
""" Make a basic context for package or pkgbase.
:param request: FastAPI request
:param pkgbase: PackageBase instance
:return: A pkgbase context without specific differences
"""
context = make_context(request, pkgbase.Name)
context["git_clone_uri_anon"] = aurweb.config.get("options",
"git_clone_uri_anon")
context["git_clone_uri_priv"] = aurweb.config.get("options",
"git_clone_uri_priv")
context["pkgbase"] = pkgbase
context["packages_count"] = pkgbase.packages.count()
context["keywords"] = pkgbase.keywords
context["comments"] = pkgbase.comments.order_by(
models.PackageComment.CommentTS.desc()
)
context["pinned_comments"] = pkgbase.comments.filter(
models.PackageComment.PinnedTS != 0
).order_by(models.PackageComment.CommentTS.desc())
context["is_maintainer"] = (request.user.is_authenticated()
and request.user.ID == pkgbase.MaintainerUID)
context["notified"] = request.user.notified(pkgbase)
context["out_of_date"] = bool(pkgbase.OutOfDateTS)
context["voted"] = request.user.package_votes.filter(
models.PackageVote.PackageBaseID == pkgbase.ID).scalar()
context["requests"] = pkgbase.requests.filter(
models.PackageRequest.ClosedTS.is_(None)
).count()
return context
@router.get("/packages/{name}") @router.get("/packages/{name}")
async def package(request: Request, name: str) -> Response: async def package(request: Request, name: str) -> Response:
# Get the Package. # Get the Package.
@ -200,7 +146,7 @@ async def package(request: Request, name: str) -> Response:
rels_data["r"].append(rel) rels_data["r"].append(rel)
# Add our base information. # Add our base information.
context = await make_single_context(request, pkgbase) context = pkgbaseutil.make_context(request, pkgbase)
context["package"] = pkg context["package"] = pkg
# Package sources. # Package sources.
@ -235,249 +181,6 @@ async def package(request: Request, name: str) -> Response:
return render_template(request, "packages/show.html", context) return render_template(request, "packages/show.html", context)
@router.get("/pkgbase/{name}")
async def package_base(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, models.PackageBase)
# If this is not a split package, redirect to /packages/{name}.
if pkgbase.packages.count() == 1:
return RedirectResponse(f"/packages/{name}",
status_code=int(HTTPStatus.SEE_OTHER))
# Add our base information.
context = await make_single_context(request, pkgbase)
context["packages"] = pkgbase.packages.all()
return render_template(request, "pkgbase.html", context)
@router.get("/pkgbase/{name}/voters")
async def package_base_voters(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not request.user.has_credential(creds.PKGBASE_LIST_VOTERS):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = make_context(request, "Voters")
context["pkgbase"] = pkgbase
return render_template(request, "pkgbase/voters.html", context)
@router.post("/pkgbase/{name}/comments")
@auth_required()
async def pkgbase_comments_post(
request: Request, name: str,
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False)):
""" Add a new comment. """
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not comment:
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
# If the provided comment is different than the record's version,
# update the db record.
now = int(datetime.utcnow().timestamp())
with db.begin():
comment = db.create(models.PackageComment, User=request.user,
PackageBase=pkgbase,
Comments=comment, RenderedComment=str(),
CommentTS=now)
if enable_notifications and not request.user.notified(pkgbase):
db.create(models.PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render_fastapi(comment)
# Redirect to the pkgbase page.
return RedirectResponse(f"/pkgbase/{pkgbase.Name}#comment-{comment.ID}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/comments/{id}/form")
@auth_required()
async def pkgbase_comment_form(request: Request, name: str, id: int,
next: str = Query(default=None)):
""" Produce a comment form for comment {id}. """
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = pkgbase.comments.filter(models.PackageComment.ID == id).first()
if not comment:
return JSONResponse({}, status_code=HTTPStatus.NOT_FOUND)
if not request.user.is_elevated() and request.user != comment.User:
return JSONResponse({}, status_code=HTTPStatus.UNAUTHORIZED)
context = await make_single_context(request, pkgbase)
context["comment"] = comment
if not next:
next = f"/pkgbase/{name}"
context["next"] = next
form = render_raw_template(
request, "partials/packages/comment_form.html", context)
return JSONResponse({"form": form})
@router.post("/pkgbase/{name}/comments/{id}")
@auth_required()
async def pkgbase_comment_post(
request: Request, name: str, id: int,
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False),
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
db_comment = get_pkgbase_comment(pkgbase, id)
if not comment:
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
# If the provided comment is different than the record's version,
# update the db record.
now = int(datetime.utcnow().timestamp())
if db_comment.Comments != comment:
with db.begin():
db_comment.Comments = comment
db_comment.Editor = request.user
db_comment.EditedTS = now
db_notif = request.user.notifications.filter(
models.PackageNotification.PackageBaseID == pkgbase.ID
).first()
if enable_notifications and not db_notif:
db.create(models.PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render_fastapi(db_comment)
if not next:
next = f"/pkgbase/{pkgbase.Name}"
# Redirect to the pkgbase page anchored to the updated comment.
return RedirectResponse(f"{next}#comment-{db_comment.ID}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/comments/{id}/edit")
@auth_required()
async def pkgbase_comment_edit(request: Request, name: str, id: int,
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
if not next:
next = f"/pkgbase/{name}"
context = await make_variable_context(request, "Edit comment", next=next)
context["comment"] = comment
return render_template(request, "packages/comments/edit.html", context)
@router.post("/pkgbase/{name}/comments/{id}/delete")
@auth_required()
async def pkgbase_comment_delete(request: Request, name: str, id: int,
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
authorized = request.user.has_credential(creds.COMMENT_DELETE,
[comment.User])
if not authorized:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to delete this comment."))
now = int(datetime.utcnow().timestamp())
with db.begin():
comment.Deleter = request.user
comment.DelTS = now
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/undelete")
@auth_required()
async def pkgbase_comment_undelete(request: Request, name: str, id: int,
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_UNDELETE,
approved=[comment.User])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to undelete this comment."))
with db.begin():
comment.Deleter = None
comment.DelTS = None
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/pin")
@auth_required()
async def pkgbase_comment_pin(request: Request, name: str, id: int,
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_PIN,
approved=[pkgbase.Maintainer])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to pin this comment."))
now = int(datetime.utcnow().timestamp())
with db.begin():
comment.PinnedTS = now
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/unpin")
@auth_required()
async def pkgbase_comment_unpin(request: Request, name: str, id: int,
next: str = Form(default=None)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_PIN,
approved=[pkgbase.Maintainer])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to unpin this comment."))
with db.begin():
comment.PinnedTS = 0
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/comaintainers") @router.get("/pkgbase/{name}/comaintainers")
@auth_required() @auth_required()
async def package_base_comaintainers(request: Request, name: str) -> Response: async def package_base_comaintainers(request: Request, name: str) -> Response:
@ -654,7 +357,8 @@ async def pkgbase_request_post(request: Request, name: str,
logger.debug(f"New request #{pkgreq.ID} is marked for auto-orphan.") logger.debug(f"New request #{pkgreq.ID} is marked for auto-orphan.")
elif type == "deletion" and is_maintainer and outdated: elif type == "deletion" and is_maintainer and outdated:
# This request should be auto-accepted. # This request should be auto-accepted.
notifs = pkgbase_delete_instance(request, pkgbase, comments=comments) notifs = pkgbase_actions.pkgbase_delete_instance(
request, pkgbase, comments=comments)
util.apply_all(notifs, lambda n: n.send()) util.apply_all(notifs, lambda n: n.send())
logger.debug(f"New request #{pkgreq.ID} is marked for auto-deletion.") logger.debug(f"New request #{pkgreq.ID} is marked for auto-deletion.")
@ -704,339 +408,6 @@ async def requests_close_post(request: Request, id: int,
return RedirectResponse("/requests", status_code=HTTPStatus.SEE_OTHER) return RedirectResponse("/requests", status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/keywords")
async def pkgbase_keywords(request: Request, name: str,
keywords: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, models.PackageBase)
keywords = set(keywords.split(" "))
# Delete all keywords which are not supplied by the user.
other_keywords = pkgbase.keywords.filter(
~models.PackageKeyword.Keyword.in_(keywords))
other_keyword_strings = [kwd.Keyword for kwd in other_keywords]
existing_keywords = set(
kwd.Keyword for kwd in
pkgbase.keywords.filter(
~models.PackageKeyword.Keyword.in_(other_keyword_strings))
)
with db.begin():
db.delete_all(other_keywords)
for keyword in keywords.difference(existing_keywords):
db.create(models.PackageKeyword,
PackageBase=pkgbase,
Keyword=keyword)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/flag")
@auth_required()
async def pkgbase_flag_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_FLAG)
if not has_cred or pkgbase.Flagger is not None:
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = make_context(request, "Flag Package Out-Of-Date")
context["pkgbase"] = pkgbase
return render_template(request, "packages/flag.html", context)
@router.post("/pkgbase/{name}/flag")
@auth_required()
async def pkgbase_flag_post(request: Request, name: str,
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not comments:
context = make_context(request, "Flag Package Out-Of-Date")
context["pkgbase"] = pkgbase
context["errors"] = ["The selected packages have not been flagged, "
"please enter a comment."]
return render_template(request, "packages/flag.html", context,
status_code=HTTPStatus.BAD_REQUEST)
has_cred = request.user.has_credential(creds.PKGBASE_FLAG)
if has_cred and not pkgbase.Flagger:
now = int(datetime.utcnow().timestamp())
with db.begin():
pkgbase.OutOfDateTS = now
pkgbase.Flagger = request.user
pkgbase.FlaggerComment = comments
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/flag-comment")
async def pkgbase_flag_comment(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
if pkgbase.Flagger is None:
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = make_context(request, "Flag Comment")
context["pkgbase"] = pkgbase
return render_template(request, "packages/flag-comment.html", context)
def pkgbase_unflag_instance(request: Request, pkgbase: models.PackageBase):
has_cred = request.user.has_credential(
creds.PKGBASE_UNFLAG, approved=[pkgbase.Flagger, pkgbase.Maintainer])
if has_cred:
with db.begin():
pkgbase.OutOfDateTS = None
pkgbase.Flagger = None
pkgbase.FlaggerComment = str()
@router.post("/pkgbase/{name}/unflag")
@auth_required()
async def pkgbase_unflag(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
pkgbase_unflag_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
def pkgbase_notify_instance(request: Request, pkgbase: models.PackageBase):
notif = db.query(pkgbase.notifications.filter(
models.PackageNotification.UserID == request.user.ID
).exists()).scalar()
has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY)
if has_cred and not notif:
with db.begin():
db.create(models.PackageNotification,
PackageBase=pkgbase,
User=request.user)
@router.post("/pkgbase/{name}/notify")
@auth_required()
async def pkgbase_notify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
pkgbase_notify_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
def pkgbase_unnotify_instance(request: Request, pkgbase: models.PackageBase):
notif = pkgbase.notifications.filter(
models.PackageNotification.UserID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY)
if has_cred and notif:
with db.begin():
db.delete(notif)
@router.post("/pkgbase/{name}/unnotify")
@auth_required()
async def pkgbase_unnotify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
pkgbase_unnotify_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/vote")
@auth_required()
async def pkgbase_vote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
vote = pkgbase.package_votes.filter(
models.PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_VOTE)
if has_cred and not vote:
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(models.PackageVote,
User=request.user,
PackageBase=pkgbase,
VoteTS=now)
# Update NumVotes/Popularity.
popupdate.run_single(pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/unvote")
@auth_required()
async def pkgbase_unvote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
vote = pkgbase.package_votes.filter(
models.PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_VOTE)
if has_cred and vote:
with db.begin():
db.delete(vote)
# Update NumVotes/Popularity.
popupdate.run_single(pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
def pkgbase_disown_instance(request: Request, pkgbase: models.PackageBase):
disowner = request.user
notifs = [notify.DisownNotification(disowner.ID, pkgbase.ID)]
is_maint = disowner == pkgbase.Maintainer
if is_maint:
with db.begin():
# Comaintainer with the lowest Priority value; next-in-line.
prio_comaint = pkgbase.comaintainers.order_by(
models.PackageComaintainer.Priority.asc()
).first()
if prio_comaint:
# If there is such a comaintainer, promote them to maint.
pkgbase.Maintainer = prio_comaint.User
notifs.append(pkgutil.remove_comaintainer(prio_comaint))
else:
# Otherwise, just orphan the package completely.
pkgbase.Maintainer = None
elif request.user.has_credential(creds.PKGBASE_DISOWN):
# Otherwise, the request user performing this disownage is a
# Trusted User and we treat it like a standard orphan request.
notifs += handle_request(request, ORPHAN_ID, pkgbase)
with db.begin():
pkgbase.Maintainer = None
util.apply_all(notifs, lambda n: n.send())
@router.get("/pkgbase/{name}/disown")
@auth_required()
async def pkgbase_disown_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_DISOWN,
approved=[pkgbase.Maintainer])
if not has_cred:
return RedirectResponse(f"/pkgbase/{name}",
HTTPStatus.SEE_OTHER)
context = make_context(request, "Disown Package")
context["pkgbase"] = pkgbase
return render_template(request, "packages/disown.html", context)
@router.post("/pkgbase/{name}/disown")
@auth_required()
async def pkgbase_disown_post(request: Request, name: str,
comments: str = Form(default=str()),
confirm: bool = Form(default=False)):
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_DISOWN,
approved=[pkgbase.Maintainer])
if not has_cred:
return RedirectResponse(f"/pkgbase/{name}",
HTTPStatus.SEE_OTHER)
context = make_context(request, "Disown Package")
context["pkgbase"] = pkgbase
if not confirm:
context["errors"] = [("The selected packages have not been disowned, "
"check the confirmation checkbox.")]
return render_template(request, "packages/disown.html", context,
status_code=HTTPStatus.BAD_REQUEST)
with db.begin():
update_closure_comment(pkgbase, ORPHAN_ID, comments)
try:
pkgbase_disown_instance(request, pkgbase)
except InvariantError as exc:
context["errors"] = [str(exc)]
return render_template(request, "packages/disown.html", context,
status_code=HTTPStatus.BAD_REQUEST)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
def pkgbase_adopt_instance(request: Request, pkgbase: models.PackageBase):
with db.begin():
pkgbase.Maintainer = request.user
notif = notify.AdoptNotification(request.user.ID, pkgbase.ID)
notif.send()
@router.post("/pkgbase/{name}/adopt")
@auth_required()
async def pkgbase_adopt_post(request: Request, name: str):
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_ADOPT)
if has_cred or not pkgbase.Maintainer:
# If the user has credentials, they'll adopt the package regardless
# of maintainership. Otherwise, we'll promote the user to maintainer
# if no maintainer currently exists.
pkgbase_adopt_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/delete")
@auth_required()
async def pkgbase_delete_get(request: Request, name: str):
if not request.user.has_credential(creds.PKGBASE_DELETE):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = make_context(request, "Package Deletion")
context["pkgbase"] = get_pkg_or_base(name, models.PackageBase)
return render_template(request, "packages/delete.html", context)
@router.post("/pkgbase/{name}/delete")
@auth_required()
async def pkgbase_delete_post(request: Request, name: str,
confirm: bool = Form(default=False),
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not request.user.has_credential(creds.PKGBASE_DELETE):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
if not confirm:
context = make_context(request, "Package Deletion")
context["pkgbase"] = pkgbase
context["errors"] = [("The selected packages have not been deleted, "
"check the confirmation checkbox.")]
return render_template(request, "packages/delete.html", context,
status_code=HTTPStatus.BAD_REQUEST)
if comments:
# Update any existing deletion requests' ClosureComment.
with db.begin():
requests = pkgbase.requests.filter(
and_(models.PackageRequest.Status == PENDING_ID,
models.PackageRequest.ReqTypeID == DELETION_ID)
)
for pkgreq in requests:
pkgreq.ClosureComment = comments
notifs = pkgbase_delete_instance(request, pkgbase, comments=comments)
util.apply_all(notifs, lambda n: n.send())
return RedirectResponse("/packages", status_code=HTTPStatus.SEE_OTHER)
async def packages_unflag(request: Request, package_ids: List[int] = [], async def packages_unflag(request: Request, package_ids: List[int] = [],
**kwargs): **kwargs):
if not package_ids: if not package_ids:
@ -1059,7 +430,7 @@ async def packages_unflag(request: Request, package_ids: List[int] = [],
bases.update({pkg.PackageBase}) bases.update({pkg.PackageBase})
for pkgbase in bases: for pkgbase in bases:
pkgbase_unflag_instance(request, pkgbase) pkgbase_actions.pkgbase_unflag_instance(request, pkgbase)
return (True, ["The selected packages have been unflagged."]) return (True, ["The selected packages have been unflagged."])
@ -1096,7 +467,7 @@ async def packages_notify(request: Request, package_ids: List[int] = [],
# If we get here, user input is good. # If we get here, user input is good.
for pkgbase in bases: for pkgbase in bases:
pkgbase_notify_instance(request, pkgbase) pkgbase_actions.pkgbase_notify_instance(request, pkgbase)
# TODO: This message does not yet have a translation. # TODO: This message does not yet have a translation.
return (True, ["The selected packages' notifications have been enabled."]) return (True, ["The selected packages' notifications have been enabled."])
@ -1133,7 +504,7 @@ async def packages_unnotify(request: Request, package_ids: List[int] = [],
return error_tuple return error_tuple
for pkgbase in bases: for pkgbase in bases:
pkgbase_unnotify_instance(request, pkgbase) pkgbase_actions.pkgbase_unnotify_instance(request, pkgbase)
# TODO: This message does not yet have a translation. # TODO: This message does not yet have a translation.
return (True, ["The selected packages' notifications have been removed."]) return (True, ["The selected packages' notifications have been removed."])
@ -1167,7 +538,7 @@ async def packages_adopt(request: Request, package_ids: List[int] = [],
# Now, really adopt the bases. # Now, really adopt the bases.
for pkgbase in bases: for pkgbase in bases:
pkgbase_adopt_instance(request, pkgbase) pkgbase_actions.pkgbase_adopt_instance(request, pkgbase)
return (True, ["The selected packages have been adopted."]) return (True, ["The selected packages have been adopted."])
@ -1177,7 +548,7 @@ def disown_all(request: Request, pkgbases: List[models.PackageBase]) \
errors = [] errors = []
for pkgbase in pkgbases: for pkgbase in pkgbases:
try: try:
pkgbase_disown_instance(request, pkgbase) pkgbase_actions.pkgbase_disown_instance(request, pkgbase)
except InvariantError as exc: except InvariantError as exc:
errors.append(str(exc)) errors.append(str(exc))
return errors return errors
@ -1246,7 +617,7 @@ async def packages_delete(request: Request, package_ids: List[int] = [],
deleted_bases, notifs = [], [] deleted_bases, notifs = [], []
for pkgbase in bases: for pkgbase in bases:
deleted_bases.append(pkgbase.Name) deleted_bases.append(pkgbase.Name)
notifs += pkgbase_delete_instance(request, pkgbase) notifs += pkgbase_actions.pkgbase_delete_instance(request, pkgbase)
# Log out the fact that this happened for accountability. # Log out the fact that this happened for accountability.
logger.info(f"Privileged user '{request.user.Username}' deleted the " logger.info(f"Privileged user '{request.user.Username}' deleted the "
@ -1298,132 +669,3 @@ async def packages_post(request: Request,
context["success"] = messages context["success"] = messages
return await packages_get(request, context) return await packages_get(request, context)
@router.get("/pkgbase/{name}/merge")
@auth_required()
async def pkgbase_merge_get(request: Request, name: str,
into: str = Query(default=str()),
next: str = Query(default=str())):
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not next:
next = f"/pkgbase/{pkgbase.Name}"
context = make_context(request, "Package Merging")
context.update({
"pkgbase": pkgbase,
"into": into,
"next": next
})
status_code = HTTPStatus.OK
# TODO: Lookup errors from credential instead of hardcoding them.
# Idea: Something like credential_errors(creds.PKGBASE_MERGE).
# Perhaps additionally: bad_credential_status_code(creds.PKGBASE_MERGE).
# Don't take these examples verbatim. We should find good naming.
if not request.user.has_credential(creds.PKGBASE_MERGE):
context["errors"] = [
"Only Trusted Users and Developers can merge packages."]
status_code = HTTPStatus.UNAUTHORIZED
return render_template(request, "pkgbase/merge.html", context,
status_code=status_code)
def pkgbase_merge_instance(request: Request, pkgbase: models.PackageBase,
target: models.PackageBase, comments: str = str()):
pkgbasename = str(pkgbase.Name)
# Create notifications.
notifs = handle_request(request, MERGE_ID, pkgbase, target)
# Target votes and notifications sets of user IDs that are
# looking to be migrated.
target_votes = set(v.UsersID for v in target.package_votes)
target_notifs = set(n.UserID for n in target.notifications)
with db.begin():
# Merge pkgbase's comments.
for comment in pkgbase.comments:
comment.PackageBase = target
# Merge notifications that don't yet exist in the target.
for notif in pkgbase.notifications:
if notif.UserID not in target_notifs:
notif.PackageBase = target
# Merge votes that don't yet exist in the target.
for vote in pkgbase.package_votes:
if vote.UsersID not in target_votes:
vote.PackageBase = target
# Run popupdate.
popupdate.run_single(target)
with db.begin():
# Delete pkgbase and its packages now that everything's merged.
for pkg in pkgbase.packages:
db.delete(pkg)
db.delete(pkgbase)
# Log this out for accountability purposes.
logger.info(f"Trusted User '{request.user.Username}' merged "
f"'{pkgbasename}' into '{target.Name}'.")
# Send notifications.
util.apply_all(notifs, lambda n: n.send())
@router.post("/pkgbase/{name}/merge")
@auth_required()
async def pkgbase_merge_post(request: Request, name: str,
into: str = Form(default=str()),
comments: str = Form(default=str()),
confirm: bool = Form(default=False),
next: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, models.PackageBase)
context = await make_variable_context(request, "Package Merging")
context["pkgbase"] = pkgbase
# TODO: Lookup errors from credential instead of hardcoding them.
if not request.user.has_credential(creds.PKGBASE_MERGE):
context["errors"] = [
"Only Trusted Users and Developers can merge packages."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.UNAUTHORIZED)
if not confirm:
context["errors"] = ["The selected packages have not been deleted, "
"check the confirmation checkbox."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
try:
target = get_pkg_or_base(into, models.PackageBase)
except HTTPException:
context["errors"] = [
"Cannot find package to merge votes and comments into."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
if pkgbase == target:
context["errors"] = ["Cannot merge a package base with itself."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
with db.begin():
update_closure_comment(pkgbase, MERGE_ID, comments, target=target)
# Merge pkgbase into target.
pkgbase_merge_instance(request, pkgbase, target, comments=comments)
# Run popupdate on the target.
popupdate.run_single(target)
if not next:
next = f"/pkgbase/{target.Name}"
# Redirect to the newly merged into package.
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)

701
aurweb/routers/pkgbase.py Normal file
View file

@ -0,0 +1,701 @@
from datetime import datetime
from http import HTTPStatus
from fastapi import APIRouter, Form, HTTPException, Query, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from sqlalchemy import and_
from aurweb import db, l10n, templates, util
from aurweb.auth import auth_required, creds
from aurweb.exceptions import InvariantError
from aurweb.models import PackageBase
from aurweb.models.package_comment import PackageComment
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_notification import PackageNotification
from aurweb.models.package_request import PENDING_ID, PackageRequest
from aurweb.models.package_vote import PackageVote
from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID
from aurweb.packages.requests import update_closure_comment
from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment
from aurweb.pkgbase import actions
from aurweb.pkgbase import util as pkgbaseutil
from aurweb.scripts import popupdate
from aurweb.scripts.rendercomment import update_comment_render_fastapi
from aurweb.templates import make_variable_context, render_template
router = APIRouter()
@router.get("/pkgbase/{name}")
async def pkgbase(request: Request, name: str) -> Response:
"""
Single package base view.
:param request: FastAPI Request
:param name: PackageBase.Name
:return: HTMLResponse
"""
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
# If this is not a split package, redirect to /packages/{name}.
if pkgbase.packages.count() == 1:
return RedirectResponse(f"/packages/{name}",
status_code=int(HTTPStatus.SEE_OTHER))
# Add our base information.
context = pkgbaseutil.make_context(request, pkgbase)
context["packages"] = pkgbase.packages.all()
return render_template(request, "pkgbase.html", context)
@router.get("/pkgbase/{name}/voters")
async def pkgbase_voters(request: Request, name: str) -> Response:
"""
View of package base voters.
Requires `request.user` has creds.PKGBASE_LIST_VOTERS credential.
:param request: FastAPI Request
:param name: PackageBase.Name
:return: HTMLResponse
"""
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
if not request.user.has_credential(creds.PKGBASE_LIST_VOTERS):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Voters")
context["pkgbase"] = pkgbase
return render_template(request, "pkgbase/voters.html", context)
@router.get("/pkgbase/{name}/flag-comment")
async def pkgbase_flag_comment(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
if pkgbase.Flagger is None:
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Flag Comment")
context["pkgbase"] = pkgbase
return render_template(request, "packages/flag-comment.html", context)
@router.post("/pkgbase/{name}/keywords")
async def pkgbase_keywords(request: Request, name: str,
keywords: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
keywords = set(keywords.split(" "))
# Delete all keywords which are not supplied by the user.
other_keywords = pkgbase.keywords.filter(
~PackageKeyword.Keyword.in_(keywords))
other_keyword_strings = [kwd.Keyword for kwd in other_keywords]
existing_keywords = set(
kwd.Keyword for kwd in
pkgbase.keywords.filter(
~PackageKeyword.Keyword.in_(other_keyword_strings))
)
with db.begin():
db.delete_all(other_keywords)
for keyword in keywords.difference(existing_keywords):
db.create(PackageKeyword,
PackageBase=pkgbase,
Keyword=keyword)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/flag")
@auth_required()
async def pkgbase_flag_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_FLAG)
if not has_cred or pkgbase.Flagger is not None:
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Flag Package Out-Of-Date")
context["pkgbase"] = pkgbase
return render_template(request, "packages/flag.html", context)
@router.post("/pkgbase/{name}/flag")
@auth_required()
async def pkgbase_flag_post(request: Request, name: str,
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
if not comments:
context = templates.make_context(request, "Flag Package Out-Of-Date")
context["pkgbase"] = pkgbase
context["errors"] = ["The selected packages have not been flagged, "
"please enter a comment."]
return render_template(request, "packages/flag.html", context,
status_code=HTTPStatus.BAD_REQUEST)
has_cred = request.user.has_credential(creds.PKGBASE_FLAG)
if has_cred and not pkgbase.Flagger:
now = int(datetime.utcnow().timestamp())
with db.begin():
pkgbase.OutOfDateTS = now
pkgbase.Flagger = request.user
pkgbase.FlaggerComment = comments
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments")
@auth_required()
async def pkgbase_comments_post(
request: Request, name: str,
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False)):
""" Add a new comment via POST request. """
pkgbase = get_pkg_or_base(name, PackageBase)
if not comment:
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
# If the provided comment is different than the record's version,
# update the db record.
now = int(datetime.utcnow().timestamp())
with db.begin():
comment = db.create(PackageComment, User=request.user,
PackageBase=pkgbase,
Comments=comment, RenderedComment=str(),
CommentTS=now)
if enable_notifications and not request.user.notified(pkgbase):
db.create(PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render_fastapi(comment)
# Redirect to the pkgbase page.
return RedirectResponse(f"/pkgbase/{pkgbase.Name}#comment-{comment.ID}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/comments/{id}/form")
@auth_required()
async def pkgbase_comment_form(request: Request, name: str, id: int,
next: str = Query(default=None)):
"""
Produce a comment form for comment {id}.
This route is used as a partial HTML endpoint when editing
package comments via Javascript. This endpoint used to be
part of the RPC as type=get-comment-form and has been
relocated here because the form returned cannot be used
externally and requires a POST request by the user.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` value used for the comment form
:return: JSONResponse
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = pkgbase.comments.filter(PackageComment.ID == id).first()
if not comment:
return JSONResponse({}, status_code=HTTPStatus.NOT_FOUND)
if not request.user.is_elevated() and request.user != comment.User:
return JSONResponse({}, status_code=HTTPStatus.UNAUTHORIZED)
context = pkgbaseutil.make_context(request, pkgbase)
context["comment"] = comment
if not next:
next = f"/pkgbase/{name}"
context["next"] = next
form = templates.render_raw_template(
request, "partials/packages/comment_form.html", context)
return JSONResponse({"form": form})
@router.get("/pkgbase/{name}/comments/{id}/edit")
@auth_required()
async def pkgbase_comment_edit(request: Request, name: str, id: int,
next: str = Form(default=None)):
"""
Render the non-javascript edit form.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` parameter used in the POST request
:return: HTMLResponse
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
if not next:
next = f"/pkgbase/{name}"
context = await make_variable_context(request, "Edit comment", next=next)
context["comment"] = comment
return render_template(request, "packages/comments/edit.html", context)
@router.post("/pkgbase/{name}/comments/{id}")
@auth_required()
async def pkgbase_comment_post(
request: Request, name: str, id: int,
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False),
next: str = Form(default=None)):
""" Edit an existing comment. """
pkgbase = get_pkg_or_base(name, PackageBase)
db_comment = get_pkgbase_comment(pkgbase, id)
if not comment:
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
# If the provided comment is different than the record's version,
# update the db record.
now = int(datetime.utcnow().timestamp())
if db_comment.Comments != comment:
with db.begin():
db_comment.Comments = comment
db_comment.Editor = request.user
db_comment.EditedTS = now
db_notif = request.user.notifications.filter(
PackageNotification.PackageBaseID == pkgbase.ID
).first()
if enable_notifications and not db_notif:
db.create(PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render_fastapi(db_comment)
if not next:
next = f"/pkgbase/{pkgbase.Name}"
# Redirect to the pkgbase page anchored to the updated comment.
return RedirectResponse(f"{next}#comment-{db_comment.ID}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/pin")
@auth_required()
async def pkgbase_comment_pin(request: Request, name: str, id: int,
next: str = Form(default=None)):
"""
Pin a comment.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` parameter used in the POST request
:return: RedirectResponse to `next`
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_PIN,
approved=[pkgbase.Maintainer])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to pin this comment."))
now = int(datetime.utcnow().timestamp())
with db.begin():
comment.PinnedTS = now
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/unpin")
@auth_required()
async def pkgbase_comment_unpin(request: Request, name: str, id: int,
next: str = Form(default=None)):
"""
Unpin a comment.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` parameter used in the POST request
:return: RedirectResponse to `next`
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_PIN,
approved=[pkgbase.Maintainer])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to unpin this comment."))
with db.begin():
comment.PinnedTS = 0
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/delete")
@auth_required()
async def pkgbase_comment_delete(request: Request, name: str, id: int,
next: str = Form(default=None)):
"""
Delete a comment.
This action does **not** delete the comment from the database, but
sets PackageBase.DelTS and PackageBase.DeleterUID, which is used to
decide who gets to view the comment and what utilities it gets.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` parameter used in the POST request
:return: RedirectResposne to `next`
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
authorized = request.user.has_credential(creds.COMMENT_DELETE,
[comment.User])
if not authorized:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to delete this comment."))
now = int(datetime.utcnow().timestamp())
with db.begin():
comment.Deleter = request.user
comment.DelTS = now
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/comments/{id}/undelete")
@auth_required()
async def pkgbase_comment_undelete(request: Request, name: str, id: int,
next: str = Form(default=None)):
"""
Undelete a comment.
This action does **not** undelete any comment from the database, but
unsets PackageBase.DelTS and PackageBase.DeleterUID which restores
the comment to a standard state.
:param request: FastAPI Request
:param name: PackageBase.Name
:param id: PackageComment.ID
:param next: Optional `next` parameter used in the POST request
:return: RedirectResponse to `next`
"""
pkgbase = get_pkg_or_base(name, PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential(creds.COMMENT_UNDELETE,
approved=[comment.User])
if not has_cred:
_ = l10n.get_translator_for_request(request)
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail=_("You are not allowed to undelete this comment."))
with db.begin():
comment.Deleter = None
comment.DelTS = None
if not next:
next = f"/pkgbase/{name}"
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/vote")
@auth_required()
async def pkgbase_vote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
vote = pkgbase.package_votes.filter(
PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_VOTE)
if has_cred and not vote:
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(PackageVote,
User=request.user,
PackageBase=pkgbase,
VoteTS=now)
# Update NumVotes/Popularity.
popupdate.run_single(pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/unvote")
@auth_required()
async def pkgbase_unvote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
vote = pkgbase.package_votes.filter(
PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential(creds.PKGBASE_VOTE)
if has_cred and vote:
with db.begin():
db.delete(vote)
# Update NumVotes/Popularity.
popupdate.run_single(pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/notify")
@auth_required()
async def pkgbase_notify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
actions.pkgbase_notify_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/unnotify")
@auth_required()
async def pkgbase_unnotify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
actions.pkgbase_unnotify_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/unflag")
@auth_required()
async def pkgbase_unflag(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
actions.pkgbase_unflag_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/disown")
@auth_required()
async def pkgbase_disown_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_DISOWN,
approved=[pkgbase.Maintainer])
if not has_cred:
return RedirectResponse(f"/pkgbase/{name}",
HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Disown Package")
context["pkgbase"] = pkgbase
return render_template(request, "packages/disown.html", context)
@router.post("/pkgbase/{name}/disown")
@auth_required()
async def pkgbase_disown_post(request: Request, name: str,
comments: str = Form(default=str()),
confirm: bool = Form(default=False)):
pkgbase = get_pkg_or_base(name, PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_DISOWN,
approved=[pkgbase.Maintainer])
if not has_cred:
return RedirectResponse(f"/pkgbase/{name}",
HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Disown Package")
context["pkgbase"] = pkgbase
if not confirm:
context["errors"] = [("The selected packages have not been disowned, "
"check the confirmation checkbox.")]
return render_template(request, "packages/disown.html", context,
status_code=HTTPStatus.BAD_REQUEST)
with db.begin():
update_closure_comment(pkgbase, ORPHAN_ID, comments)
try:
actions.pkgbase_disown_instance(request, pkgbase)
except InvariantError as exc:
context["errors"] = [str(exc)]
return render_template(request, "packages/disown.html", context,
status_code=HTTPStatus.BAD_REQUEST)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.post("/pkgbase/{name}/adopt")
@auth_required()
async def pkgbase_adopt_post(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
has_cred = request.user.has_credential(creds.PKGBASE_ADOPT)
if has_cred or not pkgbase.Maintainer:
# If the user has credentials, they'll adopt the package regardless
# of maintainership. Otherwise, we'll promote the user to maintainer
# if no maintainer currently exists.
actions.pkgbase_adopt_instance(request, pkgbase)
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/delete")
@auth_required()
async def pkgbase_delete_get(request: Request, name: str):
if not request.user.has_credential(creds.PKGBASE_DELETE):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
context = templates.make_context(request, "Package Deletion")
context["pkgbase"] = get_pkg_or_base(name, PackageBase)
return render_template(request, "packages/delete.html", context)
@router.post("/pkgbase/{name}/delete")
@auth_required()
async def pkgbase_delete_post(request: Request, name: str,
confirm: bool = Form(default=False),
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
if not request.user.has_credential(creds.PKGBASE_DELETE):
return RedirectResponse(f"/pkgbase/{name}",
status_code=HTTPStatus.SEE_OTHER)
if not confirm:
context = templates.make_context(request, "Package Deletion")
context["pkgbase"] = pkgbase
context["errors"] = [("The selected packages have not been deleted, "
"check the confirmation checkbox.")]
return render_template(request, "packages/delete.html", context,
status_code=HTTPStatus.BAD_REQUEST)
if comments:
# Update any existing deletion requests' ClosureComment.
with db.begin():
requests = pkgbase.requests.filter(
and_(PackageRequest.Status == PENDING_ID,
PackageRequest.ReqTypeID == DELETION_ID)
)
for pkgreq in requests:
pkgreq.ClosureComment = comments
notifs = actions.pkgbase_delete_instance(
request, pkgbase, comments=comments)
util.apply_all(notifs, lambda n: n.send())
return RedirectResponse("/packages", status_code=HTTPStatus.SEE_OTHER)
@router.get("/pkgbase/{name}/merge")
@auth_required()
async def pkgbase_merge_get(request: Request, name: str,
into: str = Query(default=str()),
next: str = Query(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
if not next:
next = f"/pkgbase/{pkgbase.Name}"
context = templates.make_context(request, "Package Merging")
context.update({
"pkgbase": pkgbase,
"into": into,
"next": next
})
status_code = HTTPStatus.OK
# TODO: Lookup errors from credential instead of hardcoding them.
# Idea: Something like credential_errors(creds.PKGBASE_MERGE).
# Perhaps additionally: bad_credential_status_code(creds.PKGBASE_MERGE).
# Don't take these examples verbatim. We should find good naming.
if not request.user.has_credential(creds.PKGBASE_MERGE):
context["errors"] = [
"Only Trusted Users and Developers can merge packages."]
status_code = HTTPStatus.UNAUTHORIZED
return render_template(request, "pkgbase/merge.html", context,
status_code=status_code)
@router.post("/pkgbase/{name}/merge")
@auth_required()
async def pkgbase_merge_post(request: Request, name: str,
into: str = Form(default=str()),
comments: str = Form(default=str()),
confirm: bool = Form(default=False),
next: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
context = await make_variable_context(request, "Package Merging")
context["pkgbase"] = pkgbase
# TODO: Lookup errors from credential instead of hardcoding them.
if not request.user.has_credential(creds.PKGBASE_MERGE):
context["errors"] = [
"Only Trusted Users and Developers can merge packages."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.UNAUTHORIZED)
if not confirm:
context["errors"] = ["The selected packages have not been deleted, "
"check the confirmation checkbox."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
try:
target = get_pkg_or_base(into, PackageBase)
except HTTPException:
context["errors"] = [
"Cannot find package to merge votes and comments into."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
if pkgbase == target:
context["errors"] = ["Cannot merge a package base with itself."]
return render_template(request, "pkgbase/merge.html", context,
status_code=HTTPStatus.BAD_REQUEST)
with db.begin():
update_closure_comment(pkgbase, MERGE_ID, comments, target=target)
# Merge pkgbase into target.
actions.pkgbase_merge_instance(request, pkgbase, target, comments=comments)
if not next:
next = f"/pkgbase/{target.Name}"
# Redirect to the newly merged into package.
return RedirectResponse(next, status_code=HTTPStatus.SEE_OTHER)