Merge branch 'master' into live

This commit is contained in:
Kevin Morris 2022-02-08 09:16:17 -08:00
commit 5b9c89558a
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
13 changed files with 164 additions and 110 deletions

View file

@ -6,7 +6,7 @@ from typing import Any
# Publicly visible version of aurweb. This is used to display # Publicly visible version of aurweb. This is used to display
# aurweb versioning in the footer and must be maintained. # aurweb versioning in the footer and must be maintained.
# Todo: Make this dynamic/automated. # Todo: Make this dynamic/automated.
AURWEB_VERSION = "v6.0.7" AURWEB_VERSION = "v6.0.8"
_parser = None _parser = None

View file

@ -1,6 +1,3 @@
import os
import tempfile
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from sqlalchemy.orm import backref, relationship from sqlalchemy.orm import backref, relationship
@ -15,28 +12,17 @@ class SSHPubKey(Base):
__mapper_args__ = {"primary_key": [__table__.c.Fingerprint]} __mapper_args__ = {"primary_key": [__table__.c.Fingerprint]}
User = relationship( User = relationship(
"User", backref=backref("ssh_pub_key", uselist=False), "User", backref=backref("ssh_pub_keys", lazy="dynamic"),
foreign_keys=[__table__.c.UserID]) foreign_keys=[__table__.c.UserID])
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
def get_fingerprint(pubkey): def get_fingerprint(pubkey: str) -> str:
with tempfile.TemporaryDirectory() as tmpdir: proc = Popen(["ssh-keygen", "-l", "-f", "-"], stdin=PIPE, stdout=PIPE,
pk = os.path.join(tmpdir, "ssh.pub") stderr=PIPE)
out, _ = proc.communicate(pubkey.encode())
with open(pk, "w") as f: if proc.returncode:
f.write(pubkey) raise ValueError("The SSH public key is invalid.")
return out.decode().split()[1].split(":", 1)[1]
proc = Popen(["ssh-keygen", "-l", "-f", pk], stdout=PIPE, stderr=PIPE)
out, err = proc.communicate()
# Invalid SSH Public Key. Return None to the caller.
if proc.returncode != 0:
return None
parts = out.decode().split()
fp = parts[1].replace("SHA256:", "")
return fp

View file

@ -2,6 +2,7 @@ import copy
import typing import typing
from http import HTTPStatus from http import HTTPStatus
from typing import Any, Dict
from fastapi import APIRouter, Form, Request from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
@ -105,7 +106,8 @@ async def passreset_post(request: Request,
status_code=HTTPStatus.SEE_OTHER) status_code=HTTPStatus.SEE_OTHER)
def process_account_form(request: Request, user: models.User, args: dict): def process_account_form(request: Request, user: models.User,
args: Dict[str, Any]):
""" Process an account form. All fields are optional and only checks """ Process an account form. All fields are optional and only checks
requirements in the case they are present. requirements in the case they are present.
@ -193,8 +195,8 @@ def make_account_form_context(context: dict,
context["pgp"] = args.get("K", user.PGPKey or str()) context["pgp"] = args.get("K", user.PGPKey or str())
context["lang"] = args.get("L", user.LangPreference) context["lang"] = args.get("L", user.LangPreference)
context["tz"] = args.get("TZ", user.Timezone) context["tz"] = args.get("TZ", user.Timezone)
ssh_pk = user.ssh_pub_key.PubKey if user.ssh_pub_key else str() ssh_pks = [pk.PubKey for pk in user.ssh_pub_keys]
context["ssh_pk"] = args.get("PK", ssh_pk) context["ssh_pks"] = args.get("PK", ssh_pks)
context["cn"] = args.get("CN", user.CommentNotify) context["cn"] = args.get("CN", user.CommentNotify)
context["un"] = args.get("UN", user.UpdateNotify) context["un"] = args.get("UN", user.UpdateNotify)
context["on"] = args.get("ON", user.OwnershipNotify) context["on"] = args.get("ON", user.OwnershipNotify)
@ -212,7 +214,7 @@ def make_account_form_context(context: dict,
context["pgp"] = args.get("K", str()) context["pgp"] = args.get("K", str())
context["lang"] = args.get("L", context.get("language")) context["lang"] = args.get("L", context.get("language"))
context["tz"] = args.get("TZ", context.get("timezone")) context["tz"] = args.get("TZ", context.get("timezone"))
context["ssh_pk"] = args.get("PK", str()) context["ssh_pks"] = args.get("PK", str())
context["cn"] = args.get("CN", True) context["cn"] = args.get("CN", True)
context["un"] = args.get("UN", False) context["un"] = args.get("UN", False)
context["on"] = args.get("ON", True) context["on"] = args.get("ON", True)
@ -314,16 +316,13 @@ async def account_register_post(request: Request,
# PK mismatches the existing user's SSHPubKey.PubKey. # PK mismatches the existing user's SSHPubKey.PubKey.
if PK: if PK:
# Get the second element in the PK, which is the actual key. # Get the second element in the PK, which is the actual key.
pubkey = PK.strip().rstrip() keys = util.parse_ssh_keys(PK.strip())
parts = pubkey.split(" ") for k in keys:
if len(parts) == 3: pk = " ".join(k)
# Remove the host part. fprint = get_fingerprint(pk)
pubkey = parts[0] + " " + parts[1] with db.begin():
fingerprint = get_fingerprint(pubkey) db.create(models.SSHPubKey, UserID=user.ID,
with db.begin(): PubKey=pk, Fingerprint=fprint)
user.ssh_pub_key = models.SSHPubKey(UserID=user.ID,
PubKey=pubkey,
Fingerprint=fingerprint)
# Send a reset key notification to the new user. # Send a reset key notification to the new user.
WelcomeNotification(user.ID).send() WelcomeNotification(user.ID).send()
@ -409,6 +408,9 @@ async def account_edit_post(request: Request,
context = make_account_form_context(context, request, user, args) context = make_account_form_context(context, request, user, args)
ok, errors = process_account_form(request, user, args) ok, errors = process_account_form(request, user, args)
if PK:
context["ssh_pks"] = [PK]
if not passwd: if not passwd:
context["errors"] = ["Invalid password."] context["errors"] = ["Invalid password."]
return render_template(request, "account/edit.html", context, return render_template(request, "account/edit.html", context,

View file

@ -2,7 +2,8 @@ from typing import Any, Dict
from fastapi import Request from fastapi import Request
from aurweb import cookies, db, models, time from aurweb import cookies, db, models, time, util
from aurweb.models import SSHPubKey
from aurweb.models.ssh_pub_key import get_fingerprint from aurweb.models.ssh_pub_key import get_fingerprint
from aurweb.util import strtobool from aurweb.util import strtobool
@ -52,32 +53,35 @@ def timezone(TZ: str = str(),
context["language"] = TZ context["language"] = TZ
def ssh_pubkey(PK: str = str(), def ssh_pubkey(PK: str = str(), user: models.User = None, **kwargs) -> None:
user: models.User = None, if not PK:
**kwargs) -> None: # If no pubkey is provided, wipe out any pubkeys the user
# If a PK is given, compare it against the target user's PK. # has and return out early.
if PK:
# Get the second token in the public key, which is the actual key.
pubkey = PK.strip().rstrip()
parts = pubkey.split(" ")
if len(parts) == 3:
# Remove the host part.
pubkey = parts[0] + " " + parts[1]
fingerprint = get_fingerprint(pubkey)
if not user.ssh_pub_key:
# No public key exists, create one.
with db.begin():
db.create(models.SSHPubKey, UserID=user.ID,
PubKey=pubkey, Fingerprint=fingerprint)
elif user.ssh_pub_key.PubKey != pubkey:
# A public key already exists, update it.
with db.begin():
user.ssh_pub_key.PubKey = pubkey
user.ssh_pub_key.Fingerprint = fingerprint
elif user.ssh_pub_key:
# Else, if the user has a public key already, delete it.
with db.begin(): with db.begin():
db.delete(user.ssh_pub_key) db.delete_all(user.ssh_pub_keys)
return
# Otherwise, parse ssh keys and their fprints out of PK.
keys = util.parse_ssh_keys(PK.strip())
fprints = [get_fingerprint(" ".join(k)) for k in keys]
with db.begin():
# Delete any existing keys we can't find.
to_remove = user.ssh_pub_keys.filter(
~SSHPubKey.Fingerprint.in_(fprints))
db.delete_all(to_remove)
# For each key, if it does not yet exist, create it.
for i, full_key in enumerate(keys):
prefix, key = full_key
exists = user.ssh_pub_keys.filter(
SSHPubKey.Fingerprint == fprints[i]
).exists()
if not db.query(exists).scalar():
# No public key exists, create one.
db.create(models.SSHPubKey, UserID=user.ID,
PubKey=" ".join([prefix, key]),
Fingerprint=fprints[i])
def account_type(T: int = None, def account_type(T: int = None,

View file

@ -107,14 +107,16 @@ def invalid_pgp_key(K: str = str(), **kwargs) -> None:
def invalid_ssh_pubkey(PK: str = str(), user: models.User = None, def invalid_ssh_pubkey(PK: str = str(), user: models.User = None,
_: l10n.Translator = None, **kwargs) -> None: _: l10n.Translator = None, **kwargs) -> None:
if PK: if not PK:
invalid_exc = ValidationError(["The SSH public key is invalid."]) return
if not util.valid_ssh_pubkey(PK):
raise invalid_exc
fingerprint = get_fingerprint(PK.strip().rstrip()) try:
if not fingerprint: keys = util.parse_ssh_keys(PK.strip())
raise invalid_exc except ValueError as exc:
raise ValidationError([str(exc)])
for prefix, key in keys:
fingerprint = get_fingerprint(f"{prefix} {key}")
exists = db.query(models.SSHPubKey).filter( exists = db.query(models.SSHPubKey).filter(
and_(models.SSHPubKey.UserID != user.ID, and_(models.SSHPubKey.UserID != user.ID,

View file

@ -1,4 +1,3 @@
import base64
import math import math
import re import re
import secrets import secrets
@ -7,7 +6,8 @@ import string
from datetime import datetime from datetime import datetime
from distutils.util import strtobool as _strtobool from distutils.util import strtobool as _strtobool
from http import HTTPStatus from http import HTTPStatus
from typing import Callable, Iterable, Tuple, Union from subprocess import PIPE, Popen
from typing import Callable, Iterable, List, Tuple, Union
from urllib.parse import urlparse from urllib.parse import urlparse
import fastapi import fastapi
@ -82,25 +82,6 @@ def valid_pgp_fingerprint(fp):
return len(fp) == 40 return len(fp) == 40
def valid_ssh_pubkey(pk):
valid_prefixes = aurweb.config.get("auth", "valid-keytypes")
valid_prefixes = set(valid_prefixes.split(" "))
has_valid_prefix = False
for prefix in valid_prefixes:
if "%s " % prefix in pk:
has_valid_prefix = True
break
if not has_valid_prefix:
return False
tokens = pk.strip().rstrip().split(" ")
if len(tokens) < 2:
return False
return base64.b64encode(base64.b64decode(tokens[1])).decode() == tokens[1]
def jsonify(obj): def jsonify(obj):
""" Perform a conversion on obj if it's needed. """ """ Perform a conversion on obj if it's needed. """
if isinstance(obj, datetime): if isinstance(obj, datetime):
@ -191,3 +172,29 @@ async def error_or_result(next: Callable, *args, **kwargs) \
status_code = HTTPStatus.INTERNAL_SERVER_ERROR status_code = HTTPStatus.INTERNAL_SERVER_ERROR
return JSONResponse({"error": str(exc)}, status_code=status_code) return JSONResponse({"error": str(exc)}, status_code=status_code)
return response return response
def parse_ssh_key(string: str) -> Tuple[str, str]:
""" Parse an SSH public key. """
invalid_exc = ValueError("The SSH public key is invalid.")
parts = re.sub(r'\s\s+', ' ', string.strip()).split()
if len(parts) < 2:
raise invalid_exc
prefix, key = parts[:2]
prefixes = set(aurweb.config.get("auth", "valid-keytypes").split(" "))
if prefix not in prefixes:
raise invalid_exc
proc = Popen(["ssh-keygen", "-l", "-f", "-"], stdin=PIPE, stdout=PIPE,
stderr=PIPE)
out, _ = proc.communicate(f"{prefix} {key}".encode())
if proc.returncode:
raise invalid_exc
return (prefix, key)
def parse_ssh_keys(string: str) -> List[Tuple[str, str]]:
""" Parse a list of SSH public keys. """
return [parse_ssh_key(e) for e in string.splitlines()]

View file

@ -8,7 +8,7 @@
# #
[tool.poetry] [tool.poetry]
name = "aurweb" name = "aurweb"
version = "v6.0.7" version = "v6.0.8"
license = "GPL-2.0-only" license = "GPL-2.0-only"
description = "Source code for the Arch User Repository's website" description = "Source code for the Arch User Repository's website"
homepage = "https://aur.archlinux.org" homepage = "https://aur.archlinux.org"

View file

@ -262,7 +262,7 @@
<!-- Only set PK auto-fill when we've got a NewAccount form. --> <!-- Only set PK auto-fill when we've got a NewAccount form. -->
<textarea id="id_ssh" name="PK" <textarea id="id_ssh" name="PK"
rows="5" cols="30">{{ ssh_pk }}</textarea> rows="5" cols="30">{{ ssh_pks | join("\n") }}</textarea>
</p> </p>
</fieldset> </fieldset>

View file

@ -577,10 +577,13 @@ def test_post_register_error_ssh_pubkey_taken(client: TestClient, user: User):
# Read in the public key, then delete the temp dir we made. # Read in the public key, then delete the temp dir we made.
pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip() pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip()
prefix, key, loc = pk.split()
norm_pk = prefix + " " + key
# Take the sha256 fingerprint of the ssh public key, create it. # Take the sha256 fingerprint of the ssh public key, create it.
fp = get_fingerprint(pk) fp = get_fingerprint(norm_pk)
with db.begin(): with db.begin():
create(SSHPubKey, UserID=user.ID, PubKey=pk, Fingerprint=fp) create(SSHPubKey, UserID=user.ID, PubKey=norm_pk, Fingerprint=fp)
with client as request: with client as request:
response = post_register(request, PK=pk) response = post_register(request, PK=pk)
@ -1080,22 +1083,16 @@ def test_post_account_edit_missing_ssh_pubkey(client: TestClient, user: User):
def test_post_account_edit_invalid_ssh_pubkey(client: TestClient, user: User): def test_post_account_edit_invalid_ssh_pubkey(client: TestClient, user: User):
pubkey = "ssh-rsa fake key" pubkey = "ssh-rsa fake key"
request = Request() data = {
sid = user.login(request, "testPassword")
post_data = {
"U": "test", "U": "test",
"E": "test@example.org", "E": "test@example.org",
"P": "newPassword",
"C": "newPassword",
"PK": pubkey, "PK": pubkey,
"passwd": "testPassword" "passwd": "testPassword"
} }
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request: with client as request:
response = request.post("/account/test/edit", cookies={ response = request.post("/account/test/edit", data=data,
"AURSID": sid cookies=cookies, allow_redirects=False)
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.BAD_REQUEST) assert response.status_code == int(HTTPStatus.BAD_REQUEST)

View file

@ -53,4 +53,4 @@ def test_adduser_ssh_pk():
"--ssh-pubkey", TEST_SSH_PUBKEY]) "--ssh-pubkey", TEST_SSH_PUBKEY])
test = db.query(User).filter(User.Username == "test").first() test = db.query(User).filter(User.Username == "test").first()
assert test is not None assert test is not None
assert TEST_SSH_PUBKEY.startswith(test.ssh_pub_key.PubKey) assert TEST_SSH_PUBKEY.startswith(test.ssh_pub_keys.first().PubKey)

View file

@ -1,3 +1,5 @@
from subprocess import PIPE, Popen
import pytest import pytest
from aurweb import db from aurweb import db
@ -61,8 +63,12 @@ def test_pubkey_cs(user: User):
def test_pubkey_fingerprint(): def test_pubkey_fingerprint():
assert get_fingerprint(TEST_SSH_PUBKEY) is not None proc = Popen(["ssh-keygen", "-l", "-f", "-"], stdin=PIPE, stdout=PIPE)
out, _ = proc.communicate(TEST_SSH_PUBKEY.encode())
expected = out.decode().split()[1].split(":", 1)[1]
assert get_fingerprint(TEST_SSH_PUBKEY) == expected
def test_pubkey_invalid_fingerprint(): def test_pubkey_invalid_fingerprint():
assert get_fingerprint("ssh-rsa fake and invalid") is None with pytest.raises(ValueError):
get_fingerprint("invalid-prefix some-fake-content")

View file

@ -183,14 +183,14 @@ def test_user_has_credential(user: User):
def test_user_ssh_pub_key(user: User): def test_user_ssh_pub_key(user: User):
assert user.ssh_pub_key is None assert user.ssh_pub_keys.first() is None
with db.begin(): with db.begin():
ssh_pub_key = db.create(SSHPubKey, UserID=user.ID, ssh_pub_key = db.create(SSHPubKey, UserID=user.ID,
Fingerprint="testFingerprint", Fingerprint="testFingerprint",
PubKey="testPubKey") PubKey="testPubKey")
assert user.ssh_pub_key == ssh_pub_key assert user.ssh_pub_keys.first() == ssh_pub_key
def test_user_credential_types(user: User): def test_user_credential_types(user: User):

View file

@ -60,3 +60,53 @@ def test_valid_homepage():
assert not util.valid_homepage("https://[google.com/broken-ipv6") assert not util.valid_homepage("https://[google.com/broken-ipv6")
assert not util.valid_homepage("gopher://gopher.hprc.utoronto.ca/") assert not util.valid_homepage("gopher://gopher.hprc.utoronto.ca/")
def test_parse_ssh_key():
# Test a valid key.
pk = """ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyN\
TYAAABBBEURnkiY6JoLyqDE8Li1XuAW+LHmkmLDMW/GL5wY7k4/A+Ta7bjA3MOKrF9j4EuUTvCuNX\
ULxvpfSqheTFWZc+g="""
prefix, key = util.parse_ssh_key(pk)
e_prefix, e_key = pk.split()
assert prefix == e_prefix
assert key == e_key
# Test an invalid key with just one word in it.
with pytest.raises(ValueError):
util.parse_ssh_key("ssh-rsa")
# Test a valid key with extra words in it (after the PK).
pk = pk + " blah blah"
prefix, key = util.parse_ssh_key(pk)
assert prefix == e_prefix
assert key == e_key
# Test an invalid prefix.
with pytest.raises(ValueError):
util.parse_ssh_key("invalid-prefix fake-content")
def test_parse_ssh_keys():
pks = """ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyN\
TYAAABBBEURnkiY6JoLyqDE8Li1XuAW+LHmkmLDMW/GL5wY7k4/A+Ta7bjA3MOKrF9j4EuUTvCuNX\
ULxvpfSqheTFWZc+g=
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDmqEapFMh/ajPHnm1dBweYPeLOUjC0Ydp6uw7rB\
S5KCggUVQR8WfIm+sRYTj2+smGsK6zHMBjFnbzvV11vnMqcnY+Sa4LhIAdwkbt/b8HlGaLj1hCWSh\
a5b5/noeK7L+CECGHdvfJhpxBbhq38YEdFnCGbslk/4NriNcUp/DO81CXb1RzJ9GBFH8ivPW1mbe9\
YbxDwGimZZslg0OZu9UzoAT6xEGyiZsqJkTMbRp1ZYIOv9jHCJxRuxxuN3fzxyT3xE69+vhq2/NJX\
8aRsxGPL9G/XKcaYGS6y6LW4quIBCz/XsTZfx1GmkQeZPYHH8FeE+XC/+toXL/kamxdOQKFYEEpWK\
vTNJCD6JtMClxbIXW9q74nNqG+2SD/VQNMUz/505TK1PbY/4uyFfq5HquHJXQVCBll03FRerNHH2N\
schFne6BFHpa48PCoZNH45wLjFXwUyrGU1HrNqh6ZPdRfBTrTOkgs+BKBxGNeV45aYUPu/cFBSPcB\
fRSo6OFcejKc="""
keys = util.parse_ssh_keys(pks)
assert len(keys) == 2
pfx1, key1, pfx2, key2 = pks.split()
k1, k2 = keys
assert pfx1 == k1[0]
assert key1 == k1[1]
assert pfx2 == k2[0]
assert key2 == k2[1]