aurweb/test/test_accounts_routes.py
Kevin Morris 691b7b9091
feat(fastapi): add comment actions to /account/{username}/comments
With this change, we've decoupled some partials shared between
`/pkgbase/{name}` and `/account/{username}/comments`. The comment
actions template now resolves its package base via the `comment`
instance instead of requiring `pkgbase`.

We've also modified the existing package comment routes to
support execution from any location using the `next` parameter.
This allows us to reuse the package comment code for
account comment actions.

Moved the majority of the comment-editing JavaScript to its own
.js file.

Signed-off-by: Kevin Morris <kevr@0cost.org>
2021-10-29 17:18:49 -07:00

1634 lines
51 KiB
Python

import re
import tempfile
from datetime import datetime
from http import HTTPStatus
from subprocess import Popen
import lxml.html
import pytest
from fastapi.testclient import TestClient
from aurweb import captcha, db, logging
from aurweb.asgi import app
from aurweb.db import create, query
from aurweb.models.accepted_term import AcceptedTerm
from aurweb.models.account_type import DEVELOPER_ID, TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID, USER_ID, AccountType
from aurweb.models.ban import Ban
from aurweb.models.session import Session
from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint
from aurweb.models.term import Term
from aurweb.models.user import User
from aurweb.testing import setup_test_db
from aurweb.testing.html import get_errors
from aurweb.testing.requests import Request
# Test-wide constants: the username and e-mail address that the autouse
# `setup` fixture below uses when it creates the default account.
TEST_USERNAME = "test"
TEST_EMAIL = "test@example.org"
# Global mutables.
# `client` is the single TestClient that every test in this module uses
# to issue requests against the ASGI `app`.
client = TestClient(app)
# `user` is rebound by the autouse `setup` fixture before each test.
user = None
logger = logging.get_logger(__name__)
def make_ssh_pubkey():
    """Generate a throwaway SSH key pair and return its public key.

    The key is produced by running the `ssh-keygen` binary inside a
    temporary directory, which makes `ssh-keygen` a dependency of every
    test that calls this helper.

    :return: Contents of the generated `.pub` file with trailing
             whitespace stripped
    """
    # Imported locally: the module only imports `Popen` from subprocess.
    from subprocess import DEVNULL

    with tempfile.TemporaryDirectory() as tmpdir:
        keyfile = f"{tmpdir}/test.ssh"
        # An empty passphrase (-N "") keeps ssh-keygen non-interactive;
        # its output is discarded.
        proc = Popen(["ssh-keygen", "-f", keyfile, "-N", ""],
                     stdout=DEVNULL, stderr=DEVNULL)
        proc.wait()
        assert proc.returncode == 0

        # Read the public key before the temporary directory is removed,
        # and close the file handle deterministically.
        with open(f"{keyfile}.pub") as pubkey_file:
            return pubkey_file.read().rstrip()
@pytest.fixture(autouse=True)
def setup():
    """Prepare the database and the default user for each test.

    Resets the tables these tests touch, creates the default account
    with the "User" account type, binds it to the module-level `user`
    global and yields it. After the test, term-related tables are
    reset again.
    """
    global user

    setup_test_db("Users", "Sessions", "Bans", "Terms", "AcceptedTerms")

    plain_user_type = query(AccountType,
                            AccountType.AccountType == "User").first()

    with db.begin():
        user = create(User,
                      Username=TEST_USERNAME,
                      Email=TEST_EMAIL,
                      RealName="Test UserZ",
                      Passwd="testPassword",
                      IRCNick="testZ",
                      AccountType=plain_user_type)

    yield user

    # Drop term records so that other tests do not pick them up
    # and falsely redirect.
    setup_test_db("Terms", "AcceptedTerms")
@pytest.fixture
def tu_user():
    """Promote the default user to the Trusted User & Developer type.

    Yields the same module-level `user` object after its account type
    has been switched.
    """
    with db.begin():
        by_id = query(AccountType).filter(
            AccountType.ID == TRUSTED_USER_AND_DEV_ID
        )
        user.AccountType = by_id.first()

    yield user
def test_get_passreset_authed_redirects():
    """GET /passreset while logged in redirects to the homepage."""
    sid = user.login(Request(), "testPassword")
    assert sid is not None

    cookies = {"AURSID": sid}
    with client as req:
        resp = req.get("/passreset", cookies=cookies, allow_redirects=False)

    assert resp.status_code == HTTPStatus.SEE_OTHER
    assert resp.headers.get("location") == "/"
def test_get_passreset():
    """GET /passreset without a session returns 200 OK."""
    with client as req:
        resp = req.get("/passreset")

    assert resp.status_code == HTTPStatus.OK
def test_get_passreset_translation():
    """GET /passreset with AURLANG=de renders German strings."""
    with client as req:
        resp = req.get("/passreset", cookies={"AURLANG": "de"})

    page = resp.content

    # In order: the header title, the form input label and the button.
    german_strings = (
        "Passwort zurücksetzen",
        "Benutzername oder primäre E-Mail-Adresse eingeben:",
        "Weiter",
    )
    for text in german_strings:
        assert text.encode("utf-8") in page

    # Restore english.
    with client as req:
        resp = req.get("/passreset", cookies={"AURLANG": "en"})
def test_get_passreset_with_resetkey():
    """GET /passreset carrying a `resetkey` field returns 200 OK."""
    payload = {"resetkey": "abcd"}
    with client as req:
        resp = req.get("/passreset", data=payload)

    assert resp.status_code == HTTPStatus.OK
def test_post_passreset_authed_redirects():
    """POST /passreset while logged in redirects to the homepage."""
    sid = user.login(Request(), "testPassword")
    assert sid is not None

    with client as req:
        resp = req.post("/passreset",
                        cookies={"AURSID": sid},
                        data={"user": "blah"},
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.SEE_OTHER
    assert resp.headers.get("location") == "/"
def test_post_passreset_user():
    """POST /passreset accepts a username or an e-mail address.

    Both identifiers must redirect to the confirmation step.
    """
    # First with the username, then with the e-mail.
    for identifier in (TEST_USERNAME, TEST_EMAIL):
        with client as req:
            resp = req.post("/passreset", data={"user": identifier})

        assert resp.status_code == HTTPStatus.SEE_OTHER
        assert resp.headers.get("location") == "/passreset?step=confirm"
def test_post_passreset_resetkey():
    """A full password reset succeeds when the real ResetKey is posted."""
    with db.begin():
        now_ts = datetime.utcnow().timestamp()
        user.session = Session(UsersID=user.ID, SessionID="blah",
                               LastUpdateTS=now_ts)

    # Step one: request a password reset for the user.
    with client as req:
        resp = req.post("/passreset", data={"user": TEST_USERNAME})
    assert resp.status_code == HTTPStatus.SEE_OTHER
    assert resp.headers.get("location") == "/passreset?step=confirm"

    # Step two: submit a new password along with the user's ResetKey,
    # which the first request is expected to have set.
    form = {
        "user": TEST_USERNAME,
        "resetkey": user.ResetKey,
        "password": "abcd1234",
        "confirm": "abcd1234"
    }
    with client as req:
        resp = req.post("/passreset", data=form)
    assert resp.status_code == HTTPStatus.SEE_OTHER
    assert resp.headers.get("location") == "/passreset?step=complete"
def test_post_passreset_error_invalid_email():
    """POST /passreset yields 404 "Invalid e-mail." for bad input.

    Covers both a non-existent user and a real user paired with a
    wrong reset key.
    """
    error = "Invalid e-mail."

    # Case 1: the user does not exist at all.
    with client as req:
        resp = req.post("/passreset", data={"user": "invalid"})
    assert resp.status_code == HTTPStatus.NOT_FOUND
    assert error in resp.content.decode("utf-8")

    # Case 2: a real user with a pending reset, but a bogus resetkey.
    make_resetkey()
    form = {
        **make_passreset_data("fake"),
        "password": "abcd1234",
        "confirm": "abcd1234",
    }
    with client as req:
        resp = req.post("/passreset", data=form)
    assert resp.status_code == HTTPStatus.NOT_FOUND
    assert error in resp.content.decode("utf-8")
def make_resetkey():
    """Start a password reset for the test user and return its ResetKey.

    Asserts that the request redirects to the confirmation step.
    """
    payload = {"user": TEST_USERNAME}
    with client as req:
        resp = req.post("/passreset", data=payload)

    assert resp.status_code == HTTPStatus.SEE_OTHER
    assert resp.headers.get("location") == "/passreset?step=confirm"
    return user.ResetKey
def make_passreset_data(resetkey):
    """Build the base /passreset form data for the module-level user.

    :param resetkey: Reset key to place in the form
    :return: Dict with the `user` and `resetkey` fields
    """
    return dict(user=user.Username, resetkey=resetkey)
def test_post_passreset_error_missing_field():
    """A reset POST without password fields returns 400 with an error."""
    # Only `user` and `resetkey` are sent; password and confirm are
    # deliberately left out.
    form = make_passreset_data(make_resetkey())

    with client as req:
        resp = req.post("/passreset", data=form)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert "Missing a required field." in resp.content.decode("utf-8")
def test_post_passreset_error_password_mismatch():
    """A reset POST with differing password/confirm returns 400."""
    form = make_passreset_data(make_resetkey())
    form.update(password="abcd1234", confirm="mismatched")

    with client as req:
        resp = req.post("/passreset", data=form)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert "Password fields do not match." in resp.content.decode("utf-8")
def test_post_passreset_error_password_requirements():
    """A reset POST with a one-character password returns 400."""
    form = make_passreset_data(make_resetkey())

    min_len = User.minimum_passwd_length()
    # The test relies on "x" being shorter than the configured minimum.
    assert min_len >= 4

    form["password"] = form["confirm"] = "x"

    with client as req:
        resp = req.post("/passreset", data=form)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    message = f"Your password must be at least {min_len} characters."
    assert message in resp.content.decode("utf-8")
def test_get_register():
    """GET /register returns 200 OK."""
    with client as req:
        resp = req.get("/register")

    assert resp.status_code == HTTPStatus.OK
def post_register(request, **kwargs):
    """POST a registration form made of defaults plus overrides.

    A CAPTCHA answer is derived from the first available salt. Any
    keyword argument replaces the default form field of the same name
    or adds a new field.

    :param request: Client object used to issue the POST
    :param kwargs: Form fields overriding or extending the defaults
    :return: The response to POST /register (redirects not followed)
    """
    salt = captcha.get_captcha_salts()[0]
    answer = captcha.get_captcha_answer(captcha.get_captcha_token(salt))

    defaults = {
        "U": "newUser",
        "E": "newUser@email.org",
        "P": "newUserPassword",
        "C": "newUserPassword",
        "L": "en",
        "TZ": "UTC",
        "captcha": answer,
        "captcha_salt": salt
    }

    # Caller-supplied fields take precedence over the defaults.
    form = {**defaults, **kwargs}

    return request.post("/register", data=form, allow_redirects=False)
def test_post_register():
    """Registering with the default form succeeds and reports it."""
    with client as req:
        resp = post_register(req)

    assert resp.status_code == HTTPStatus.OK
    message = ("The account, <strong>'newUser'</strong>, "
               "has been successfully created.")
    assert message in resp.content.decode()
def test_post_register_rejects_case_insensitive_spoof():
    """Usernames and e-mails differing only by case are rejected."""
    # Register the original account.
    with client as req:
        resp = post_register(req, U="newUser", E="newUser@example.org")
    assert resp.status_code == HTTPStatus.OK

    # Same username in upper case, unrelated e-mail.
    with client as req:
        resp = post_register(req, U="NEWUSER", E="BLAH@GMAIL.COM")
    assert resp.status_code == HTTPStatus.BAD_REQUEST
    username_error = ("The username, <strong>NEWUSER</strong>, "
                      "is already in use.")
    assert username_error in resp.content.decode()

    # Unrelated username, same e-mail in upper case.
    with client as req:
        resp = post_register(req, U="BLAH", E="NEWUSER@EXAMPLE.ORG")
    assert resp.status_code == HTTPStatus.BAD_REQUEST
    address_error = ("The address, <strong>NEWUSER@EXAMPLE.ORG</strong>, "
                     "is already in use.")
    assert address_error in resp.content.decode()
def test_post_register_error_expired_captcha():
    """An unknown captcha salt is reported as an expired CAPTCHA."""
    with client as req:
        resp = post_register(req, captcha_salt="invalid-salt")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "This CAPTCHA has expired. Please try again." in page
def test_post_register_error_missing_captcha():
    """Registering with `captcha=None` reports a missing CAPTCHA."""
    with client as req:
        resp = post_register(req, captcha=None)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The CAPTCHA is missing." in page
def test_post_register_error_invalid_captcha():
    """A wrong captcha answer is rejected with 400."""
    with client as req:
        resp = post_register(req, captcha="invalid blah blah")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The entered CAPTCHA answer is invalid." in page
def test_post_register_error_ip_banned():
    """Registration from a banned IP address is refused."""
    # FastAPI's TestClient reports 'testclient' as request.client.host,
    # so that is the "address" to ban.
    with db.begin():
        create(Ban, IPAddress="testclient", BanTS=datetime.utcnow())

    with client as req:
        resp = post_register(req)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    message = ("Account registration has been disabled for your IP address, "
               "probably due to sustained spam attacks. Sorry for the "
               "inconvenience.")
    assert message in resp.content.decode()
def test_post_register_error_missing_username():
    """An empty username is reported as a missing required field."""
    with client as req:
        resp = post_register(req, U="")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Missing a required field." in page
def test_post_register_error_missing_email():
    """An empty e-mail is reported as a missing required field."""
    with client as req:
        resp = post_register(req, E="")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Missing a required field." in page
def test_post_register_error_invalid_username():
    """A two-character username is rejected as invalid."""
    # The test config requires at least three characters for a valid
    # username, hence the two-character 'ba'.
    too_short = "ba"
    with client as req:
        resp = post_register(req, U=too_short)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The username is invalid." in page
def test_post_register_invalid_password():
    """A three-character password fails the minimum-length check."""
    with client as req:
        resp = post_register(req, P="abc", C="abc")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    # The exact minimum is configurable, so match any number.
    pattern = r"Your password must be at least \d+ characters."
    assert re.search(pattern, resp.content.decode())
def test_post_register_error_missing_confirm():
    """Registering with `C=None` asks the user to confirm the password."""
    with client as req:
        resp = post_register(req, C=None)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Please confirm your new password." in page
def test_post_register_error_mismatched_confirm():
    """A confirmation differing from the password is rejected."""
    with client as req:
        resp = post_register(req, C="mismatched")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Password fields do not match." in page
def test_post_register_error_invalid_email():
    """The e-mail address "bad@email" is rejected as invalid."""
    with client as req:
        resp = post_register(req, E="bad@email")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The email address is invalid." in page
def test_post_register_error_undeliverable_email():
    """The e-mail address "email@bad.c" is rejected as invalid."""
    # NOTE(review): the previous comment named webchat.freenode.net as a
    # domain without MX records, but the address actually submitted is
    # email@bad.c. Presumably it is expected to be undeliverable;
    # confirm against the validation code.
    with client as req:
        resp = post_register(req, E="email@bad.c")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The email address is invalid." in page
def test_post_register_invalid_backup_email():
    """The backup e-mail address "bad@email" is rejected as invalid."""
    with client as req:
        resp = post_register(req, BE="bad@email")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "The backup email address is invalid." in page
def test_post_register_error_invalid_homepage():
    """The homepage value "bad" is rejected as invalid."""
    with client as req:
        resp = post_register(req, HP="bad")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    message = "The home page is invalid, please specify the full HTTP(s) URL."
    assert message in resp.content.decode()
def test_post_register_error_invalid_pgp_fingerprints():
    """Malformed PGP key fingerprints are rejected.

    Tries the short value "bad" and a 40-character value that starts
    with the non-hex character 'z'.
    """
    message = "The PGP key fingerprint is invalid."
    candidates = ("bad", 'z' + ('a' * 39))

    for fingerprint in candidates:
        with client as req:
            resp = post_register(req, K=fingerprint)

        assert resp.status_code == HTTPStatus.BAD_REQUEST
        assert message in resp.content.decode()
def test_post_register_error_invalid_ssh_pubkeys():
    """Malformed SSH public keys are rejected.

    Tries the value "bad" and a key-type prefix with no key material.
    """
    for pubkey in ("bad", "ssh-rsa "):
        with client as req:
            resp = post_register(req, PK=pubkey)

        assert resp.status_code == HTTPStatus.BAD_REQUEST
        assert "The SSH public key is invalid." in resp.content.decode()
def test_post_register_error_unsupported_language():
    """The language code "bad" is rejected as unsupported."""
    with client as req:
        resp = post_register(req, L="bad")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Language is not currently supported." in page
def test_post_register_error_unsupported_timezone():
    """The timezone "ABCDEFGH" is rejected as unsupported."""
    with client as req:
        resp = post_register(req, TZ="ABCDEFGH")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    page = resp.content.decode()
    assert "Timezone is not currently supported." in page
def test_post_register_error_username_taken():
    """Registering with the username "test" is rejected as taken."""
    with client as req:
        resp = post_register(req, U="test")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    # The username is wrapped in markup, so match anything in between.
    pattern = r"The username, .*, is already in use."
    assert re.search(pattern, resp.content.decode())
def test_post_register_error_email_taken():
    """Registering with "test@example.org" is rejected as taken."""
    with client as req:
        resp = post_register(req, E="test@example.org")

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    # The address is wrapped in markup, so match anything in between.
    pattern = r"The address, .*, is already in use."
    assert re.search(pattern, resp.content.decode())
def test_post_register_error_ssh_pubkey_taken():
    """Registering with an SSH public key that is already stored fails.

    The key is first attached to the default user, then submitted again
    as part of a new registration.
    """
    # Reuse the module's shared helper rather than a private copy of the
    # ssh-keygen invocation (ssh-keygen is a dependency of this test).
    pk = make_ssh_pubkey()

    # Take the sha256 fingerprint of the ssh public key, create it.
    fp = get_fingerprint(pk)
    with db.begin():
        create(SSHPubKey, UserID=user.ID, PubKey=pk, Fingerprint=fp)

    with client as request:
        response = post_register(request, PK=pk)

    assert response.status_code == int(HTTPStatus.BAD_REQUEST)

    content = response.content.decode()
    expected = r"The SSH public key, .*, is already in use."
    assert re.search(expected, content)
def test_post_register_with_ssh_pubkey():
    """Registering with a freshly generated SSH public key succeeds."""
    # Reuse the module's shared helper rather than a private copy of the
    # ssh-keygen invocation (ssh-keygen is a dependency of this test).
    pk = make_ssh_pubkey()

    with client as request:
        response = post_register(request, PK=pk)

    assert response.status_code == int(HTTPStatus.OK)
def test_get_account_edit():
    """A logged-in user can open their own account edit page."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    with client as req:
        resp = req.get("/account/test/edit", cookies=cookies,
                       allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
def test_get_account_edit_unauthorized():
    """A plain user may not open another user's account edit page."""
    request = Request()
    sid = user.login(request, "testPassword")

    # Create the second account inside a transaction, as the other
    # record-creating code in this module does.
    with db.begin():
        create(User, Username="test2", Email="test2@example.org",
               Passwd="testPassword")

    with client as request:
        # Try to edit `test2` while authenticated as `test`.
        response = request.get("/account/test2/edit", cookies={
            "AURSID": sid
        }, allow_redirects=False)

    assert response.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_post_account_edit():
    """A user can change their own e-mail address via the edit form."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test666@example.org",
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
    message = ("The account, <strong>test</strong>, "
               "has been successfully modified.")
    assert message in resp.content.decode()
def test_post_account_edit_dev():
    """A Trusted User & Developer can edit their own account."""
    # Switch the default user to the "Trusted User & Developer" type.
    type_name = "Trusted User & Developer"
    tu_dev_type = query(AccountType,
                        AccountType.AccountType == type_name).first()
    with db.begin():
        user.AccountType = tu_dev_type

    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test666@example.org",
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
    message = ("The account, <strong>test</strong>, "
               "has been successfully modified.")
    assert message in resp.content.decode()
def test_post_account_edit_language():
    """Changing the language to German selects "de" on the page."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "L": "de",  # German
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK

    # Parse the returned HTML and look for a selected 'de' <option>.
    document = lxml.html.fromstring(resp.content.decode())
    selected = document.xpath('//option[@value="de"]/@selected')
    assert selected
    assert selected[0] == "selected"
def test_post_account_edit_timezone():
    """A user can change their timezone to CET."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "TZ": "CET",
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
def test_post_account_edit_error_missing_password():
    """An edit with an empty current password is rejected."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "TZ": "CET",
        "passwd": ""
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert "Invalid password." in resp.content.decode()
def test_post_account_edit_error_invalid_password():
    """An edit with a wrong current password is rejected."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "TZ": "CET",
        "passwd": "invalid"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert "Invalid password." in resp.content.decode()
def test_post_account_edit_error_unauthorized():
    """A plain user may not submit edits to another user's account."""
    request = Request()
    sid = user.login(request, "testPassword")

    # Create the second account inside a transaction, as the other
    # record-creating code in this module does.
    with db.begin():
        create(User, Username="test2",
               Email="test2@example.org", Passwd="testPassword")

    post_data = {
        "U": "test",
        "E": "test@example.org",
        "TZ": "CET",
        "passwd": "testPassword"
    }

    with client as request:
        # Attempt to edit 'test2' while logged in as 'test'.
        response = request.post("/account/test2/edit", cookies={
            "AURSID": sid
        }, data=post_data, allow_redirects=False)

    assert response.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_post_account_edit_ssh_pub_key():
    """An SSH public key can be added and then replaced.

    The first submission stores a new key; the second submits a
    different key to cover the update path for an existing one.
    """
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "passwd": "testPassword"
    }

    # Two rounds, each with a freshly generated key.
    for _ in range(2):
        form["PK"] = make_ssh_pubkey()

        with client as req:
            resp = req.post("/account/test/edit", cookies=cookies,
                            data=form, allow_redirects=False)

        assert resp.status_code == HTTPStatus.OK
def test_post_account_edit_missing_ssh_pubkey():
    """Submitting an empty PK after storing a key walks the delete path."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    def edit_with(pubkey):
        """POST the edit form for the default user with the given PK."""
        form = {
            "U": user.Username,
            "E": user.Email,
            "PK": pubkey,
            "passwd": "testPassword"
        }
        with client as req:
            return req.post("/account/test/edit", cookies=cookies,
                            data=form, allow_redirects=False)

    # First store a real key.
    resp = edit_with(make_ssh_pubkey())
    assert resp.status_code == HTTPStatus.OK

    # Then pass an empty string to walk the delete path.
    resp = edit_with("")
    assert resp.status_code == HTTPStatus.OK
def test_post_account_edit_invalid_ssh_pubkey():
    """An edit carrying a fake SSH public key is rejected with 400."""
    fake_key = "ssh-rsa fake key"

    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "P": "newPassword",
        "C": "newPassword",
        "PK": fake_key,
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.BAD_REQUEST
def test_post_account_edit_password():
    """A user can change their password via the edit form."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    form = {
        "U": "test",
        "E": "test@example.org",
        "P": "newPassword",
        "C": "newPassword",
        "passwd": "testPassword"
    }

    with client as req:
        resp = req.post("/account/test/edit", cookies=cookies, data=form,
                        allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
    # The new password must now validate for the user.
    assert user.valid_password("newPassword")
def test_post_account_edit_account_types():
    """Exercise the account-type field of the edit form per role.

    Walks the same user through the User, Trusted User and Developer
    types, checking which type changes are accepted and whether the
    "id_type" input is shown.
    """
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}
    endpoint = f"/account/{user.Username}/edit"

    def fetch_form():
        """GET the edit page for the logged-in user."""
        with client as req:
            return req.get(endpoint, cookies=cookies)

    def submit_type(type_id):
        """POST the edit form asking for account type `type_id`."""
        form = {
            "U": user.Username,
            "E": user.Email,
            "T": type_id,
            "passwd": "testPassword"
        }
        with client as req:
            return req.post(endpoint, data=form, cookies=cookies)

    def first_error(resp):
        """Return the stripped text of the first error on the page."""
        return get_errors(resp.text)[0].text.strip()

    # As a normal user, we cannot see the "Account Type:" input.
    resp = fetch_form()
    assert resp.status_code == HTTPStatus.OK
    assert "id_type" not in resp.text

    # Invalid account types return an error (0 is not a valid type ID).
    resp = submit_type(0)
    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert first_error(resp) == "Invalid account type provided."

    # Nor can we change any account types.
    resp = submit_type(TRUSTED_USER_ID)
    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert first_error(resp) == (
        "You do not have permission to change account types.")

    # Change user from USER_ID to TRUSTED_USER_ID.
    with db.begin():
        user.AccountTypeID = TRUSTED_USER_ID

    # As a trusted user, we can see the "Account Type:" input.
    resp = fetch_form()
    assert resp.status_code == HTTPStatus.OK
    assert "id_type" in resp.text

    # As a trusted user, we cannot change account type to DEVELOPER_ID.
    resp = submit_type(DEVELOPER_ID)
    assert resp.status_code == HTTPStatus.BAD_REQUEST
    assert first_error(resp) == ("You do not have permission to change "
                                 "this user's account type to Developer.")

    # However, we can modify our account type to USER_ID.
    resp = submit_type(USER_ID)
    assert resp.status_code == HTTPStatus.OK

    # No errors should be displayed.
    assert not get_errors(resp.text)

    # Make sure it got changed to USER_ID as we intended.
    assert user.AccountTypeID == USER_ID

    # Change user to a Developer.
    with db.begin():
        user.AccountTypeID = DEVELOPER_ID

    # As a developer, we can change all account types, for example
    # from DEVELOPER_ID to TRUSTED_USER_AND_DEV_ID.
    resp = submit_type(TRUSTED_USER_AND_DEV_ID)
    assert resp.status_code == HTTPStatus.OK
    assert user.AccountTypeID == TRUSTED_USER_AND_DEV_ID
def test_get_account():
    """A logged-in user can view /account/test."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    with client as req:
        resp = req.get("/account/test", cookies=cookies,
                       allow_redirects=False)

    assert resp.status_code == HTTPStatus.OK
def test_get_account_not_found():
    """Viewing an account that does not exist returns 404."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    with client as req:
        resp = req.get("/account/not_found", cookies=cookies,
                       allow_redirects=False)

    assert resp.status_code == HTTPStatus.NOT_FOUND
def test_get_account_unauthenticated():
    """Viewing an account without a session returns 401 with a notice."""
    with client as req:
        resp = req.get("/account/test", allow_redirects=False)

    assert resp.status_code == HTTPStatus.UNAUTHORIZED
    page = resp.content.decode()
    assert "You must log in to view user information." in page
def test_get_accounts(tu_user):
    """ Test that we can GET request /accounts and receive
    a form which can be used to POST /accounts. """
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    with client as req:
        resp = req.get("/accounts", cookies=cookies)
    assert resp.status_code == HTTPStatus.OK

    document = lxml.etree.fromstring(resp.text,
                                     parser=lxml.etree.HTMLParser())

    # There must be exactly one search form, and it must POST to
    # /accounts.
    forms = document.xpath(
        '//form[contains(@class, "account-search-form")]')
    assert len(forms) == 1
    form = forms[0]
    assert form.attrib.get("method") == "post"
    assert form.attrib.get("action") == "/accounts"

    def field(element):
        """ Return the given element string as a valid
        selector in the form. """
        return f"./fieldset/p/{element}"

    # Every expected search control must be present in the form.
    controls = (
        'input[@id="id_username"]',
        'select[@id="id_type"]',
        'input[@id="id_suspended"]',
        'input[@id="id_email"]',
        'input[@id="id_realname"]',
        'input[@id="id_irc"]',
        'select[@id="id_sortby"]',
    )
    for control in controls:
        assert bool(form.xpath(field(control)))
def parse_root(html):
    """Parse an HTML string with lxml's HTMLParser.

    :param html: HTML source text
    :return: The value returned by `lxml.etree.fromstring`
    """
    return lxml.etree.fromstring(html, parser=lxml.etree.HTMLParser())
def get_rows(html):
    """Return the body rows of the "users" table found in `html`.

    :param html: HTML source text of an accounts result page
    :return: List of matching `tr` elements
    """
    rows_xpath = '//table[contains(@class, "users")]/tbody/tr'
    return parse_root(html).xpath(rows_xpath)
def test_post_accounts(tu_user):
    """POST /accounts with no filters lists all users.

    Creates ten extra users, then checks that the result table has one
    row per user in ascending username order and that each row's
    columns match the corresponding user record.
    """
    # Set a PGPKey.
    with db.begin():
        user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30"

    # Create a few more users.
    users = [user]
    with db.begin():
        for i in range(10):
            _user = create(User, Username=f"test_{i}",
                           Email=f"test_{i}@example.org",
                           RealName=f"Test #{i}",
                           Passwd="testPassword",
                           IRCNick=f"test_#{i}")
            users.append(_user)

    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    with client as request:
        response = request.post("/accounts", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    rows = get_rows(response.text)
    assert len(rows) == 11

    # Simulate default ascending ORDER_BY.
    sorted_users = sorted(users, key=lambda u: u.Username)

    # Row count was asserted above, so pairing rows with users is 1:1.
    for row, _user in zip(rows, sorted_users):
        columns = row.xpath("./td")
        assert len(columns) == 7

        username, atype, suspended, real_name, \
            irc_nick, pgp_key, edit = columns

        username = next(iter(username.xpath("./a")))
        assert username.text.strip() == _user.Username

        assert atype.text.strip() == str(_user.AccountType)
        assert suspended.text.strip() == "Active"
        assert real_name.text.strip() == _user.RealName
        assert irc_nick.text == _user.IRCNick
        # An empty PGP key cell has `text` of None.
        assert pgp_key.text == (_user.PGPKey or None)

        # The "Edit" link is only shown for users we may edit.
        edit = edit.xpath("./a")
        if user.can_edit_user(_user):
            edit = next(iter(edit))
            assert edit.text.strip() == "Edit"
        else:
            assert not edit

        # Pass arguments to the logger so that formatting is deferred
        # until the record is actually emitted.
        logger.debug('Checked user row {"id": %s, "username": "%s"}.',
                     _user.ID, _user.Username)
def test_post_accounts_username(tu_user):
    """POST /accounts filtered by `U` returns just the matching user."""
    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    # Exercise the U parameter path.
    with client as req:
        resp = req.post("/accounts", cookies=cookies,
                        data={"U": user.Username})
    assert resp.status_code == HTTPStatus.OK

    rows = get_rows(resp.text)
    assert len(rows) == 1

    # Unpacking also checks that the row has exactly seven cells.
    (username_cell, _atype, _status, _realname,
     _irc, _pgp_key, _edit) = rows[0]

    link = next(iter(username_cell.xpath("./a")))
    assert link.text.strip() == user.Username
def test_post_accounts_account_type(tu_user):
    """Filter POST /accounts by account type through the `T` form field.

    The codes "u", "t", "d" and "td" are each searched and the account
    type cell of the single matching row is compared with the expected
    label.
    """
    cookies = {"AURSID": user.login(Request(), "testPassword")}

    def search(code):
        """POST /accounts with T=`code` and return the result rows."""
        with client as http:
            response = http.post("/accounts", cookies=cookies,
                                 data={"T": code})
        assert response.status_code == int(HTTPStatus.OK)
        return get_rows(response.text)

    def sole_type_label(rows):
        """Require exactly one row; return its account type cell text."""
        assert len(rows) == 1
        # Seven-way unpacking also checks the row's cell count.
        _name, atype, _status, _realname, _irc, _pgp, _edit = rows[0]
        return atype.text.strip()

    # A second account with the plain "User" role, so that the "u"
    # filter has something to match.
    plain_type = query(AccountType,
                       AccountType.AccountType == "User").first()
    with db.begin():
        create(User, Username="test_2",
               Email="test_2@example.org",
               RealName="Test User 2",
               Passwd="testPassword",
               AccountType=plain_type)

    # No account is expected to match "t" at this point.
    # NOTE(review): depends on the type the `tu_user` fixture assigns
    # to the shared user, which is not visible here.
    assert len(search("t")) == 0

    # "u" returns the one plain user.
    assert sole_type_label(search("u")) == "User"

    # Move the shared user through each remaining type and check that
    # the matching filter code finds it under the expected label.
    transitions = (
        (TRUSTED_USER_ID, "t", "Trusted User"),
        (DEVELOPER_ID, "d", "Developer"),
        (TRUSTED_USER_AND_DEV_ID, "td", "Trusted User & Developer"),
    )
    for type_id, code, label in transitions:
        with db.begin():
            user.AccountType = query(AccountType).filter(
                AccountType.ID == type_id
            ).first()
        assert sole_type_label(search(code)) == label
def test_post_accounts_status(tu_user):
    """Check the status column and the `S` (suspended) search field."""
    cookies = {"AURSID": user.login(Request(), "testPassword")}

    def sole_status(response):
        """Require 200 and a single row; return its status cell text."""
        assert response.status_code == int(HTTPStatus.OK)
        rows = get_rows(response.text)
        assert len(rows) == 1
        # Seven-way unpacking also checks the row's cell count.
        _name, _atype, status, _realname, _irc, _pgp, _edit = rows[0]
        return status.text.strip()

    # Before any suspension the only listed account is "Active".
    with client as http:
        listing = http.post("/accounts", cookies=cookies)
    assert sole_status(listing) == "Active"

    with db.begin():
        user.Suspended = True

    # Searching for suspended accounts now finds it as "Suspended".
    with client as http:
        listing = http.post("/accounts", cookies=cookies,
                            data={"S": True})
    assert sole_status(listing) == "Suspended"
def test_post_accounts_email(tu_user):
    """Search POST /accounts by email through the `E` form field."""
    cookies = {"AURSID": user.login(Request(), "testPassword")}
    criteria = {"E": user.Email}

    with client as http:
        response = http.post("/accounts", cookies=cookies, data=criteria)
    assert response.status_code == int(HTTPStatus.OK)

    # Exactly one account is expected to match the address.
    assert len(get_rows(response.text)) == 1
def test_post_accounts_realname(tu_user):
    """Search POST /accounts by real name through the `R` form field."""
    cookies = {"AURSID": user.login(Request(), "testPassword")}
    criteria = {"R": user.RealName}

    with client as http:
        response = http.post("/accounts", cookies=cookies, data=criteria)
    assert response.status_code == int(HTTPStatus.OK)

    # Exactly one account is expected to match the name.
    assert len(get_rows(response.text)) == 1
def test_post_accounts_irc(tu_user):
    """Search POST /accounts by IRC nick through the `I` form field."""
    cookies = {"AURSID": user.login(Request(), "testPassword")}
    criteria = {"I": user.IRCNick}

    with client as http:
        response = http.post("/accounts", cookies=cookies, data=criteria)
    assert response.status_code == int(HTTPStatus.OK)

    # Exactly one account is expected to match the nick.
    assert len(get_rows(response.text)) == 1
def test_post_accounts_sortby(tu_user):
    """Exercise the `SB` (sort-by) form field of POST /accounts.

    A second account is created so that two rows are listed. The
    default ordering is captured first; each `SB` value is then
    requested and the relevant column is compared against that
    baseline, either in the same or in reversed row order.

    Row cells are indexed as: 0 username, 1 account type, 2 status,
    3 real name, 4 IRC nick, 5 PGP key, 6 edit.
    """
    # Create a second user so we can compare sorts.
    account_type = query(AccountType,
                         AccountType.ID == DEVELOPER_ID).first()
    with db.begin():
        create(User, Username="test2",
               Email="test2@example.org",
               RealName="Test User 2",
               Passwd="testPassword",
               IRCNick="test2",
               AccountType=account_type)

    sid = user.login(Request(), "testPassword")
    cookies = {"AURSID": sid}

    def search(data=None):
        """POST /accounts with `data`; return the two result rows."""
        with client as request:
            response = request.post("/accounts", cookies=cookies,
                                    data=data)
        assert response.status_code == int(HTTPStatus.OK)
        rows = get_rows(response.text)
        assert len(rows) == 2
        return rows

    def column_text(column, rows):
        # Join all descendant text: the username cell renders its value
        # inside a link, so the cell's own `.text` holds no username and
        # comparing it alone would compare empty strings.
        return ["".join(row[column].itertext()).strip() for row in rows]

    def compare_text_values(column, lhs, rhs):
        return column_text(column, lhs) == column_text(column, rhs)

    # Show that "u" is the default search order, by username.
    first_rows = search()
    rows = search({"SB": "u"})

    # Test the username rows are ordered the same.
    assert compare_text_values(0, first_rows, rows) is True

    # Sort by "i" -> IRCNick.
    rows = search({"SB": "i"})

    # Test the rows are reversed when ordering by IRCNick.
    assert compare_text_values(4, first_rows, reversed(rows)) is True

    # Sort by "r" -> RealName.
    rows = search({"SB": "r"})

    # Test the rows are reversed when ordering by RealName; the real
    # name lives in column 3 (column 4 is the IRC nick).
    assert compare_text_values(3, first_rows, reversed(rows)) is True

    with db.begin():
        user.AccountType = query(AccountType).filter(
            AccountType.ID == TRUSTED_USER_AND_DEV_ID
        ).first()

    # Fetch first_rows again with our new AccountType ordering.
    first_rows = search()

    # Sort by "t" -> AccountType.
    rows = search({"SB": "t"})

    # Test that rows again got reversed.
    assert compare_text_values(1, first_rows, reversed(rows)) is True
def test_post_accounts_pgp_key(tu_user):
    """Search POST /accounts by PGP key through the `K` form field."""
    # Assign a key to the shared user so there is something to find.
    with db.begin():
        user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30"

    cookies = {"AURSID": user.login(Request(), "testPassword")}
    criteria = {"K": user.PGPKey}

    with client as http:
        response = http.post("/accounts", cookies=cookies, data=criteria)
    assert response.status_code == int(HTTPStatus.OK)

    # Exactly one account is expected to match the key.
    assert len(get_rows(response.text)) == 1
def test_post_accounts_paged(tu_user):
    """Walk the paginated results of POST /accounts.

    With the shared `user` and 150 extra accounts, the `O` (offset)
    form field is used to request the first page, the second page and
    the final fifty rows; row contents and the state of the prev/next
    buttons are verified along the way.
    """
    plain_type = query(AccountType,
                       AccountType.AccountType == "User").first()

    # The shared user plus 150 new accounts.
    everyone = [user]
    with db.begin():
        for n in range(150):
            everyone.append(create(User, Username=f"test_#{n}",
                                   Email=f"test_#{n}@example.org",
                                   RealName=f"Test User #{n}",
                                   Passwd="testPassword",
                                   AccountType=plain_type))

    cookies = {"AURSID": user.login(Request(), "testPassword")}

    def pager_buttons(html):
        """Return the (previous, next) pagination button elements."""
        root = parse_root(html)
        prev_btn = root.xpath('//button[contains(@class, "page-prev")]')[0]
        next_btn = root.xpath('//button[contains(@class, "page-next")]')[0]
        return prev_btn, next_btn

    def assert_usernames(rows, expected):
        """Check each row's username link against `expected`, in order."""
        for row, account in zip(rows, expected):
            link = row[0].xpath("./a")[0]  # First column
            assert link.text.strip() == account.Username

    # First page: no offset supplied.
    with client as http:
        response = http.post("/accounts", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    page = get_rows(response.text)
    assert len(page) == 50  # `pp`, or hits per page is defined at 50.

    # Expected ordering: ascending by username, the default sort.
    ordered = sorted(everyone, key=lambda account: account.Username)
    assert_usernames(page, ordered[:50])

    # On the first page only "next" is usable.
    page_prev, page_next = pager_buttons(response.text)
    assert page_prev.attrib["disabled"] == "disabled"
    assert "disabled" not in page_next.attrib

    # Second page: +50 offset.
    with client as http:
        response = http.post("/accounts", cookies=cookies,
                             data={"O": 50})
    assert response.status_code == int(HTTPStatus.OK)

    page = get_rows(response.text)
    assert len(page) == 50
    assert_usernames(page, ordered[50:100])

    # Last page: offset 101 leaves exactly the final fifty accounts.
    with client as http:
        response = http.post("/accounts", cookies=cookies,
                             data={"O": 101})
    assert response.status_code == int(HTTPStatus.OK)

    page = get_rows(response.text)
    assert len(page) == 50

    # On the last page only "previous" is usable.
    page_prev, page_next = pager_buttons(response.text)
    assert "disabled" not in page_prev.attrib
    assert page_next.attrib["disabled"] == "disabled"
def test_get_terms_of_service():
    """GET /tos shows terms only while a revision awaits acceptance.

    Covers the unauthenticated redirect, the forced redirect to /tos
    for a logged-in user with an unaccepted term, and the effect of
    creating or updating an AcceptedTerm record.
    """
    with db.begin():
        term = create(Term, Description="Test term.",
                      URL="http://localhost", Revision=1)

    def fetch(path, **kwargs):
        """GET `path` without following redirects."""
        with client as http:
            return http.get(path, allow_redirects=False, **kwargs)

    # Without an auth cookie, /tos answers with a redirect.
    assert fetch("/tos").status_code == int(HTTPStatus.SEE_OTHER)

    cookies = {"AURSID": user.login(Request(), "testPassword")}

    # First of all, browsing while authenticated without having
    # accepted the terms redirects to /tos.
    response = fetch("/", cookies=cookies)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)
    assert response.headers.get("location") == "/tos"

    # The /tos page itself renders while the term is unaccepted.
    response = fetch("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    with db.begin():
        accepted_term = create(AcceptedTerm, User=user,
                               Term=term, Revision=term.Revision)

    # We accepted the term, there's nothing left to accept.
    response = fetch("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)

    # Bump the term's revision.
    with db.begin():
        term.Revision = 2

    # This time the term's Revision is ahead of the one recorded in
    # AcceptedTerm, so it must be presented again.
    response = fetch("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    with db.begin():
        accepted_term.Revision = term.Revision

    # We updated the term revision, there's nothing left to accept.
    response = fetch("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)
def test_post_terms_of_service():
    """POST /tos records acceptance of the outstanding term revisions.

    Walks through: listing a fresh term, a POST that does not accept
    it, a POST that accepts it (creating an AcceptedTerm record), a
    revision bump, a second accepting POST, and finally the redirect
    from /tos to / once nothing is left to accept.
    """
    sid = user.login(Request(), "testPassword")

    data = {"accept": True}  # POST data.
    cookies = {"AURSID": sid}  # Auth cookie.

    # Create a fresh Term.
    with db.begin():
        term = create(Term, Description="Test term.",
                      URL="http://localhost", Revision=1)

    # Test that the term we just created is listed.
    with client as request:
        response = request.get("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    # Make a POST request to /tos with the agree checkbox disabled (False).
    with client as request:
        response = request.post("/tos", data={"accept": False},
                                cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    # Make a POST request to /tos with the agree checkbox enabled (True).
    # Redirects are explicitly not followed: the assertion below is about
    # the 303 itself and should not depend on the client's default
    # redirect policy.
    with client as request:
        response = request.post("/tos", data=data, cookies=cookies,
                                allow_redirects=False)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)

    # Query the db for the record created by the post request.
    accepted_term = query(AcceptedTerm,
                          AcceptedTerm.TermsID == term.ID).first()
    assert accepted_term.User == user
    assert accepted_term.Term == term

    # Update the term to revision 2.
    with db.begin():
        term.Revision = 2

    # A GET request gives us the new revision to accept.
    with client as request:
        response = request.get("/tos", cookies=cookies)
    assert response.status_code == int(HTTPStatus.OK)

    # Let's POST again and agree to the new term revision; as above,
    # the 303 response itself is what gets checked.
    with client as request:
        response = request.post("/tos", data=data, cookies=cookies,
                                allow_redirects=False)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)

    # Check that the records ended up matching.
    assert accepted_term.Revision == term.Revision

    # Now, see that GET redirects us to / with no terms left to accept.
    with client as request:
        response = request.get("/tos", cookies=cookies,
                               allow_redirects=False)
    assert response.status_code == int(HTTPStatus.SEE_OTHER)
    assert response.headers.get("location") == "/"
def test_account_comments_not_found():
    """Requesting the comments page of an unknown account yields 404."""
    sid = user.login(Request(), "testPassword")
    auth = {"AURSID": sid}

    with client as http:
        response = http.get("/account/non-existent/comments", cookies=auth)

    assert response.status_code == int(HTTPStatus.NOT_FOUND)