diff --git a/aurweb/routers/packages.py b/aurweb/routers/packages.py index 5ae19d07..385d91db 100644 --- a/aurweb/routers/packages.py +++ b/aurweb/routers/packages.py @@ -16,6 +16,7 @@ from aurweb.auth import auth_required from aurweb.models.license import License from aurweb.models.package import Package from aurweb.models.package_base import PackageBase +from aurweb.models.package_comaintainer import PackageComaintainer from aurweb.models.package_comment import PackageComment from aurweb.models.package_dependency import PackageDependency from aurweb.models.package_license import PackageLicense @@ -25,8 +26,10 @@ from aurweb.models.package_request import PackageRequest from aurweb.models.package_source import PackageSource from aurweb.models.package_vote import PackageVote from aurweb.models.relation_type import CONFLICTS_ID +from aurweb.models.user import User from aurweb.packages.search import PackageSearch from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, query_notified, query_voted +from aurweb.scripts import notify from aurweb.scripts.rendercomment import update_comment_render from aurweb.templates import make_context, render_raw_template, render_template @@ -390,3 +393,145 @@ async def pkgbase_comment_unpin(request: Request, name: str, id: int): return RedirectResponse(f"/pkgbase/{name}", status_code=int(HTTPStatus.SEE_OTHER)) + + +@router.get("/pkgbase/{name}/comaintainers") +@auth_required(True) +async def package_base_comaintainers(request: Request, name: str) -> Response: + # Get the PackageBase. + pkgbase = get_pkg_or_base(name, PackageBase) + + # Unauthorized users (Non-TU/Dev and not the pkgbase maintainer) + # get redirected to the package base's page. + has_creds = request.user.has_credential("CRED_PKGBASE_EDIT_COMAINTAINERS", + approved=[pkgbase.Maintainer]) + if not has_creds: + return RedirectResponse(f"/pkgbase/{name}", + status_code=int(HTTPStatus.SEE_OTHER)) + + # Add our base information. + context = make_context(request, "Manage Co-maintainers") + context["pkgbase"] = pkgbase + + context["comaintainers"] = [ + c.User.Username for c in pkgbase.comaintainers + ] + + return render_template(request, "pkgbase/comaintainers.html", context) + + +def remove_users(pkgbase, usernames): + conn = db.ConnectionExecutor(db.get_engine().raw_connection()) + notifications = [] + with db.begin(): + for username in usernames: + # We know that the users we passed here are in the DB. + # No need to check for their existence. + comaintainer = pkgbase.comaintainers.join(User).filter( + User.Username == username + ).first() + notifications.append( + notify.ComaintainerRemoveNotification( + conn, comaintainer.User.ID, pkgbase.ID + ) + ) + db.session.delete(comaintainer) + + # Send out notifications if need be. + for notify_ in notifications: + notify_.send() + + +@router.post("/pkgbase/{name}/comaintainers") +@auth_required(True) +async def package_base_comaintainers_post( + request: Request, name: str, + users: str = Form(default=str())) -> Response: + # Get the PackageBase. + pkgbase = get_pkg_or_base(name, PackageBase) + + # Unauthorized users (Non-TU/Dev and not the pkgbase maintainer) + # get redirected to the package base's page. + has_creds = request.user.has_credential("CRED_PKGBASE_EDIT_COMAINTAINERS", + approved=[pkgbase.Maintainer]) + if not has_creds: + return RedirectResponse(f"/pkgbase/{name}", + status_code=int(HTTPStatus.SEE_OTHER)) + + users = set(users.split("\n")) + users.remove(str()) # Remove any empty strings from the set. + records = {c.User.Username for c in pkgbase.comaintainers} + + remove_users(pkgbase, records.difference(users)) + + # Default priority (lowest value; most preferred). + priority = 1 + + # Get the highest priority in the comaintainer set. + last_priority = pkgbase.comaintainers.order_by( + PackageComaintainer.Priority.desc() + ).limit(1).first() + + # If that record exists, we use a priority which is 1 higher. + # TODO: This needs to ensure that it wraps around and preserves + # ordering in the case where we hit the max number allowed by + # the Priority column type. + if last_priority: + priority = last_priority.Priority + 1 + + def add_users(usernames): + """ Add users as comaintainers to pkgbase. + + :param usernames: An iterable of username strings + :return: None on success, an error string on failure. """ + nonlocal request, pkgbase, priority + + # First, perform a check against all usernames given; for each + # username, add its related User object to memo. + _ = l10n.get_translator_for_request(request) + memo = {} + for username in usernames: + user = db.query(User).filter(User.Username == username).first() + if not user: + return _("Invalid user name: %s") % username + memo[username] = user + + # Alright, now that we got past the check, add them all to the DB. + conn = db.ConnectionExecutor(db.get_engine().raw_connection()) + notifications = [] + with db.begin(): + for username in usernames: + user = memo.get(username) + if pkgbase.Maintainer == user: + # Already a maintainer. Move along. + continue + + # If we get here, our user model object is in the memo. + comaintainer = db.create( + PackageComaintainer, + PackageBase=pkgbase, + User=user, + Priority=priority) + priority += 1 + + notifications.append( + notify.ComaintainerAddNotification( + conn, comaintainer.User.ID, pkgbase.ID) + ) + + # Send out notifications. + for notify_ in notifications: + notify_.send() + + error = add_users(users.difference(records)) + if error: + context = make_context(request, "Manage Co-maintainers") + context["pkgbase"] = pkgbase + context["comaintainers"] = [ + c.User.Username for c in pkgbase.comaintainers + ] + context["errors"] = [error] + return render_template(request, "pkgbase/comaintainers.html", context) + + return RedirectResponse(f"/pkgbase/{pkgbase.Name}", + status_code=int(HTTPStatus.SEE_OTHER)) diff --git a/templates/partials/packages/actions.html b/templates/partials/packages/actions.html index d552f2dd..6c30153c 100644 --- a/templates/partials/packages/actions.html +++ b/templates/partials/packages/actions.html @@ -117,9 +117,9 @@ {% endif %} - {% if is_maintainer %} + {% if request.user.has_credential('CRED_PKGBASE_EDIT_COMAINTAINERS', approved=[pkgbase.Maintainer]) %}
  • - + {{ "Manage Co-Maintainers" | tr }}
  • diff --git a/templates/pkgbase/comaintainers.html b/templates/pkgbase/comaintainers.html new file mode 100644 index 00000000..06e8b9d7 --- /dev/null +++ b/templates/pkgbase/comaintainers.html @@ -0,0 +1,40 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} + {% if errors %} + + {% endif %} + +
    +

    {{ "Manage Co-maintainers" | tr }}:

    +

    + {{ + "Use this form to add co-maintainers for %s%s%s " + "(one user name per line):" + | tr | format("", pkgbase.Name, "") + | safe + }} +

    + +
    +
    +

    + + +

    + +

    + +

    +
    +
    + +
    +{% endblock %} diff --git a/test/test_packages_routes.py b/test/test_packages_routes.py index 1c7d5d3e..f1c20067 100644 --- a/test/test_packages_routes.py +++ b/test/test_packages_routes.py @@ -1196,3 +1196,111 @@ def test_pkgbase_comment_unpin_unauthorized(client: TestClient, with client as request: resp = request.post(endpoint, cookies=cookies) assert resp.status_code == int(HTTPStatus.UNAUTHORIZED) + + +def test_pkgbase_comaintainers_not_found(client: TestClient, maintainer: User): + cookies = {"AURSID": maintainer.login(Request(), "testPassword")} + endpoint = "/pkgbase/fake/comaintainers" + with client as request: + resp = request.get(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.NOT_FOUND) + + +def test_pkgbase_comaintainers_post_not_found(client: TestClient, + maintainer: User): + cookies = {"AURSID": maintainer.login(Request(), "testPassword")} + endpoint = "/pkgbase/fake/comaintainers" + with client as request: + resp = request.post(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.NOT_FOUND) + + +def test_pkgbase_comaintainers_unauthorized(client: TestClient, user: User, + package: Package): + pkgbase = package.PackageBase + endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers" + cookies = {"AURSID": user.login(Request(), "testPassword")} + with client as request: + resp = request.get(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.SEE_OTHER) + assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}" + + +def test_pkgbase_comaintainers_post_unauthorized(client: TestClient, + user: User, + package: Package): + pkgbase = package.PackageBase + endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers" + cookies = {"AURSID": user.login(Request(), "testPassword")} + with client as request: + resp = request.post(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.SEE_OTHER) + assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}" + + +def test_pkgbase_comaintainers_post_invalid_user(client: TestClient, + maintainer: User, + package: Package): + pkgbase = package.PackageBase + endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers" + cookies = {"AURSID": maintainer.login(Request(), "testPassword")} + with client as request: + resp = request.post(endpoint, data={ + "users": "\nfake\n" + }, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.OK) + + root = parse_root(resp.text) + error = root.xpath('//ul[@class="errorlist"]/li')[0] + assert error.text.strip() == "Invalid user name: fake" + + +def test_pkgbase_comaintainers(client: TestClient, user: User, + maintainer: User, package: Package): + pkgbase = package.PackageBase + endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers" + cookies = {"AURSID": maintainer.login(Request(), "testPassword")} + + # Start off by adding user as a comaintainer to package. + # The maintainer username given should be ignored. + with client as request: + resp = request.post(endpoint, data={ + "users": f"\n{user.Username}\n{maintainer.Username}\n" + }, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.SEE_OTHER) + assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}" + + # Do it again to exercise the last_priority bump path. + with client as request: + resp = request.post(endpoint, data={ + "users": f"\n{user.Username}\n{maintainer.Username}\n" + }, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.SEE_OTHER) + assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}" + + # Now that we've added a comaintainer to the pkgbase, + # let's perform a GET request to make sure that the backend produces + # the user we added in the users textarea. + with client as request: + resp = request.get(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.OK) + + root = parse_root(resp.text) + users = root.xpath('//textarea[@id="id_users"]')[0] + assert users.text.strip() == user.Username + + # Finish off by removing all the comaintainers. + with client as request: + resp = request.post(endpoint, data={ + "users": str() + }, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.SEE_OTHER) + assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}" + + with client as request: + resp = request.get(endpoint, cookies=cookies, allow_redirects=False) + assert resp.status_code == int(HTTPStatus.OK) + + root = parse_root(resp.text) + users = root.xpath('//textarea[@id="id_users"]')[0] + assert users is not None and users.text is None