From c94793b0b11b372a299fb6d23e39562066b7531b Mon Sep 17 00:00:00 2001 From: Kevin Morris Date: Thu, 28 Jan 2021 20:26:34 -0800 Subject: [PATCH] add user registration routes * Added /register get and post routes. + Added default attributes to AnonymousUser, including a new AnonymousList which behaves like an sqlalchemy relationship list. + aurweb.util: Added validation functions for various user fields used throughout registration. + test_accounts_routes: Added get|post register route tests. Signed-off-by: Kevin Morris --- aurweb/auth.py | 10 + aurweb/routers/accounts.py | 320 +++++++++++++++++++++++- aurweb/util.py | 84 +++++++ templates/partials/account_form.html | 343 ++++++++++++++++++++++++++ templates/register.html | 30 +++ test/test_accounts_routes.py | 356 ++++++++++++++++++++++++++- 6 files changed, 1140 insertions(+), 3 deletions(-) create mode 100644 templates/partials/account_form.html create mode 100644 templates/register.html diff --git a/aurweb/auth.py b/aurweb/auth.py index 53c853de..a4ff2167 100644 --- a/aurweb/auth.py +++ b/aurweb/auth.py @@ -7,12 +7,22 @@ from fastapi.responses import RedirectResponse from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError from starlette.requests import HTTPConnection +import aurweb.config + from aurweb.models.session import Session from aurweb.models.user import User from aurweb.templates import make_context, render_template class AnonymousUser: + # Stub attributes used to mimic a real user. + ID = 0 + LangPreference = aurweb.config.get("options", "default_lang") + Timezone = aurweb.config.get("options", "default_timezone") + + # A stub ssh_pub_key relationship. + ssh_pub_key = None + @staticmethod def is_authenticated(): return False diff --git a/aurweb/routers/accounts.py b/aurweb/routers/accounts.py index db23bc3a..a43ba9f7 100644 --- a/aurweb/routers/accounts.py +++ b/aurweb/routers/accounts.py @@ -1,12 +1,20 @@ +import copy + from http import HTTPStatus from fastapi import APIRouter, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse -from sqlalchemy import or_ +from sqlalchemy import and_, func, or_ -from aurweb import db +import aurweb.config + +from aurweb import db, l10n, time, util from aurweb.auth import auth_required +from aurweb.captcha import get_captcha_answer, get_captcha_salts, get_captcha_token from aurweb.l10n import get_translator_for_request +from aurweb.models.account_type import AccountType +from aurweb.models.ban import Ban +from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint from aurweb.models.user import User from aurweb.scripts.notify import ResetKeyNotification from aurweb.templates import make_variable_context, render_template @@ -93,3 +101,311 @@ async def passreset_post(request: Request, # Render ?step=confirm. return RedirectResponse(url="/passreset?step=confirm", status_code=int(HTTPStatus.SEE_OTHER)) + + +def process_account_form(request: Request, user: User, args: dict): + """ Process an account form. All fields are optional and only checks + requirements in the case they are present. + + ``` + context = await make_variable_context(request, "Accounts") + ok, errors = process_account_form(request, user, **kwargs) + if not ok: + context["errors"] = errors + return render_template(request, "some_account_template.html", context) + ``` + + :param request: An incoming FastAPI request + :param user: The user model of the account being processed + :param args: A dictionary of arguments generated via request.form() + :return: A (passed processing boolean, list of errors) tuple + """ + + # Get a local translator. + _ = get_translator_for_request(request) + + host = request.client.host + ban = db.query(Ban, Ban.IPAddress == host).first() + if ban: + return False, [ + "Account registration has been disabled for your " + + "IP address, probably due to sustained spam attacks. " + + "Sorry for the inconvenience." + ] + + if request.user.is_authenticated(): + if not request.user.valid_password(args.get("passwd", None)): + return False, ["Invalid password."] + + email = args.get("E", None) + username = args.get("U", None) + + if not email or not username: + return False, ["Missing a required field."] + + username_min_len = aurweb.config.getint("options", "username_min_len") + username_max_len = aurweb.config.getint("options", "username_max_len") + if not util.valid_username(args.get("U")): + return False, [ + "The username is invalid.", + [ + _("It must be between %s and %s characters long") % ( + username_min_len, username_max_len), + "Start and end with a letter or number", + "Can contain only one period, underscore or hyphen.", + ] + ] + + password = args.get("P", None) + if password: + confirmation = args.get("C", None) + if not util.valid_password(password): + return False, [ + _("Your password must be at least %s characters.") % ( + username_min_len) + ] + elif not confirmation: + return False, ["Please confirm your new password."] + elif password != confirmation: + return False, ["Password fields do not match."] + + backup_email = args.get("BE", None) + homepage = args.get("HP", None) + pgp_key = args.get("K", None) + ssh_pubkey = args.get("PK", None) + language = args.get("L", None) + timezone = args.get("TZ", None) + + def username_exists(username): + return and_(User.ID != user.ID, + func.lower(User.Username) == username.lower()) + + def email_exists(email): + return and_(User.ID != user.ID, + func.lower(User.Email) == email.lower()) + + if not util.valid_email(email): + return False, ["The email address is invalid."] + elif backup_email and not util.valid_email(backup_email): + return False, ["The backup email address is invalid."] + elif homepage and not util.valid_homepage(homepage): + return False, [ + "The home page is invalid, please specify the full HTTP(s) URL."] + elif pgp_key and not util.valid_pgp_fingerprint(pgp_key): + return False, ["The PGP key fingerprint is invalid."] + elif ssh_pubkey and not util.valid_ssh_pubkey(ssh_pubkey): + return False, ["The SSH public key is invalid."] + elif language and language not in l10n.SUPPORTED_LANGUAGES: + return False, ["Language is not currently supported."] + elif timezone and timezone not in time.SUPPORTED_TIMEZONES: + return False, ["Timezone is not currently supported."] + elif db.query(User, username_exists(username)).first(): + # If the username already exists... + return False, [ + _("The username, %s%s%s, is already in use.") % ( + "", username, "") + ] + elif db.query(User, email_exists(email)).first(): + # If the email already exists... + return False, [ + _("The address, %s%s%s, is already in use.") % ( + "", email, "") + ] + + def ssh_fingerprint_exists(fingerprint): + return and_(SSHPubKey.UserID != user.ID, + SSHPubKey.Fingerprint == fingerprint) + + if ssh_pubkey: + fingerprint = get_fingerprint(ssh_pubkey.strip().rstrip()) + if fingerprint is None: + return False, ["The SSH public key is invalid."] + + if db.query(SSHPubKey, ssh_fingerprint_exists(fingerprint)).first(): + return False, [ + _("The SSH public key, %s%s%s, is already in use.") % ( + "", fingerprint, "") + ] + + captcha_salt = args.get("captcha_salt", None) + if captcha_salt and captcha_salt not in get_captcha_salts(): + return False, ["This CAPTCHA has expired. Please try again."] + + captcha = args.get("captcha", None) + if captcha: + answer = get_captcha_answer(get_captcha_token(captcha_salt)) + if captcha != answer: + return False, ["The entered CAPTCHA answer is invalid."] + + return True, [] + + +def make_account_form_context(context: dict, + request: Request, + user: User, + args: dict): + """ Modify a FastAPI context and add attributes for the account form. + + :param context: FastAPI context + :param request: FastAPI request + :param user: Target user + :param args: Persistent arguments: request.form() + :return: FastAPI context adjusted for account form + """ + # Do not modify the original context. + context = copy.copy(context) + + context["account_types"] = [ + (1, "Normal User"), + (2, "Trusted User") + ] + + user_account_type_id = context.get("account_types")[0][0] + + if request.user.has_credential("CRED_ACCOUNT_EDIT_DEV"): + context["account_types"].append((3, "Developer")) + context["account_types"].append((4, "Trusted User & Developer")) + + if request.user.is_authenticated(): + context["username"] = args.get("U", user.Username) + context["account_type"] = args.get("T", user.AccountType.ID) + context["suspended"] = args.get("S", user.Suspended) + context["email"] = args.get("E", user.Email) + context["hide_email"] = args.get("H", user.HideEmail) + context["backup_email"] = args.get("BE", user.BackupEmail) + context["realname"] = args.get("R", user.RealName) + context["homepage"] = args.get("HP", user.Homepage or str()) + context["ircnick"] = args.get("I", user.IRCNick) + context["pgp"] = args.get("K", user.PGPKey or str()) + context["lang"] = args.get("L", user.LangPreference) + context["tz"] = args.get("TZ", user.Timezone) + ssh_pk = user.ssh_pub_key.PubKey if user.ssh_pub_key else str() + context["ssh_pk"] = args.get("PK", ssh_pk) + context["cn"] = args.get("CN", user.CommentNotify) + context["un"] = args.get("UN", user.UpdateNotify) + context["on"] = args.get("ON", user.OwnershipNotify) + else: + context["username"] = args.get("U", str()) + context["account_type"] = args.get("T", user_account_type_id) + context["suspended"] = args.get("S", False) + context["email"] = args.get("E", str()) + context["hide_email"] = args.get("H", False) + context["backup_email"] = args.get("BE", str()) + context["realname"] = args.get("R", str()) + context["homepage"] = args.get("HP", str()) + context["ircnick"] = args.get("I", str()) + context["pgp"] = args.get("K", str()) + context["lang"] = args.get("L", context.get("language")) + context["tz"] = args.get("TZ", context.get("timezone")) + context["ssh_pk"] = args.get("PK", str()) + context["cn"] = args.get("CN", True) + context["un"] = args.get("UN", False) + context["on"] = args.get("ON", True) + + context["password"] = args.get("P", str()) + context["confirm"] = args.get("C", str()) + + return context + + +@router.get("/register", response_class=HTMLResponse) +@auth_required(False) +async def account_register(request: Request, + U: str = Form(default=str()), # Username + E: str = Form(default=str()), # Email + H: str = Form(default=False), # Hide Email + BE: str = Form(default=None), # Backup Email + R: str = Form(default=None), # Real Name + HP: str = Form(default=None), # Homepage + I: str = Form(default=None), # IRC Nick + K: str = Form(default=None), # PGP Key FP + L: str = Form(default=aurweb.config.get( + "options", "default_lang")), + TZ: str = Form(default=aurweb.config.get( + "options", "default_timezone")), + PK: str = Form(default=None), + CN: bool = Form(default=False), # Comment Notify + CU: bool = Form(default=False), # Update Notify + CO: bool = Form(default=False), # Owner Notify + captcha: str = Form(default=str())): + context = await make_variable_context(request, "Register") + context["captcha_salt"] = get_captcha_salts()[0] + context = make_account_form_context(context, request, None, dict()) + return render_template(request, "register.html", context) + + +@router.post("/register", response_class=HTMLResponse) +@auth_required(False) +async def account_register_post(request: Request, + U: str = Form(default=str()), # Username + E: str = Form(default=str()), # Email + H: str = Form(default=False), # Hide Email + BE: str = Form(default=None), # Backup Email + R: str = Form(default=''), # Real Name + HP: str = Form(default=None), # Homepage + I: str = Form(default=None), # IRC Nick + K: str = Form(default=None), # PGP Key + L: str = Form(default=aurweb.config.get( + "options", "default_lang")), + TZ: str = Form(default=aurweb.config.get( + "options", "default_timezone")), + PK: str = Form(default=None), # SSH PubKey + CN: bool = Form(default=False), + UN: bool = Form(default=False), + ON: bool = Form(default=False), + captcha: str = Form(default=None), + captcha_salt: str = Form(...)): + from aurweb.db import session + + context = await make_variable_context(request, "Register") + + args = dict(await request.form()) + context = make_account_form_context(context, request, None, args) + + ok, errors = process_account_form(request, request.user, args) + + if not ok: + # If the field values given do not meet the requirements, + # return HTTP 400 with an error. + context["errors"] = errors + return render_template(request, "register.html", context, + status_code=int(HTTPStatus.BAD_REQUEST)) + + if not captcha: + context["errors"] = ["The CAPTCHA is missing."] + return render_template(request, "register.html", context, + status_code=int(HTTPStatus.BAD_REQUEST)) + + # Create a user with no password with a resetkey, then send + # an email off about it. + resetkey = db.make_random_value(User, User.ResetKey) + + # By default, we grab the User account type to associate with. + account_type = db.query(AccountType, + AccountType.AccountType == "User").first() + + # Create a user given all parameters available. + user = db.create(User, Username=U, Email=E, HideEmail=H, BackupEmail=BE, + RealName=R, Homepage=HP, IRCNick=I, PGPKey=K, + LangPreference=L, Timezone=TZ, CommentNotify=CN, + UpdateNotify=UN, OwnershipNotify=ON, ResetKey=resetkey, + AccountType=account_type) + + # If a PK was given and either one does not exist or the given + # PK mismatches the existing user's SSHPubKey.PubKey. + if PK: + # Get the second element in the PK, which is the actual key. + pubkey = PK.strip().rstrip() + fingerprint = get_fingerprint(pubkey) + user.ssh_pub_key = SSHPubKey(UserID=user.ID, + PubKey=pubkey, + Fingerprint=fingerprint) + session.commit() + + # Send a reset key notification to the new user. + executor = db.ConnectionExecutor(db.get_engine().raw_connection()) + ResetKeyNotification(executor, user.ID).send() + + context["complete"] = True + context["user"] = user + return render_template(request, "register.html", context) diff --git a/aurweb/util.py b/aurweb/util.py index 65f18a4c..5e1717bd 100644 --- a/aurweb/util.py +++ b/aurweb/util.py @@ -1,7 +1,91 @@ +import base64 import random +import re import string +from urllib.parse import urlparse + +import jinja2 + +from email_validator import EmailNotValidError, EmailUndeliverableError, validate_email + +import aurweb.config + def make_random_string(length): return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) + + +def valid_username(username): + min_len = aurweb.config.getint("options", "username_min_len") + max_len = aurweb.config.getint("options", "username_max_len") + if not (min_len <= len(username) <= max_len): + return False + + # Check that username contains: one or more alphanumeric + # characters, an optional separator of '.', '-' or '_', followed + # by alphanumeric characters. + return re.match(r'^[a-zA-Z0-9]+[.\-_]?[a-zA-Z0-9]+$', username) + + +def valid_email(email): + try: + validate_email(email) + except EmailUndeliverableError: + return False + except EmailNotValidError: + return False + return True + + +def valid_homepage(homepage): + parts = urlparse(homepage) + return parts.scheme in ("http", "https") and bool(parts.netloc) + + +def valid_password(password): + min_len = aurweb.config.getint("options", "passwd_min_len") + return len(password) >= min_len + + +def valid_pgp_fingerprint(fp): + fp = fp.replace(" ", "") + try: + # Attempt to convert the fingerprint to an int via base16. + # If it can't, it's not a hex string. + int(fp, 16) + except ValueError: + return False + + # Check the length; must be 40 hexadecimal digits. + return len(fp) == 40 + + +def valid_ssh_pubkey(pk): + valid_prefixes = ("ssh-rsa", "ecdsa-sha2-nistp256", + "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521", + "ssh-ed25519") + + has_valid_prefix = False + for prefix in valid_prefixes: + if "%s " % prefix in pk: + has_valid_prefix = True + break + if not has_valid_prefix: + return False + + tokens = pk.strip().rstrip().split(" ") + if len(tokens) < 2: + return False + + return base64.b64encode(base64.b64decode(tokens[1])).decode() == tokens[1] + + +@jinja2.contextfilter +def account_url(context, user): + request = context.get("request") + base = f"{request.url.scheme}://{request.url.hostname}" + if request.url.scheme == "http" and request.url.port != 80: + base += f":{request.url.port}" + return f"{base}/account/{user.Username}" diff --git a/templates/partials/account_form.html b/templates/partials/account_form.html new file mode 100644 index 00000000..3af13368 --- /dev/null +++ b/templates/partials/account_form.html @@ -0,0 +1,343 @@ + +
+
+ +
+
+ +

+ + + + ({% trans %}required{% endtrans %}) +

+

+ {{ "Your user name is the name you will use to login. " + "It is visible to the general public, even if your " + "account is inactive." | tr }} +

+ + {% if request.user.has_credential("CRED_ACCOUNT_CHANGE_TYPE") %} +

+ + +

+ +

+ + + +

+ {% endif %} + + +

+ + + + ({% trans %}required{% endtrans %}) +

+

+ {{ "Please ensure you correctly entered your email " + "address, otherwise you will be locked out." | tr }} +

+ + +

+ + + +

+

+ {{ "If you do not hide your email address, it is " + "visible to all registered AUR users. If you hide your " + "email address, it is visible to members of the Arch " + "Linux staff only." | tr }} +

+ + +

+ + + +

+

+ + {{ "Optionally provide a secondary email address that " + "can be used to restore your account in case you lose " + "access to your primary email address." | tr }} + {{ "Password reset links are always sent to both your " + "primary and your backup email address." | tr }} + {{ "Your backup email address is always only visible to " + "members of the Arch Linux staff, independent of the %s " + "setting." | tr + | format("%s" | format("Hide Email Address" | tr)) + | safe }} + +

+ + +

+ + + +

+ + +

+ + + +

+ + +

+ + + +

+ + +

+ + + +

+ + +

+ + + +

+ + +

+ + + +

+ +
+ + {% if form_type == "UpdateAccount" %} +
+ + {{ + "If you want to change the password, enter a new password " + "and confirm the new password by entering it again." | tr + }} + +

+ + +

+ +

+ + + +

+
+ {% endif %} + +
+ + {{ + "The following information is only required if you " + "want to submit packages to the Arch User Repository." | tr + }} + +

+ + + + +

+
+ +
+ {% trans%}Notification settings{% endtrans %}: +

+ + + +

+

+ + + +

+

+ + + +

+
+ +
+ {% if form_type == "UpdateAccount" %} + + {{ "To confirm the profile changes, please enter " + "your current password:" | tr }} + +

+ + +

+ {% else %} + + + {{ "To protect the AUR against automated account creation, " + "we kindly ask you to provide the output of the following " + "command:" | tr }} + + {{ captcha_salt | captcha_cmdline }} + + +

+ + + ({% trans %}required{% endtrans %}) + + +

+ {% endif %} +
+ +
+

+ + {% if form_type == "UpdateAccount" %} +   + {% else %} +   + {% endif %} + +

+
+
diff --git a/templates/register.html b/templates/register.html new file mode 100644 index 00000000..a15971a1 --- /dev/null +++ b/templates/register.html @@ -0,0 +1,30 @@ +{% extends "partials/layout.html" %} + +{% block pageContent %} +
+

{% trans %}Register{% endtrans %}

+ + {% if complete %} + {{ + "The account, %s%s%s, has been successfully created." + | tr + | format("", "'" + user.Username + "'", "") + | safe + }} +

+ {% trans %}A password reset key has been sent to your e-mail address.{% endtrans %} +

+ {% else %} + {% if errors %} + {% include "partials/error.html" %} + {% else %} +

+ {% trans %}Use this form to create an account.{% endtrans %} +

+ {% endif %} + + {% set form_type = "NewAccount" %} + {% include "partials/account_form.html" %} + {% endif %} +
+{% endblock %} diff --git a/test/test_accounts_routes.py b/test/test_accounts_routes.py index 69896a0f..d79137bf 100644 --- a/test/test_accounts_routes.py +++ b/test/test_accounts_routes.py @@ -1,13 +1,21 @@ +import re +import tempfile + +from datetime import datetime from http import HTTPStatus +from subprocess import Popen import pytest from fastapi.testclient import TestClient +from aurweb import captcha from aurweb.asgi import app -from aurweb.db import query +from aurweb.db import create, delete, query from aurweb.models.account_type import AccountType +from aurweb.models.ban import Ban from aurweb.models.session import Session +from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint from aurweb.models.user import User from aurweb.testing import setup_test_db from aurweb.testing.models import make_user @@ -220,3 +228,349 @@ def test_post_passreset_error_password_requirements(): error = f"Your password must be at least {passwd_min_len} characters." assert error in response.content.decode("utf-8") + + +def test_get_register(): + with client as request: + response = request.get("/register") + assert response.status_code == int(HTTPStatus.OK) + + +def post_register(request, **kwargs): + """ A simple helper that allows overrides to test defaults. """ + salt = captcha.get_captcha_salts()[0] + token = captcha.get_captcha_token(salt) + answer = captcha.get_captcha_answer(token) + + data = { + "U": "newUser", + "E": "newUser@email.org", + "P": "newUserPassword", + "C": "newUserPassword", + "L": "en", + "TZ": "UTC", + "captcha": answer, + "captcha_salt": salt + } + + # For any kwargs given, override their k:v pairs in data. + args = dict(kwargs) + for k, v in args.items(): + data[k] = v + + return request.post("/register", data=data, allow_redirects=False) + + +def test_post_register(): + with client as request: + response = post_register(request) + assert response.status_code == int(HTTPStatus.OK) + + expected = "The account, 'newUser', " + expected += "has been successfully created." + assert expected in response.content.decode() + + +def test_post_register_rejects_case_insensitive_spoof(): + with client as request: + response = post_register(request, U="newUser", E="newUser@example.org") + assert response.status_code == int(HTTPStatus.OK) + + with client as request: + response = post_register(request, U="NEWUSER", E="BLAH@GMAIL.COM") + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + expected = "The username, NEWUSER, is already in use." + assert expected in response.content.decode() + + with client as request: + response = post_register(request, U="BLAH", E="NEWUSER@EXAMPLE.ORG") + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + expected = "The address, NEWUSER@EXAMPLE.ORG, " + expected += "is already in use." + assert expected in response.content.decode() + + +def test_post_register_error_expired_captcha(): + with client as request: + response = post_register(request, captcha_salt="invalid-salt") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "This CAPTCHA has expired. Please try again." in content + + +def test_post_register_error_missing_captcha(): + with client as request: + response = post_register(request, captcha=None) + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The CAPTCHA is missing." in content + + +def test_post_register_error_invalid_captcha(): + with client as request: + response = post_register(request, captcha="invalid blah blah") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The entered CAPTCHA answer is invalid." in content + + +def test_post_register_error_ip_banned(): + # 'testclient' is used as request.client.host via FastAPI TestClient. + create(Ban, IPAddress="testclient", BanTS=datetime.utcnow()) + + with client as request: + response = post_register(request) + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert ("Account registration has been disabled for your IP address, " + + "probably due to sustained spam attacks. Sorry for the " + + "inconvenience.") in content + + +def test_post_register_error_missing_username(): + with client as request: + response = post_register(request, U="") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "Missing a required field." in content + + +def test_post_register_error_missing_email(): + with client as request: + response = post_register(request, E="") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "Missing a required field." in content + + +def test_post_register_error_invalid_username(): + with client as request: + # Our test config requires at least three characters for a + # valid username, so test against two characters: 'ba'. + response = post_register(request, U="ba") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The username is invalid." in content + + +def test_post_register_invalid_password(): + with client as request: + response = post_register(request, P="abc", C="abc") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = r"Your password must be at least \d+ characters." + assert re.search(expected, content) + + +def test_post_register_error_missing_confirm(): + with client as request: + response = post_register(request, C=None) + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "Please confirm your new password." in content + + +def test_post_register_error_mismatched_confirm(): + with client as request: + response = post_register(request, C="mismatched") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "Password fields do not match." in content + + +def test_post_register_error_invalid_email(): + with client as request: + response = post_register(request, E="bad@email") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The email address is invalid." in content + + +def test_post_register_error_undeliverable_email(): + with client as request: + # At the time of writing, webchat.freenode.net does not contain + # mx records; if it ever does, it'll break this test. + response = post_register(request, E="email@bad.c") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The email address is invalid." in content + + +def test_post_register_invalid_backup_email(): + with client as request: + response = post_register(request, BE="bad@email") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The backup email address is invalid." in content + + +def test_post_register_error_invalid_homepage(): + with client as request: + response = post_register(request, HP="bad") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = "The home page is invalid, please specify the full HTTP(s) URL." + assert expected in content + + +def test_post_register_error_invalid_pgp_fingerprints(): + with client as request: + response = post_register(request, K="bad") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = "The PGP key fingerprint is invalid." + assert expected in content + + pk = 'z' + ('a' * 39) + with client as request: + response = post_register(request, K=pk) + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = "The PGP key fingerprint is invalid." + assert expected in content + + +def test_post_register_error_invalid_ssh_pubkeys(): + with client as request: + response = post_register(request, PK="bad") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The SSH public key is invalid." in content + + with client as request: + response = post_register(request, PK="ssh-rsa ") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + assert "The SSH public key is invalid." in content + + +def test_post_register_error_unsupported_language(): + with client as request: + response = post_register(request, L="bad") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = "Language is not currently supported." + assert expected in content + + +def test_post_register_error_unsupported_timezone(): + with client as request: + response = post_register(request, TZ="ABCDEFGH") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = "Timezone is not currently supported." + assert expected in content + + +def test_post_register_error_username_taken(): + with client as request: + response = post_register(request, U="test") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = r"The username, .*, is already in use." + assert re.search(expected, content) + + +def test_post_register_error_email_taken(): + with client as request: + response = post_register(request, E="test@example.org") + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = r"The address, .*, is already in use." + assert re.search(expected, content) + + +def test_post_register_error_ssh_pubkey_taken(): + pk = str() + + # Create a public key with ssh-keygen (this adds ssh-keygen as a + # dependency to passing this test). + with tempfile.TemporaryDirectory() as tmpdir: + with open("/dev/null", "w") as null: + proc = Popen(["ssh-keygen", "-f", f"{tmpdir}/test.ssh", "-N", ""], + stdout=null, stderr=null) + proc.wait() + assert proc.returncode == 0 + + # Read in the public key, then delete the temp dir we made. + pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip() + + # Take the sha256 fingerprint of the ssh public key, create it. + fp = get_fingerprint(pk) + create(SSHPubKey, UserID=user.ID, PubKey=pk, Fingerprint=fp) + + with client as request: + response = post_register(request, PK=pk) + + assert response.status_code == int(HTTPStatus.BAD_REQUEST) + + content = response.content.decode() + expected = r"The SSH public key, .*, is already in use." + assert re.search(expected, content) + + +def test_post_register_with_ssh_pubkey(): + pk = str() + + # Create a public key with ssh-keygen (this adds ssh-keygen as a + # dependency to passing this test). + with tempfile.TemporaryDirectory() as tmpdir: + with open("/dev/null", "w") as null: + proc = Popen(["ssh-keygen", "-f", f"{tmpdir}/test.ssh", "-N", ""], + stdout=null, stderr=null) + proc.wait() + assert proc.returncode == 0 + + # Read in the public key, then delete the temp dir we made. + pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip() + + with client as request: + response = post_register(request, PK=pk) + + assert response.status_code == int(HTTPStatus.OK)