feat(FastAPI): add /pkgbase/{name}/comaintainers (get, post)

Changes from PHP:

- Form action now points to `/pkgbase/{name}/comaintainers`.
- When an error occurs, users are sent back to
  `/pkgbase/{name}/comaintainers` with an error at the top of the page.
  (PHP used to send people to /pkgbase/, which ended up at a blank
  search page).

Closes: https://gitlab.archlinux.org/archlinux/aurweb/-/issues/51

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-09-16 19:42:09 -07:00
parent 2efd254974
commit 4d191b51f9
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
4 changed files with 295 additions and 2 deletions

View file

@ -16,6 +16,7 @@ from aurweb.auth import auth_required
from aurweb.models.license import License from aurweb.models.license import License
from aurweb.models.package import Package from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase from aurweb.models.package_base import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_comment import PackageComment from aurweb.models.package_comment import PackageComment
from aurweb.models.package_dependency import PackageDependency from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_license import PackageLicense from aurweb.models.package_license import PackageLicense
@ -25,8 +26,10 @@ from aurweb.models.package_request import PackageRequest
from aurweb.models.package_source import PackageSource from aurweb.models.package_source import PackageSource
from aurweb.models.package_vote import PackageVote from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import CONFLICTS_ID from aurweb.models.relation_type import CONFLICTS_ID
from aurweb.models.user import User
from aurweb.packages.search import PackageSearch from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, query_notified, query_voted from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, query_notified, query_voted
from aurweb.scripts import notify
from aurweb.scripts.rendercomment import update_comment_render from aurweb.scripts.rendercomment import update_comment_render
from aurweb.templates import make_context, render_raw_template, render_template from aurweb.templates import make_context, render_raw_template, render_template
@ -390,3 +393,145 @@ async def pkgbase_comment_unpin(request: Request, name: str, id: int):
return RedirectResponse(f"/pkgbase/{name}", return RedirectResponse(f"/pkgbase/{name}",
status_code=int(HTTPStatus.SEE_OTHER)) status_code=int(HTTPStatus.SEE_OTHER))
@router.get("/pkgbase/{name}/comaintainers")
@auth_required(True)
async def package_base_comaintainers(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
# Unauthorized users (Non-TU/Dev and not the pkgbase maintainer)
# get redirected to the package base's page.
has_creds = request.user.has_credential("CRED_PKGBASE_EDIT_COMAINTAINERS",
approved=[pkgbase.Maintainer])
if not has_creds:
return RedirectResponse(f"/pkgbase/{name}",
status_code=int(HTTPStatus.SEE_OTHER))
# Add our base information.
context = make_context(request, "Manage Co-maintainers")
context["pkgbase"] = pkgbase
context["comaintainers"] = [
c.User.Username for c in pkgbase.comaintainers
]
return render_template(request, "pkgbase/comaintainers.html", context)
def remove_users(pkgbase, usernames):
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
# We know that the users we passed here are in the DB.
# No need to check for their existence.
comaintainer = pkgbase.comaintainers.join(User).filter(
User.Username == username
).first()
notifications.append(
notify.ComaintainerRemoveNotification(
conn, comaintainer.User.ID, pkgbase.ID
)
)
db.session.delete(comaintainer)
# Send out notifications if need be.
for notify_ in notifications:
notify_.send()
@router.post("/pkgbase/{name}/comaintainers")
@auth_required(True)
async def package_base_comaintainers_post(
request: Request, name: str,
users: str = Form(default=str())) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
# Unauthorized users (Non-TU/Dev and not the pkgbase maintainer)
# get redirected to the package base's page.
has_creds = request.user.has_credential("CRED_PKGBASE_EDIT_COMAINTAINERS",
approved=[pkgbase.Maintainer])
if not has_creds:
return RedirectResponse(f"/pkgbase/{name}",
status_code=int(HTTPStatus.SEE_OTHER))
users = set(users.split("\n"))
users.remove(str()) # Remove any empty strings from the set.
records = {c.User.Username for c in pkgbase.comaintainers}
remove_users(pkgbase, records.difference(users))
# Default priority (lowest value; most preferred).
priority = 1
# Get the highest priority in the comaintainer set.
last_priority = pkgbase.comaintainers.order_by(
PackageComaintainer.Priority.desc()
).limit(1).first()
# If that record exists, we use a priority which is 1 higher.
# TODO: This needs to ensure that it wraps around and preserves
# ordering in the case where we hit the max number allowed by
# the Priority column type.
if last_priority:
priority = last_priority.Priority + 1
def add_users(usernames):
""" Add users as comaintainers to pkgbase.
:param usernames: An iterable of username strings
:return: None on success, an error string on failure. """
nonlocal request, pkgbase, priority
# First, perform a check against all usernames given; for each
# username, add its related User object to memo.
_ = l10n.get_translator_for_request(request)
memo = {}
for username in usernames:
user = db.query(User).filter(User.Username == username).first()
if not user:
return _("Invalid user name: %s") % username
memo[username] = user
# Alright, now that we got past the check, add them all to the DB.
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notifications = []
with db.begin():
for username in usernames:
user = memo.get(username)
if pkgbase.Maintainer == user:
# Already a maintainer. Move along.
continue
# If we get here, our user model object is in the memo.
comaintainer = db.create(
PackageComaintainer,
PackageBase=pkgbase,
User=user,
Priority=priority)
priority += 1
notifications.append(
notify.ComaintainerAddNotification(
conn, comaintainer.User.ID, pkgbase.ID)
)
# Send out notifications.
for notify_ in notifications:
notify_.send()
error = add_users(users.difference(records))
if error:
context = make_context(request, "Manage Co-maintainers")
context["pkgbase"] = pkgbase
context["comaintainers"] = [
c.User.Username for c in pkgbase.comaintainers
]
context["errors"] = [error]
return render_template(request, "pkgbase/comaintainers.html", context)
return RedirectResponse(f"/pkgbase/{pkgbase.Name}",
status_code=int(HTTPStatus.SEE_OTHER))

View file

@ -117,9 +117,9 @@
{% endif %} {% endif %}
</form> </form>
{% if is_maintainer %} {% if request.user.has_credential('CRED_PKGBASE_EDIT_COMAINTAINERS', approved=[pkgbase.Maintainer]) %}
<li> <li>
<a href="/pkgbase/{{ result.Name }}/comaintainers/"> <a href="/pkgbase/{{ pkgbase.Name }}/comaintainers/">
{{ "Manage Co-Maintainers" | tr }} {{ "Manage Co-Maintainers" | tr }}
</a> </a>
</li> </li>

View file

@ -0,0 +1,40 @@
{% extends "partials/layout.html" %}
{% block pageContent %}
{% if errors %}
<ul class="errorlist">
{% for error in errors %}
<li>{{ error | tr }}</li>
{% endfor %}
</ul>
{% endif %}
<div class="box">
<h2>{{ "Manage Co-maintainers" | tr }}:</h2>
<p>
{{
"Use this form to add co-maintainers for %s%s%s "
"(one user name per line):"
| tr | format("<strong>", pkgbase.Name, "</strong>")
| safe
}}
</p>
<form action="/pkgbase/{{ pkgbase.Name }}/comaintainers" method="post">
<fieldset>
<p>
<label for="id_users">{{ "Users" | tr }}:</label>
<textarea id="id_users" name="users"
rows="5" cols="50">{{ "\n".join(comaintainers) }}</textarea>
</p>
<p>
<button class="button" type="submit">
{{ "Save" | tr }}
</button>
</p>
</fieldset>
</form>
</div>
{% endblock %}

View file

@ -1196,3 +1196,111 @@ def test_pkgbase_comment_unpin_unauthorized(client: TestClient,
with client as request: with client as request:
resp = request.post(endpoint, cookies=cookies) resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED) assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_comaintainers_not_found(client: TestClient, maintainer: User):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = "/pkgbase/fake/comaintainers"
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comaintainers_post_not_found(client: TestClient,
maintainer: User):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = "/pkgbase/fake/comaintainers"
with client as request:
resp = request.post(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comaintainers_unauthorized(client: TestClient, user: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
def test_pkgbase_comaintainers_post_unauthorized(client: TestClient,
user: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
def test_pkgbase_comaintainers_post_invalid_user(client: TestClient,
maintainer: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"users": "\nfake\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
assert error.text.strip() == "Invalid user name: fake"
def test_pkgbase_comaintainers(client: TestClient, user: User,
maintainer: User, package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
# Start off by adding user as a comaintainer to package.
# The maintainer username given should be ignored.
with client as request:
resp = request.post(endpoint, data={
"users": f"\n{user.Username}\n{maintainer.Username}\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Do it again to exercise the last_priority bump path.
with client as request:
resp = request.post(endpoint, data={
"users": f"\n{user.Username}\n{maintainer.Username}\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Now that we've added a comaintainer to the pkgbase,
# let's perform a GET request to make sure that the backend produces
# the user we added in the users textarea.
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
users = root.xpath('//textarea[@id="id_users"]')[0]
assert users.text.strip() == user.Username
# Finish off by removing all the comaintainers.
with client as request:
resp = request.post(endpoint, data={
"users": str()
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
users = root.xpath('//textarea[@id="id_users"]')[0]
assert users is not None and users.text is None