diff --git a/aurweb/models/user.py b/aurweb/models/user.py index 1762f004..4705c050 100644 --- a/aurweb/models/user.py +++ b/aurweb/models/user.py @@ -168,6 +168,15 @@ class User(Base): aurweb.models.account_type.TRUSTED_USER_AND_DEV_ID } + def can_edit_user(self, user): + """ Can this account record edit the target user? It must either + be the target user or a user with enough permissions to do so. + + :param user: Target user + :return: Boolean indicating whether this instance can edit `user` + """ + return self == user or self.is_trusted_user() or self.is_developer() + def __repr__(self): return "" % ( self.ID, str(self.AccountType), self.Username) diff --git a/aurweb/routers/accounts.py b/aurweb/routers/accounts.py index 5d798ae8..466d129d 100644 --- a/aurweb/routers/accounts.py +++ b/aurweb/routers/accounts.py @@ -12,17 +12,18 @@ from sqlalchemy import and_, func, or_ import aurweb.config from aurweb import db, l10n, time, util -from aurweb.auth import auth_required +from aurweb.auth import account_type_required, auth_required from aurweb.captcha import get_captcha_answer, get_captcha_salts, get_captcha_token from aurweb.l10n import get_translator_for_request from aurweb.models.accepted_term import AcceptedTerm -from aurweb.models.account_type import AccountType +from aurweb.models.account_type import (DEVELOPER, DEVELOPER_ID, TRUSTED_USER, TRUSTED_USER_AND_DEV, TRUSTED_USER_AND_DEV_ID, + TRUSTED_USER_ID, USER_ID, AccountType) from aurweb.models.ban import Ban from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint from aurweb.models.term import Term from aurweb.models.user import User from aurweb.scripts.notify import ResetKeyNotification -from aurweb.templates import make_variable_context, render_template +from aurweb.templates import make_context, make_variable_context, render_template router = APIRouter() logger = logging.getLogger(__name__) @@ -591,6 +592,91 @@ async def account(request: Request, username: str): return render_template(request, "account/show.html", context) +@router.get("/accounts/") +@auth_required(True) +@account_type_required({TRUSTED_USER, DEVELOPER, TRUSTED_USER_AND_DEV}) +async def accounts(request: Request): + context = make_context(request, "Accounts") + return render_template(request, "account/search.html", context) + + +@router.post("/accounts/") +@auth_required(True) +@account_type_required({TRUSTED_USER, DEVELOPER, TRUSTED_USER_AND_DEV}) +async def accounts_post(request: Request, + O: int = Form(default=0), # Offset + SB: str = Form(default=str()), # Search By + U: str = Form(default=str()), # Username + T: str = Form(default=str()), # Account Type + S: bool = Form(default=False), # Suspended + E: str = Form(default=str()), # Email + R: str = Form(default=str()), # Real Name + I: str = Form(default=str()), # IRC Nick + K: str = Form(default=str())): # PGP Key + context = await make_variable_context(request, "Accounts") + context["pp"] = pp = 50 # Hits per page. + + offset = max(O, 0) # Minimize offset at 0. + context["offset"] = offset # Offset. + + context["params"] = dict(await request.form()) + if "O" in context["params"]: + context["params"].pop("O") + + # Setup order by criteria based on SB. + order_by_columns = { + "t": (AccountType.ID.asc(), User.Username.asc()), + "r": (User.RealName.asc(), AccountType.ID.asc()), + "i": (User.IRCNick.asc(), AccountType.ID.asc()), + } + default_order = (User.Username.asc(), AccountType.ID.asc()) + order_by = order_by_columns.get(SB, default_order) + + # Convert parameter T to an AccountType ID. + account_types = { + "u": USER_ID, + "t": TRUSTED_USER_ID, + "d": DEVELOPER_ID, + "td": TRUSTED_USER_AND_DEV_ID + } + account_type_id = account_types.get(T, None) + + # Get a query handle to users, populate the total user + # count into a jinja2 context variable. + query = db.query(User).join(AccountType) + context["total_users"] = query.count() + + # Populate this list with any additional statements to + # be ANDed together. + statements = [] + if account_type_id is not None: + statements.append(AccountType.ID == account_type_id) + if U: + statements.append(User.Username.like(f"%{U}%")) + if S: + statements.append(User.Suspended == S) + if E: + statements.append(User.Email.like(f"%{E}%")) + if R: + statements.append(User.RealName.like(f"%{R}%")) + if I: + statements.append(User.IRCNick.like(f"%{I}%")) + if K: + statements.append(User.PGPKey.like(f"%{K}%")) + + # Filter the query by combining all statements added above into + # an AND statement, unless there's just one statement, which + # we pass on to filter() as args. + if statements: + query = query.filter(and_(*statements)) + + # Finally, order and truncate our users for the current page. + users = query.order_by(*order_by).limit(pp).offset(offset) + context["users"] = users + + return render_template(request, "account/index.html", context) + + def render_terms_of_service(request: Request, context: dict, terms: typing.Iterable): diff --git a/templates/account/index.html b/templates/account/index.html new file mode 100644 index 00000000..32b2ca16 --- /dev/null +++ b/templates/account/index.html @@ -0,0 +1,13 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} +
+

{{ "Accounts" | tr }}

+ + {% if not users %} + {{ "No results matched your search criteria." | tr }} + {% else %} + {% include "partials/account/results.html" %} + {% endif %} +
+{% endblock %} diff --git a/templates/account/search.html b/templates/account/search.html new file mode 100644 index 00000000..3f83d25b --- /dev/null +++ b/templates/account/search.html @@ -0,0 +1,69 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} +
+

{{ "Accounts" | tr }}

+ {{ "Use this form to search existing accounts." | tr }} +
+
+ +
+{% endblock %} diff --git a/templates/partials/account/results.html b/templates/partials/account/results.html new file mode 100644 index 00000000..910ae637 --- /dev/null +++ b/templates/partials/account/results.html @@ -0,0 +1,82 @@ + + + + + + + + + + + + + + + {% for user in users %} + + + + + + + + + + {% endfor %} + + +
{{ "Username" | tr }}{{ "Type" | tr }}{{ "Status" | tr }}{{ "Real Name" | tr }}{{ "IRC Nick" | tr }}{{ "PGP Key Fingerprint" | tr }}{{ "Edit Account" | tr }}
+ + {{ user.Username }} + + {{ user.AccountType.AccountType }}{{ "Suspended" if user.Suspended else "Active" }}{{ user.RealName | e }}{{ user.IRCNick | e }}{{ user.PGPKey or '' | e }} + {% if request.user.can_edit_user(user) %} + + {{ "Edit" | tr }} + + {% endif %} +
+ + + + + + +
+
+
+ + {% for k, v in params.items() %} + + {% endfor %} + +
+
+
+
+
+ + {% for k, v in params.items() %} + + {% endfor %} + +
+
+
+ diff --git a/test/test_accounts_routes.py b/test/test_accounts_routes.py index dbf13d19..567b3426 100644 --- a/test/test_accounts_routes.py +++ b/test/test_accounts_routes.py @@ -1,3 +1,4 @@ +import logging import re import tempfile @@ -14,7 +15,7 @@ from aurweb import captcha from aurweb.asgi import app from aurweb.db import commit, create, query from aurweb.models.accepted_term import AcceptedTerm -from aurweb.models.account_type import AccountType +from aurweb.models.account_type import DEVELOPER_ID, TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID, AccountType from aurweb.models.ban import Ban from aurweb.models.session import Session from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint @@ -31,6 +32,8 @@ TEST_EMAIL = "test@example.org" client = TestClient(app) user = None +logger = logging.getLogger(__name__) + def make_ssh_pubkey(): # Create a public key with ssh-keygen (this adds ssh-keygen as a @@ -55,8 +58,8 @@ def setup(): account_type = query(AccountType, AccountType.AccountType == "User").first() user = create(User, Username=TEST_USERNAME, Email=TEST_EMAIL, - RealName="Test User", Passwd="testPassword", - AccountType=account_type) + RealName="Test UserZ", Passwd="testPassword", + IRCNick="testZ", AccountType=account_type) yield user @@ -65,6 +68,14 @@ def setup(): setup_test_db("Terms", "AcceptedTerms") +@pytest.fixture +def tu_user(): + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID).first() + commit() + yield user + + def test_get_passreset_authed_redirects(): sid = user.login(Request(), "testPassword") assert sid is not None @@ -929,6 +940,479 @@ def test_get_account_unauthenticated(): assert "You must log in to view user information." in content +def test_get_accounts(tu_user): + """ Test that we can GET request /accounts/ and receive + a form which can be used to POST /accounts/. """ + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.get("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + parser = lxml.etree.HTMLParser() + root = lxml.etree.fromstring(response.text, parser=parser) + + # Get the form. + form = root.xpath('//form[contains(@class, "account-search-form")]') + + # Make sure there's only one form and it goes where it should. + assert len(form) == 1 + form = next(iter(form)) + assert form.attrib.get("method") == "post" + assert form.attrib.get("action") == "/accounts/" + + def field(element): + """ Return the given element string as a valid + selector in the form. """ + return f"./fieldset/p/{element}" + + username = form.xpath(field('input[@id="id_username"]')) + assert bool(username) + + account_type = form.xpath(field('select[@id="id_type"]')) + assert bool(account_type) + + suspended = form.xpath(field('input[@id="id_suspended"]')) + assert bool(suspended) + + email = form.xpath(field('input[@id="id_email"]')) + assert bool(email) + + realname = form.xpath(field('input[@id="id_realname"]')) + assert bool(realname) + + irc = form.xpath(field('input[@id="id_irc"]')) + assert bool(irc) + + sortby = form.xpath(field('select[@id="id_sortby"]')) + assert bool(sortby) + + +def parse_root(html): + parser = lxml.etree.HTMLParser() + return lxml.etree.fromstring(html, parser=parser) + + +def get_rows(html): + root = parse_root(html) + return root.xpath('//table[contains(@class, "users")]/tbody/tr') + + +def test_post_accounts(tu_user): + # Set a PGPKey. + user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30" + + # Create a few more users. + users = [user] + for i in range(10): + _user = create(User, Username=f"test_{i}", + Email=f"test_{i}@example.org", + RealName=f"Test #{i}", + Passwd="testPassword", + IRCNick=f"test_#{i}", + autocommit=False) + users.append(_user) + + # Commit everything to the database. + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 11 + + # Simulate default ascending ORDER_BY. + sorted_users = sorted(users, key=lambda u: u.Username) + for i, _user in enumerate(sorted_users): + columns = rows[i].xpath("./td") + assert len(columns) == 7 + + username, atype, suspended, real_name, \ + irc_nick, pgp_key, edit = columns + + username = next(iter(username.xpath("./a"))) + assert username.text.strip() == _user.Username + + assert atype.text.strip() == str(_user.AccountType) + assert suspended.text.strip() == "Active" + assert real_name.text.strip() == _user.RealName + assert irc_nick.text == _user.IRCNick + assert pgp_key.text == (_user.PGPKey or None) + + edit = edit.xpath("./a") + if user.can_edit_user(_user): + edit = next(iter(edit)) + assert edit.text.strip() == "Edit" + else: + assert not edit + + logger.debug('Checked user row {"id": %s, "username": "%s"}.' + % (_user.ID, _user.Username)) + + +def test_post_accounts_username(tu_user): + # Test the U parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"U": user.Username}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + username = next(iter(username.xpath("./a"))) + assert username.text.strip() == user.Username + + +def test_post_accounts_account_type(tu_user): + # Check the different account type options. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Make a user with the "User" role here so we can + # test the `u` parameter. + account_type = query(AccountType, + AccountType.AccountType == "User").first() + create(User, Username="test_2", + Email="test_2@example.org", + RealName="Test User 2", + Passwd="testPassword", + AccountType=account_type) + + # Expect no entries; we marked our only user as a User type. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "t"}) + assert response.status_code == int(HTTPStatus.OK) + assert len(get_rows(response.text)) == 0 + + # So, let's also ensure that specifying "u" returns our user. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "u"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "User" + + # Set our only user to a Trusted User. + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_ID).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "t"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Trusted User" + + user.AccountType = query(AccountType, + AccountType.ID == DEVELOPER_ID).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "d"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Developer" + + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID + ).first() + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"T": "td"}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + + assert type.text.strip() == "Trusted User & Developer" + + +def test_post_accounts_status(tu_user): + # Test the functionality of Suspended. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + assert status.text.strip() == "Active" + + user.Suspended = True + commit() + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"S": True}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + row = next(iter(rows)) + username, type, status, realname, irc, pgp_key, edit = row + assert status.text.strip() == "Suspended" + + +def test_post_accounts_email(tu_user): + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Search via email. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"E": user.Email}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_realname(tu_user): + # Test the R parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"R": user.RealName}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_irc(tu_user): + # Test the I parameter path. + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"I": user.IRCNick}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_sortby(tu_user): + # Create a second user so we can compare sorts. + account_type = query(AccountType, + AccountType.ID == DEVELOPER_ID).first() + create(User, Username="test2", + Email="test2@example.org", + RealName="Test User 2", + Passwd="testPassword", + IRCNick="test2", + AccountType=account_type) + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Show that "u" is the default search order, by username. + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + first_rows = rows + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "u"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + def compare_text_values(column, lhs, rhs): + return [row[column].text.strip() for row in lhs] \ + == [row[column].text.strip() for row in rhs] + + # Test the username rows are ordered the same. + assert compare_text_values(0, first_rows, rows) is True + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "i"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test the rows are reversed when ordering by IRCNick. + assert compare_text_values(4, first_rows, reversed(rows)) is True + + # Sort by "i" -> RealName. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "r"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test the rows are reversed when ordering by RealName. + assert compare_text_values(4, first_rows, reversed(rows)) is True + + user.AccountType = query(AccountType, + AccountType.ID == TRUSTED_USER_AND_DEV_ID).first() + commit() + + # Fetch first_rows again with our new AccountType ordering. + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + first_rows = rows + + # Sort by "t" -> AccountType. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"SB": "t"}) + assert response.status_code == int(HTTPStatus.OK) + rows = get_rows(response.text) + assert len(rows) == 2 + + # Test that rows again got reversed. + assert compare_text_values(1, first_rows, reversed(rows)) + + +def test_post_accounts_pgp_key(tu_user): + user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30" + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + # Search via PGPKey. + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"K": user.PGPKey}) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 1 + + +def test_post_accounts_paged(tu_user): + # Create 150 users. + users = [user] + account_type = query(AccountType, + AccountType.AccountType == "User").first() + for i in range(150): + _user = create(User, Username=f"test_#{i}", + Email=f"test_#{i}@example.org", + RealName=f"Test User #{i}", + Passwd="testPassword", + AccountType=account_type, + autocommit=False) + users.append(_user) + commit() + + sid = user.login(Request(), "testPassword") + cookies = {"AURSID": sid} + + with client as request: + response = request.post("/accounts/", cookies=cookies) + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 # `pp`, or hits per page is defined at 50. + + # Sort users in ascending default sort by order. + sorted_users = sorted(users, key=lambda u: u.Username) + + # Get the first fifty sorted users and assert that's what + # we got in the first search result page. + first_fifty = sorted_users[:50] + + for i, _user in enumerate(first_fifty): + row = rows[i] + username = row[0].xpath("./a")[0] # First column + assert username.text.strip() == _user.Username + + root = parse_root(response.text) + page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0] + page_next = root.xpath('//button[contains(@class, "page-next")]')[0] + + assert page_prev.attrib["disabled"] == "disabled" + assert "disabled" not in page_next.attrib + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"O": 50}) # +50 offset. + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 + + second_fifty = sorted_users[50:100] + + for i, _user in enumerate(second_fifty): + row = rows[i] + username = row[0].xpath("./a")[0] # First column + assert username.text.strip() == _user.Username + + with client as request: + response = request.post("/accounts/", cookies=cookies, + data={"O": 101}) # Last page. + assert response.status_code == int(HTTPStatus.OK) + + rows = get_rows(response.text) + assert len(rows) == 50 + + root = parse_root(response.text) + page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0] + page_next = root.xpath('//button[contains(@class, "page-next")]')[0] + + assert "disabled" not in page_prev.attrib + assert page_next.attrib["disabled"] == "disabled" + + def test_get_terms_of_service(): term = create(Term, Description="Test term.", URL="http://localhost", Revision=1) diff --git a/web/html/css/aurweb.css b/web/html/css/aurweb.css index 2748462f..b36dbd4d 100644 --- a/web/html/css/aurweb.css +++ b/web/html/css/aurweb.css @@ -209,6 +209,16 @@ label.confirmation, margin: .33em 0 1em; } -button[type="submit"] { +button[type="submit"], +button[type="reset"] { padding: 0 0.6em; } + +.results tr td[align="left"] fieldset { + text-align: left; +} + +.results tr td[align="right"] fieldset { + text-align: right; +} +