From 56f2798279f3cbde46389aa65a27fb58bfb0bcfc Mon Sep 17 00:00:00 2001 From: Kevin Morris Date: Fri, 25 Dec 2020 20:54:53 -0800 Subject: [PATCH] add aurweb.auth and authentication to User + Added aurweb.auth.AnonymousUser * An instance of this model is returned as the request user when the request is not authenticated + Added aurweb.auth.BasicAuthBackend + Add starlette's AuthenticationMiddleware to app middleware, which uses our BasicAuthBackend facility + Added User.is_authenticated() + Added User.authenticate(password) + Added User.login(request, password) + Added User.logout(request) + Added repr(User(...)) representation + Added aurweb.auth.auth_required decorator. This change uses the same AURSID logic in the PHP implementation. Additionally, introduce a few helpers for authentication, one of which being `User.update_password(password, rounds = 12)` where `rounds` is a configurable number of salt rounds. Signed-off-by: Kevin Morris --- aurweb/asgi.py | 8 +++ aurweb/auth.py | 77 +++++++++++++++++++++++ aurweb/models/user.py | 125 ++++++++++++++++++++++++++++++++++++- test/test_auth.py | 80 ++++++++++++++++++++++++ test/test_user.py | 142 +++++++++++++++++++++++++++++++++++++----- 5 files changed, 412 insertions(+), 20 deletions(-) create mode 100644 aurweb/auth.py create mode 100644 test/test_auth.py diff --git a/aurweb/asgi.py b/aurweb/asgi.py index c03a00f7..4d21ad03 100644 --- a/aurweb/asgi.py +++ b/aurweb/asgi.py @@ -1,12 +1,15 @@ import http +import os from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles +from starlette.middleware.authentication import AuthenticationMiddleware from starlette.middleware.sessions import SessionMiddleware import aurweb.config +from aurweb.auth import BasicAuthBackend from aurweb.db import get_engine from aurweb.routers import html, sso, errors @@ -32,10 +35,15 @@ async def app_startup(): StaticFiles(directory="web/html/images"), name="static_images") + # Add application middlewares. + app.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend()) app.add_middleware(SessionMiddleware, secret_key=session_secret) + + # Add application routes. app.include_router(sso.router) app.include_router(html.router) + # Initialize the database engine and ORM. get_engine() # NOTE: Always keep this dictionary updated with all routes diff --git a/aurweb/auth.py b/aurweb/auth.py new file mode 100644 index 00000000..8608a82a --- /dev/null +++ b/aurweb/auth.py @@ -0,0 +1,77 @@ +import functools + +from datetime import datetime +from http import HTTPStatus + +from fastapi.responses import RedirectResponse +from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError +from starlette.requests import HTTPConnection + +from aurweb.models.session import Session +from aurweb.models.user import User +from aurweb.templates import make_context, render_template + + +class AnonymousUser: + @staticmethod + def is_authenticated(): + return False + + +class BasicAuthBackend(AuthenticationBackend): + async def authenticate(self, conn: HTTPConnection): + from aurweb.db import session + + sid = conn.cookies.get("AURSID") + if not sid: + return None, AnonymousUser() + + now_ts = datetime.utcnow().timestamp() + record = session.query(Session).filter( + Session.SessionID == sid, Session.LastUpdateTS >= now_ts).first() + if not record: + return None, AnonymousUser() + + user = session.query(User).filter(User.ID == record.UsersID).first() + if not user: + raise AuthenticationError(f"Invalid User ID: {record.UsersID}") + + user.authenticated = True + return AuthCredentials(["authenticated"]), user + + +def auth_required(is_required: bool = True, + redirect: str = "/", + template: tuple = None): + """ Authentication route decorator. + + If redirect is given, the user will be redirected if the auth state + does not match is_required. + + If template is given, it will be rendered with Unauthorized if + is_required does not match and take priority over redirect. + + :param is_required: A boolean indicating whether the function requires auth + :param redirect: Path to redirect to if is_required isn't True + :param template: A template tuple: ("template.html", "Template Page") + """ + + def decorator(func): + @functools.wraps(func) + async def wrapper(request, *args, **kwargs): + if request.user.is_authenticated() != is_required: + status_code = int(HTTPStatus.UNAUTHORIZED) + url = "/" + if redirect: + status_code = int(HTTPStatus.SEE_OTHER) + url = redirect + if template: + path, title = template + context = make_context(request, title) + return render_template(request, path, context, + status_code=int(HTTPStatus.UNAUTHORIZED)) + return RedirectResponse(url=url, status_code=status_code) + return await func(request, *args, **kwargs) + return wrapper + + return decorator diff --git a/aurweb/models/user.py b/aurweb/models/user.py index ba91c439..aff4ce6b 100644 --- a/aurweb/models/user.py +++ b/aurweb/models/user.py @@ -1,13 +1,25 @@ +import hashlib + +from datetime import datetime + +import bcrypt + +from fastapi import Request from sqlalchemy.orm import backref, mapper, relationship +import aurweb.config + from aurweb.models.account_type import AccountType +from aurweb.models.ban import is_banned from aurweb.schema import Users class User: """ An ORM model of a single Users record. """ + authenticated = False def __init__(self, **kwargs): + # Set AccountTypeID if it was passed. self.AccountTypeID = kwargs.get("AccountTypeID") account_type = kwargs.get("AccountType") @@ -15,22 +27,129 @@ class User: self.AccountType = account_type self.Username = kwargs.get("Username") + + self.ResetKey = kwargs.get("ResetKey") self.Email = kwargs.get("Email") self.BackupEmail = kwargs.get("BackupEmail") - self.Passwd = kwargs.get("Passwd") - self.Salt = kwargs.get("Salt") self.RealName = kwargs.get("RealName") self.LangPreference = kwargs.get("LangPreference") self.Timezone = kwargs.get("Timezone") self.Homepage = kwargs.get("Homepage") self.IRCNick = kwargs.get("IRCNick") self.PGPKey = kwargs.get("PGPKey") - self.RegistrationTS = kwargs.get("RegistrationTS") + self.RegistrationTS = datetime.utcnow() self.CommentNotify = kwargs.get("CommentNotify") self.UpdateNotify = kwargs.get("UpdateNotify") self.OwnershipNotify = kwargs.get("OwnershipNotify") self.SSOAccountID = kwargs.get("SSOAccountID") + self.Salt = None + self.Passwd = str() + + passwd = kwargs.get("Passwd") + if passwd: + self.update_password(passwd) + + def update_password(self, password, salt_rounds=12): + from aurweb.db import session + self.Passwd = bcrypt.hashpw( + password.encode(), + bcrypt.gensalt(rounds=salt_rounds)).decode() + session.commit() + + @staticmethod + def minimum_passwd_length(): + return aurweb.config.getint("options", "passwd_min_len") + + def is_authenticated(self): + """ Return internal authenticated state. """ + return self.authenticated + + def valid_password(self, password: str): + """ Check authentication against a given password. """ + from aurweb.db import session + + if password is None: + return False + + password_is_valid = False + + try: + password_is_valid = bcrypt.checkpw(password.encode(), + self.Passwd.encode()) + except ValueError: + pass + + # If our Salt column is not empty, we're using a legacy password. + if not password_is_valid and self.Salt != str(): + # Try to login with legacy method. + password_is_valid = hashlib.md5( + f"{self.Salt}{password}".encode() + ).hexdigest() == self.Passwd + + # We got here, we passed the legacy authentication. + # Update the password to our modern hash style. + if password_is_valid: + self.update_password(password) + + return password_is_valid + + def _login_approved(self, request: Request): + return not is_banned(request) and not self.Suspended + + def login(self, request: Request, password: str, session_time=0): + """ Login and authenticate a request. """ + + from aurweb.db import session + from aurweb.models.session import Session, generate_unique_sid + + if not self._login_approved(request): + return None + + self.authenticated = self.valid_password(password) + if not self.authenticated: + return None + + self.LastLogin = now_ts = datetime.utcnow().timestamp() + self.LastLoginIPAddress = request.client.host + session.commit() + + session_ts = now_ts + ( + session_time if session_time + else aurweb.config.getint("options", "login_timeout") + ) + + sid = None + + if not self.session: + sid = generate_unique_sid() + self.session = Session(UsersID=self.ID, SessionID=sid, + LastUpdateTS=session_ts) + session.add(self.session) + else: + last_updated = self.session.LastUpdateTS + if last_updated and last_updated < now_ts: + self.session.SessionID = sid = generate_unique_sid() + else: + # Session is still valid; retrieve the current SID. + sid = self.session.SessionID + + self.session.LastUpdateTS = session_ts + + session.commit() + + request.cookies["AURSID"] = self.session.SessionID + return self.session.SessionID + + def logout(self, request): + from aurweb.db import session + + del request.cookies["AURSID"] + self.authenticated = False + if self.session: + session.delete(self.session) + session.commit() + def __repr__(self): return "" % ( self.ID, str(self.AccountType), self.Username) diff --git a/test/test_auth.py b/test/test_auth.py new file mode 100644 index 00000000..d2251de4 --- /dev/null +++ b/test/test_auth.py @@ -0,0 +1,80 @@ +from datetime import datetime + +import pytest + +from starlette.authentication import AuthenticationError + +from aurweb.db import query +from aurweb.auth import BasicAuthBackend +from aurweb.models.account_type import AccountType +from aurweb.testing import setup_test_db +from aurweb.testing.models import make_session, make_user +from aurweb.testing.requests import Request + +# Persistent user object, initialized in our setup fixture. +user = None +backend = None +request = None + + +@pytest.fixture(autouse=True) +def setup(): + global user, backend, request + + setup_test_db("Users", "Sessions") + + from aurweb.db import session + + account_type = query(AccountType, + AccountType.AccountType == "User").first() + user = make_user(Username="test", Email="test@example.com", + RealName="Test User", Passwd="testPassword", + AccountType=account_type) + + session.add(user) + session.commit() + + backend = BasicAuthBackend() + request = Request() + + +@pytest.mark.asyncio +async def test_auth_backend_missing_sid(): + # The request has no AURSID cookie, so authentication fails, and + # AnonymousUser is returned. + _, result = await backend.authenticate(request) + assert not result.is_authenticated() + + +@pytest.mark.asyncio +async def test_auth_backend_invalid_sid(): + # Provide a fake AURSID that won't be found in the database. + # This results in our path going down the invalid sid route, + # which gives us an AnonymousUser. + request.cookies["AURSID"] = "fake" + _, result = await backend.authenticate(request) + assert not result.is_authenticated() + + +@pytest.mark.asyncio +async def test_auth_backend_invalid_user_id(): + # Create a new session with a fake user id. + now_ts = datetime.utcnow().timestamp() + make_session(UsersID=666, SessionID="realSession", + LastUpdateTS=now_ts + 5) + + # Here, we specify a real SID; but it's user is not there. + request.cookies["AURSID"] = "realSession" + with pytest.raises(AuthenticationError, match="Invalid User ID: 666"): + await backend.authenticate(request) + + +@pytest.mark.asyncio +async def test_basic_auth_backend(): + # This time, everything matches up. We expect the user to + # equal the real_user. + now_ts = datetime.utcnow().timestamp() + make_session(UsersID=user.ID, SessionID="realSession", + LastUpdateTS=now_ts + 5) + _, result = await backend.authenticate(request) + assert result == user diff --git a/test/test_user.py b/test/test_user.py index 5a56a035..b8d4248a 100644 --- a/test/test_user.py +++ b/test/test_user.py @@ -1,48 +1,86 @@ +import hashlib + +from datetime import datetime, timedelta + +import bcrypt import pytest +import aurweb.auth import aurweb.config from aurweb.db import query from aurweb.models.account_type import AccountType +from aurweb.models.ban import Ban +from aurweb.models.session import Session from aurweb.models.user import User from aurweb.testing import setup_test_db +from aurweb.testing.models import make_session, make_user +from aurweb.testing.requests import Request + +account_type, user = None, None @pytest.fixture(autouse=True) def setup(): - setup_test_db("Users") - - -def test_user(): - """ Test creating a user and reading its columns. """ from aurweb.db import session - # First, grab our target AccountType. + global account_type, user + + setup_test_db("Users", "Sessions", "Bans") + account_type = session.query(AccountType).filter( AccountType.AccountType == "User").first() - user = User( - AccountType=account_type, - RealName="Test User", Username="test", - Email="test@example.org", Passwd="abcd", - IRCNick="tester", - Salt="efgh", ResetKey="blahblah") - session.add(user) - session.commit() + user = make_user(Username="test", Email="test@example.org", + RealName="Test User", Passwd="testPassword", + AccountType=account_type) + + +def test_user_login_logout(): + """ Test creating a user and reading its columns. """ + from aurweb.db import session + + # Assert that make_user created a valid user. + assert bool(user.ID) + + # Test authentication. + assert user.valid_password("testPassword") + assert not user.valid_password("badPassword") + assert user in account_type.users - # Make sure the user was created and given an ID. - assert bool(user.ID) + # Make a raw request. + request = Request() + assert not user.login(request, "badPassword") + assert not user.is_authenticated() + + sid = user.login(request, "testPassword") + assert sid is not None + assert user.is_authenticated() + assert "AURSID" in request.cookies + + # Expect that User session relationships work right. + user_session = session.query(Session).filter( + Session.UsersID == user.ID).first() + assert user_session == user.session + assert user.session.SessionID == sid + assert user.session.User == user # Search for the user via query API. result = session.query(User).filter(User.ID == user.ID).first() # Compare the result and our original user. + assert result == user assert result.ID == user.ID assert result.AccountType.ID == user.AccountType.ID assert result.Username == user.Username assert result.Email == user.Email + # Test result authenticate methods to ensure they work the same. + assert not result.valid_password("badPassword") + assert result.valid_password("testPassword") + assert result.is_authenticated() + # Ensure we've got the correct account type. assert user.AccountType.ID == account_type.ID assert user.AccountType.AccountType == account_type.AccountType @@ -51,4 +89,74 @@ def test_user(): assert repr(user) == f"" - session.delete(user) + # Test logout. + user.logout(request) + assert "AURSID" not in request.cookies + assert not user.is_authenticated() + + +def test_user_login_twice(): + request = Request() + assert user.login(request, "testPassword") + assert user.login(request, "testPassword") + + +def test_user_login_banned(): + from aurweb.db import session + + # Add ban for the next 30 seconds. + banned_timestamp = datetime.utcnow() + timedelta(seconds=30) + ban = Ban(IPAddress="127.0.0.1", BanTS=banned_timestamp) + session.add(ban) + session.commit() + + request = Request() + request.client.host = "127.0.0.1" + assert not user.login(request, "testPassword") + + +def test_user_login_suspended(): + from aurweb.db import session + user.Suspended = True + session.commit() + assert not user.login(Request(), "testPassword") + + +def test_legacy_user_authentication(): + from aurweb.db import session + + user.Salt = bcrypt.gensalt().decode() + user.Passwd = hashlib.md5(f"{user.Salt}testPassword".encode()).hexdigest() + session.commit() + + assert not user.valid_password("badPassword") + assert user.valid_password("testPassword") + + # Test by passing a password of None value in. + assert not user.valid_password(None) + + +def test_user_login_with_outdated_sid(): + from aurweb.db import session + + # Make a session with a LastUpdateTS 5 seconds ago, causing + # user.login to update it with a new sid. + _session = make_session(UsersID=user.ID, SessionID="stub", + LastUpdateTS=datetime.utcnow().timestamp() - 5) + sid = user.login(Request(), "testPassword") + assert sid and user.is_authenticated() + assert sid != "stub" + + session.delete(_session) + session.commit() + + +def test_user_update_password(): + user.update_password("secondPassword") + assert not user.valid_password("testPassword") + assert user.valid_password("secondPassword") + + +def test_user_minimum_passwd_length(): + passwd_min_len = aurweb.config.getint("options", "passwd_min_len") + assert User.minimum_passwd_length() == passwd_min_len