From 021a1c8fb6a0c74082617d465490efeafe99e98d Mon Sep 17 00:00:00 2001 From: Kevin Morris Date: Thu, 24 Jun 2021 22:39:16 -0700 Subject: [PATCH] add /accounts/ (get, post) routes Slight markup changes, same style overall and same form parameters as the PHP implementation. In addition, we've disabled the "left" and "right" navigation buttons when we're at the border of the table. CSS Changes: - Added similar styling to submit `` that submit `` had. - Added .results tr td[align="{left,right}"] styling to align the result table's `More -->` button to the right of the table. Signed-off-by: Kevin Morris --- aurweb/models/user.py | 9 + aurweb/routers/accounts.py | 92 ++++- templates/account/index.html | 13 + templates/account/search.html | 69 ++++ templates/partials/account/results.html | 82 ++++ test/test_accounts_routes.py | 490 +++++++++++++++++++++++- web/html/css/aurweb.css | 12 +- 7 files changed, 760 insertions(+), 7 deletions(-) create mode 100644 templates/account/index.html create mode 100644 templates/account/search.html create mode 100644 templates/partials/account/results.html diff --git a/aurweb/models/user.py b/aurweb/models/user.py index 1762f004..4705c050 100644 --- a/aurweb/models/user.py +++ b/aurweb/models/user.py @@ -168,6 +168,15 @@ class User(Base): aurweb.models.account_type.TRUSTED_USER_AND_DEV_ID } + def can_edit_user(self, user): + """ Can this account record edit the target user? It must either + be the target user or a user with enough permissions to do so. + + :param user: Target user + :return: Boolean indicating whether this instance can edit `user` + """ + return self == user or self.is_trusted_user() or self.is_developer() + def __repr__(self): return "" % ( self.ID, str(self.AccountType), self.Username) diff --git a/aurweb/routers/accounts.py b/aurweb/routers/accounts.py index 5d798ae8..466d129d 100644 --- a/aurweb/routers/accounts.py +++ b/aurweb/routers/accounts.py @@ -12,17 +12,18 @@ from sqlalchemy import and_, func, or_ import aurweb.config from aurweb import db, l10n, time, util -from aurweb.auth import auth_required +from aurweb.auth import account_type_required, auth_required from aurweb.captcha import get_captcha_answer, get_captcha_salts, get_captcha_token from aurweb.l10n import get_translator_for_request from aurweb.models.accepted_term import AcceptedTerm -from aurweb.models.account_type import AccountType +from aurweb.models.account_type import (DEVELOPER, DEVELOPER_ID, TRUSTED_USER, TRUSTED_USER_AND_DEV, TRUSTED_USER_AND_DEV_ID, + TRUSTED_USER_ID, USER_ID, AccountType) from aurweb.models.ban import Ban from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint from aurweb.models.term import Term from aurweb.models.user import User from aurweb.scripts.notify import ResetKeyNotification -from aurweb.templates import make_variable_context, render_template +from aurweb.templates import make_context, make_variable_context, render_template router = APIRouter() logger = logging.getLogger(__name__) @@ -591,6 +592,91 @@ async def account(request: Request, username: str): return render_template(request, "account/show.html", context) +@router.get("/accounts/") +@auth_required(True) +@account_type_required({TRUSTED_USER, DEVELOPER, TRUSTED_USER_AND_DEV}) +async def accounts(request: Request): + context = make_context(request, "Accounts") + return render_template(request, "account/search.html", context) + + +@router.post("/accounts/") +@auth_required(True) +@account_type_required({TRUSTED_USER, DEVELOPER, TRUSTED_USER_AND_DEV}) +async def accounts_post(request: Request, + O: int = Form(default=0), # Offset + SB: str = Form(default=str()), # Search By + U: str = Form(default=str()), # Username + T: str = Form(default=str()), # Account Type + S: bool = Form(default=False), # Suspended + E: str = Form(default=str()), # Email + R: str = Form(default=str()), # Real Name + I: str = Form(default=str()), # IRC Nick + K: str = Form(default=str())): # PGP Key + context = await make_variable_context(request, "Accounts") + context["pp"] = pp = 50 # Hits per page. + + offset = max(O, 0) # Minimize offset at 0. + context["offset"] = offset # Offset. + + context["params"] = dict(await request.form()) + if "O" in context["params"]: + context["params"].pop("O") + + # Setup order by criteria based on SB. + order_by_columns = { + "t": (AccountType.ID.asc(), User.Username.asc()), + "r": (User.RealName.asc(), AccountType.ID.asc()), + "i": (User.IRCNick.asc(), AccountType.ID.asc()), + } + default_order = (User.Username.asc(), AccountType.ID.asc()) + order_by = order_by_columns.get(SB, default_order) + + # Convert parameter T to an AccountType ID. + account_types = { + "u": USER_ID, + "t": TRUSTED_USER_ID, + "d": DEVELOPER_ID, + "td": TRUSTED_USER_AND_DEV_ID + } + account_type_id = account_types.get(T, None) + + # Get a query handle to users, populate the total user + # count into a jinja2 context variable. + query = db.query(User).join(AccountType) + context["total_users"] = query.count() + + # Populate this list with any additional statements to + # be ANDed together. + statements = [] + if account_type_id is not None: + statements.append(AccountType.ID == account_type_id) + if U: + statements.append(User.Username.like(f"%{U}%")) + if S: + statements.append(User.Suspended == S) + if E: + statements.append(User.Email.like(f"%{E}%")) + if R: + statements.append(User.RealName.like(f"%{R}%")) + if I: + statements.append(User.IRCNick.like(f"%{I}%")) + if K: + statements.append(User.PGPKey.like(f"%{K}%")) + + # Filter the query by combining all statements added above into + # an AND statement, unless there's just one statement, which + # we pass on to filter() as args. + if statements: + query = query.filter(and_(*statements)) + + # Finally, order and truncate our users for the current page. + users = query.order_by(*order_by).limit(pp).offset(offset) + context["users"] = users + + return render_template(request, "account/index.html", context) + + def render_terms_of_service(request: Request, context: dict, terms: typing.Iterable): diff --git a/templates/account/index.html b/templates/account/index.html new file mode 100644 index 00000000..32b2ca16 --- /dev/null +++ b/templates/account/index.html @@ -0,0 +1,13 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} +
+

{{ "Accounts" | tr }}

+ + {% if not users %} + {{ "No results matched your search criteria." | tr }} + {% else %} + {% include "partials/account/results.html" %} + {% endif %} +
+{% endblock %} diff --git a/templates/account/search.html b/templates/account/search.html new file mode 100644 index 00000000..3f83d25b --- /dev/null +++ b/templates/account/search.html @@ -0,0 +1,69 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} +
+

{{ "Accounts" | tr }}

+ {{ "Use this form to search existing accounts." | tr }} +
+
+ +
+{% endblock %} diff --git a/templates/partials/account/results.html b/templates/partials/account/results.html new file mode 100644 index 00000000..910ae637 --- /dev/null +++ b/templates/partials/account/results.html @@ -0,0 +1,82 @@ + + + + + + + + + + + + + + + {% for user in users %} + + + + + + + + + + {% endfor %} + + +
{{ "Username" | tr }}{{ "Type" | tr }}{{ "Status" | tr }}{{ "Real Name" | tr }}{{ "IRC Nick" | tr }}{{ "PGP Key Fingerprint" | tr }}{{ "Edit Account" | tr }}
+ + {{ user.Username }} + + {{ user.AccountType.AccountType }}{{ "Suspended" if user.Suspended else "Active" }}{{ user.RealName | e }}{{ user.IRCNick | e }}{{ user.PGPKey or '' | e }} + {% if request.user.can_edit_user(user) %} + + {{ "Edit" | tr }} + + {% endif %} +
+ + + + + + +
+
+
+ + {% for k, v in params.items() %} + + {% endfor %} + +
+
+
+
+
+ + {% for k, v in params.items() %} + + {% endfor %} + +
+
+
+ diff --git a/test/test_accounts_routes.py b/test/test_accounts_routes.py index dbf13d19..567b3426 100644 --- a/test/test_accounts_routes.py +++ b/test/test_accounts_routes.py @@ -1,3 +1,4 @@ +import logging import re import tempfile @@ -14,7 +15,7 @@ from aurweb import captcha from aurweb.asgi import app from aurweb.db import commit, create, query from aurweb.models.accepted_term import AcceptedTerm -from aurweb.models.account_type import AccountType +from aurweb.models.account_type import DEVELOPER_ID, TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID, AccountType from aurweb.models.ban import Ban from aurweb.models.session import Session from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint @@ -31,6 +32,8 @@ TEST_EMAIL = "test@example.org" client = TestClient(app) user = None +logger = logging.getLogger(__name__) + def make_ssh_pubkey(): # Create a public key with ssh-keygen (this adds ssh-keygen as a @@ -55,8 +58,8 @@ def setup(): account_type = query(AccountType, AccountType.AccountType == "User").first() user = create(User, Username=TEST_USERNAME, Email=TEST_EMAIL, - RealName="Test User", Passwd="testPassword", - AccountType=account_type) + RealName="Test UserZ", Passwd="testPassword", + IRCNick="testZ", AccountType=account_type) yield user @@ -65,6 +68,14 @@ def setup(): setup_test_db("Terms", "AcceptedTerms") +@pytest.fixture +def tu_user(): + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID).first() + commit() + yield user + + def test_get_passreset_authed_redirects(): sid = user.login(Request(), "testPassword") assert sid is not None @@ -929,6 +940,479 @@ def test_get_account_unauthenticated(): assert "You must log in to view user information." in content +def test_get_accounts(tu_user): + """ Test that we can GET request /accounts/ and receive + a form which can be used to POST /accounts/. """ + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.get("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + parser = lxml.etree.HTMLParser() + root = lxml.etree.fromstring(response.text, parser=parser) + + # Get the form. + form = root.xpath('//form[contains(@class, "account-search-form")]') + + # Make sure there's only one form and it goes where it should. + assert len(form) == 1 + form = next(iter(form)) + assert form.attrib.get("method") == "post" + assert form.attrib.get("action") == "/accounts/" + + def field(element): + """ Return the given element string as a valid + selector in the form. """ + return f"./fieldset/p/{element}" + + username = form.xpath(field('input[@id="id_username"]')) + assert bool(username) + + account_type = form.xpath(field('select[@id="id_type"]')) + assert bool(account_type) + + suspended = form.xpath(field('input[@id="id_suspended"]')) + assert bool(suspended) + + email = form.xpath(field('input[@id="id_email"]')) + assert bool(email) + + realname = form.xpath(field('input[@id="id_realname"]')) + assert bool(realname) + + irc = form.xpath(field('input[@id="id_irc"]')) + assert bool(irc) + + sortby = form.xpath(field('select[@id="id_sortby"]')) + assert bool(sortby) + + +def parse_root(html): + parser = lxml.etree.HTMLParser() + return lxml.etree.fromstring(html, parser=parser) + + +def get_rows(html): + root = parse_root(html) + return root.xpath('//table[contains(@class, "users")]/tbody/tr') + + +def test_post_accounts(tu_user): + # Set a PGPKey. + user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30" + + # Create a few more users. + users = [user] + for i in range(10): + _user = create(User, Username=f"test_{i}", + Email=f"test_{i}@example.org", + RealName=f"Test #{i}", + Passwd="testPassword", + IRCNick=f"test_#{i}", + autocommit=False) + users.append(_user) + + # Commit everything to the database. + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 11 + + # Simulate default ascending ORDER_BY. + sorted_users = sorted(users, key=lambda u: u.Username) + for i, _user in enumerate(sorted_users): + columns = rows[i].xpath("./td") + assert len(columns) == 7 + + username, atype, suspended, real_name, \ + irc_nick, pgp_key, edit = columns + + username = next(iter(username.xpath("./a"))) + assert username.text.strip() == _user.Username + + assert atype.text.strip() == str(_user.AccountType) + assert suspended.text.strip() == "Active" + assert real_name.text.strip() == _user.RealName + assert irc_nick.text == _user.IRCNick + assert pgp_key.text == (_user.PGPKey or None) + + edit = edit.xpath("./a") + if user.can_edit_user(_user): + edit = next(iter(edit)) + assert edit.text.strip() == "Edit" + else: + assert not edit + + logger.debug('Checked user row {"id": %s, "username": "%s"}.' + % (_user.ID, _user.Username)) + + +def test_post_accounts_username(tu_user): + # Test the U parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"U": user.Username}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + username = next(iter(username.xpath("./a"))) + assert username.text.strip() == user.Username + + +def test_post_accounts_account_type(tu_user): + # Check the different account type options. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Make a user with the "User" role here so we can + # test the `u` parameter. + account_type = query(AccountType, + AccountType.AccountType == "User").first() + create(User, Username="test_2", + Email="test_2@example.org", + RealName="Test User 2", + Passwd="testPassword", + AccountType=account_type) + + # Expect no entries; we marked our only user as a User type. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "t"}) + assert response.status_code == int(HTTPStatus.OK) + assert len(get_rows(response.text)) == 0 + + # So, let's also ensure that specifying "u" returns our user. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "u"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "User" + + # Set our only user to a Trusted User. + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_ID).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "t"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Trusted User" + + user.AccountType = query(AccountType, + AccountType.ID == DEVELOPER_ID).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "d"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Developer" + + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID + ).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "td"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Trusted User & Developer" + + +def test_post_accounts_status(tu_user): + # Test the functionality of Suspended. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + assert status.text.strip() == "Active" + + user.Suspended = True + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"S": True}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + assert status.text.strip() == "Suspended" + + +def test_post_accounts_email(tu_user): + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Search via email. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"E": user.Email}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_realname(tu_user): + # Test the R parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"R": user.RealName}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_irc(tu_user): + # Test the I parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"I": user.IRCNick}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_sortby(tu_user): + # Create a second user so we can compare sorts. + account_type = query(AccountType, + AccountType.ID == DEVELOPER_ID).first() + create(User, Username="test2", + Email="test2@example.org", + RealName="Test User 2", + Passwd="testPassword", + IRCNick="test2", + AccountType=account_type) + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Show that "u" is the default search order, by username. + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + first_rows = rows + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "u"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + def compare_text_values(column, lhs, rhs): + return [row[column].text.strip() for row in lhs] \ + == [row[column].text.strip() for row in rhs] + + # Test the username rows are ordered the same. + assert compare_text_values(0, first_rows, rows) is True + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "i"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test the rows are reversed when ordering by IRCNick. + assert compare_text_values(4, first_rows, reversed(rows)) is True + + # Sort by "i" -> RealName. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "r"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test the rows are reversed when ordering by RealName. + assert compare_text_values(4, first_rows, reversed(rows)) is True + + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID).first() + commit() + + # Fetch first_rows again with our new AccountType ordering. + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + first_rows = rows + + # Sort by "t" -> AccountType. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "t"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test that rows again got reversed. + assert compare_text_values(1, first_rows, reversed(rows)) + + +def test_post_accounts_pgp_key(tu_user): + user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30" + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Search via PGPKey. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"K": user.PGPKey}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_paged(tu_user): + # Create 150 users. + users = [user] + account_type = query(AccountType, + AccountType.AccountType == "User").first() + for i in range(150): + _user = create(User, Username=f"test_#{i}", + Email=f"test_#{i}@example.org", + RealName=f"Test User #{i}", + Passwd="testPassword", + AccountType=account_type, + autocommit=False) + users.append(_user) + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 # `pp`, or hits per page is defined at 50. + + # Sort users in ascending default sort by order. + sorted_users = sorted(users, key=lambda u: u.Username) + + # Get the first fifty sorted users and assert that's what + # we got in the first search result page. + first_fifty = sorted_users[:50] + + for i, _user in enumerate(first_fifty): + row = rows[i] + username = row[0].xpath("./a")[0] # First column + assert username.text.strip() == _user.Username + + root = parse_root(response.text) + page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0] + page_next = root.xpath('//button[contains(@class, "page-next")]')[0] + + assert page_prev.attrib["disabled"] == "disabled" + assert "disabled" not in page_next.attrib + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"O": 50}) # +50 offset. + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 + + second_fifty = sorted_users[50:100] + + for i, _user in enumerate(second_fifty): + row = rows[i] + username = row[0].xpath("./a")[0] # First column + assert username.text.strip() == _user.Username + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"O": 101}) # Last page. + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 + + root = parse_root(response.text) + page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0] + page_next = root.xpath('//button[contains(@class, "page-next")]')[0] + + assert "disabled" not in page_prev.attrib + assert page_next.attrib["disabled"] == "disabled" + + def test_get_terms_of_service(): term = create(Term, Description="Test term.", URL="http://localhost", Revision=1) diff --git a/web/html/css/aurweb.css b/web/html/css/aurweb.css index 2748462f..b36dbd4d 100644 --- a/web/html/css/aurweb.css +++ b/web/html/css/aurweb.css @@ -209,6 +209,16 @@ label.confirmation, margin: .33em 0 1em; } -button[type="submit"] { +button[type="submit"], +button[type="reset"] { padding: 0 0.6em; } + +.results tr td[align="left"] fieldset { + text-align: left; +} + +.results tr td[align="right"] fieldset { + text-align: right; +} +