mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
add aurweb.auth and authentication to User
+ Added aurweb.auth.AnonymousUser * An instance of this model is returned as the request user when the request is not authenticated + Added aurweb.auth.BasicAuthBackend + Add starlette's AuthenticationMiddleware to app middleware, which uses our BasicAuthBackend facility + Added User.is_authenticated() + Added User.authenticate(password) + Added User.login(request, password) + Added User.logout(request) + Added repr(User(...)) representation + Added aurweb.auth.auth_required decorator. This change uses the same AURSID logic in the PHP implementation. Additionally, introduce a few helpers for authentication, one of which being `User.update_password(password, rounds = 12)` where `rounds` is a configurable number of salt rounds. Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
137c050f99
commit
56f2798279
5 changed files with 412 additions and 20 deletions
|
@ -1,12 +1,15 @@
|
||||||
import http
|
import http
|
||||||
|
import os
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
from fastapi.responses import HTMLResponse
|
from fastapi.responses import HTMLResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
from starlette.middleware.authentication import AuthenticationMiddleware
|
||||||
from starlette.middleware.sessions import SessionMiddleware
|
from starlette.middleware.sessions import SessionMiddleware
|
||||||
|
|
||||||
import aurweb.config
|
import aurweb.config
|
||||||
|
|
||||||
|
from aurweb.auth import BasicAuthBackend
|
||||||
from aurweb.db import get_engine
|
from aurweb.db import get_engine
|
||||||
from aurweb.routers import html, sso, errors
|
from aurweb.routers import html, sso, errors
|
||||||
|
|
||||||
|
@ -32,10 +35,15 @@ async def app_startup():
|
||||||
StaticFiles(directory="web/html/images"),
|
StaticFiles(directory="web/html/images"),
|
||||||
name="static_images")
|
name="static_images")
|
||||||
|
|
||||||
|
# Add application middlewares.
|
||||||
|
app.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
|
||||||
app.add_middleware(SessionMiddleware, secret_key=session_secret)
|
app.add_middleware(SessionMiddleware, secret_key=session_secret)
|
||||||
|
|
||||||
|
# Add application routes.
|
||||||
app.include_router(sso.router)
|
app.include_router(sso.router)
|
||||||
app.include_router(html.router)
|
app.include_router(html.router)
|
||||||
|
|
||||||
|
# Initialize the database engine and ORM.
|
||||||
get_engine()
|
get_engine()
|
||||||
|
|
||||||
# NOTE: Always keep this dictionary updated with all routes
|
# NOTE: Always keep this dictionary updated with all routes
|
||||||
|
|
77
aurweb/auth.py
Normal file
77
aurweb/auth.py
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from http import HTTPStatus
|
||||||
|
|
||||||
|
from fastapi.responses import RedirectResponse
|
||||||
|
from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError
|
||||||
|
from starlette.requests import HTTPConnection
|
||||||
|
|
||||||
|
from aurweb.models.session import Session
|
||||||
|
from aurweb.models.user import User
|
||||||
|
from aurweb.templates import make_context, render_template
|
||||||
|
|
||||||
|
|
||||||
|
class AnonymousUser:
|
||||||
|
@staticmethod
|
||||||
|
def is_authenticated():
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class BasicAuthBackend(AuthenticationBackend):
|
||||||
|
async def authenticate(self, conn: HTTPConnection):
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
sid = conn.cookies.get("AURSID")
|
||||||
|
if not sid:
|
||||||
|
return None, AnonymousUser()
|
||||||
|
|
||||||
|
now_ts = datetime.utcnow().timestamp()
|
||||||
|
record = session.query(Session).filter(
|
||||||
|
Session.SessionID == sid, Session.LastUpdateTS >= now_ts).first()
|
||||||
|
if not record:
|
||||||
|
return None, AnonymousUser()
|
||||||
|
|
||||||
|
user = session.query(User).filter(User.ID == record.UsersID).first()
|
||||||
|
if not user:
|
||||||
|
raise AuthenticationError(f"Invalid User ID: {record.UsersID}")
|
||||||
|
|
||||||
|
user.authenticated = True
|
||||||
|
return AuthCredentials(["authenticated"]), user
|
||||||
|
|
||||||
|
|
||||||
|
def auth_required(is_required: bool = True,
|
||||||
|
redirect: str = "/",
|
||||||
|
template: tuple = None):
|
||||||
|
""" Authentication route decorator.
|
||||||
|
|
||||||
|
If redirect is given, the user will be redirected if the auth state
|
||||||
|
does not match is_required.
|
||||||
|
|
||||||
|
If template is given, it will be rendered with Unauthorized if
|
||||||
|
is_required does not match and take priority over redirect.
|
||||||
|
|
||||||
|
:param is_required: A boolean indicating whether the function requires auth
|
||||||
|
:param redirect: Path to redirect to if is_required isn't True
|
||||||
|
:param template: A template tuple: ("template.html", "Template Page")
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func):
|
||||||
|
@functools.wraps(func)
|
||||||
|
async def wrapper(request, *args, **kwargs):
|
||||||
|
if request.user.is_authenticated() != is_required:
|
||||||
|
status_code = int(HTTPStatus.UNAUTHORIZED)
|
||||||
|
url = "/"
|
||||||
|
if redirect:
|
||||||
|
status_code = int(HTTPStatus.SEE_OTHER)
|
||||||
|
url = redirect
|
||||||
|
if template:
|
||||||
|
path, title = template
|
||||||
|
context = make_context(request, title)
|
||||||
|
return render_template(request, path, context,
|
||||||
|
status_code=int(HTTPStatus.UNAUTHORIZED))
|
||||||
|
return RedirectResponse(url=url, status_code=status_code)
|
||||||
|
return await func(request, *args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
return decorator
|
|
@ -1,13 +1,25 @@
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
|
|
||||||
|
from fastapi import Request
|
||||||
from sqlalchemy.orm import backref, mapper, relationship
|
from sqlalchemy.orm import backref, mapper, relationship
|
||||||
|
|
||||||
|
import aurweb.config
|
||||||
|
|
||||||
from aurweb.models.account_type import AccountType
|
from aurweb.models.account_type import AccountType
|
||||||
|
from aurweb.models.ban import is_banned
|
||||||
from aurweb.schema import Users
|
from aurweb.schema import Users
|
||||||
|
|
||||||
|
|
||||||
class User:
|
class User:
|
||||||
""" An ORM model of a single Users record. """
|
""" An ORM model of a single Users record. """
|
||||||
|
authenticated = False
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
|
# Set AccountTypeID if it was passed.
|
||||||
self.AccountTypeID = kwargs.get("AccountTypeID")
|
self.AccountTypeID = kwargs.get("AccountTypeID")
|
||||||
|
|
||||||
account_type = kwargs.get("AccountType")
|
account_type = kwargs.get("AccountType")
|
||||||
|
@ -15,22 +27,129 @@ class User:
|
||||||
self.AccountType = account_type
|
self.AccountType = account_type
|
||||||
|
|
||||||
self.Username = kwargs.get("Username")
|
self.Username = kwargs.get("Username")
|
||||||
|
|
||||||
|
self.ResetKey = kwargs.get("ResetKey")
|
||||||
self.Email = kwargs.get("Email")
|
self.Email = kwargs.get("Email")
|
||||||
self.BackupEmail = kwargs.get("BackupEmail")
|
self.BackupEmail = kwargs.get("BackupEmail")
|
||||||
self.Passwd = kwargs.get("Passwd")
|
|
||||||
self.Salt = kwargs.get("Salt")
|
|
||||||
self.RealName = kwargs.get("RealName")
|
self.RealName = kwargs.get("RealName")
|
||||||
self.LangPreference = kwargs.get("LangPreference")
|
self.LangPreference = kwargs.get("LangPreference")
|
||||||
self.Timezone = kwargs.get("Timezone")
|
self.Timezone = kwargs.get("Timezone")
|
||||||
self.Homepage = kwargs.get("Homepage")
|
self.Homepage = kwargs.get("Homepage")
|
||||||
self.IRCNick = kwargs.get("IRCNick")
|
self.IRCNick = kwargs.get("IRCNick")
|
||||||
self.PGPKey = kwargs.get("PGPKey")
|
self.PGPKey = kwargs.get("PGPKey")
|
||||||
self.RegistrationTS = kwargs.get("RegistrationTS")
|
self.RegistrationTS = datetime.utcnow()
|
||||||
self.CommentNotify = kwargs.get("CommentNotify")
|
self.CommentNotify = kwargs.get("CommentNotify")
|
||||||
self.UpdateNotify = kwargs.get("UpdateNotify")
|
self.UpdateNotify = kwargs.get("UpdateNotify")
|
||||||
self.OwnershipNotify = kwargs.get("OwnershipNotify")
|
self.OwnershipNotify = kwargs.get("OwnershipNotify")
|
||||||
self.SSOAccountID = kwargs.get("SSOAccountID")
|
self.SSOAccountID = kwargs.get("SSOAccountID")
|
||||||
|
|
||||||
|
self.Salt = None
|
||||||
|
self.Passwd = str()
|
||||||
|
|
||||||
|
passwd = kwargs.get("Passwd")
|
||||||
|
if passwd:
|
||||||
|
self.update_password(passwd)
|
||||||
|
|
||||||
|
def update_password(self, password, salt_rounds=12):
|
||||||
|
from aurweb.db import session
|
||||||
|
self.Passwd = bcrypt.hashpw(
|
||||||
|
password.encode(),
|
||||||
|
bcrypt.gensalt(rounds=salt_rounds)).decode()
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def minimum_passwd_length():
|
||||||
|
return aurweb.config.getint("options", "passwd_min_len")
|
||||||
|
|
||||||
|
def is_authenticated(self):
|
||||||
|
""" Return internal authenticated state. """
|
||||||
|
return self.authenticated
|
||||||
|
|
||||||
|
def valid_password(self, password: str):
|
||||||
|
""" Check authentication against a given password. """
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
if password is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
password_is_valid = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
password_is_valid = bcrypt.checkpw(password.encode(),
|
||||||
|
self.Passwd.encode())
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# If our Salt column is not empty, we're using a legacy password.
|
||||||
|
if not password_is_valid and self.Salt != str():
|
||||||
|
# Try to login with legacy method.
|
||||||
|
password_is_valid = hashlib.md5(
|
||||||
|
f"{self.Salt}{password}".encode()
|
||||||
|
).hexdigest() == self.Passwd
|
||||||
|
|
||||||
|
# We got here, we passed the legacy authentication.
|
||||||
|
# Update the password to our modern hash style.
|
||||||
|
if password_is_valid:
|
||||||
|
self.update_password(password)
|
||||||
|
|
||||||
|
return password_is_valid
|
||||||
|
|
||||||
|
def _login_approved(self, request: Request):
|
||||||
|
return not is_banned(request) and not self.Suspended
|
||||||
|
|
||||||
|
def login(self, request: Request, password: str, session_time=0):
|
||||||
|
""" Login and authenticate a request. """
|
||||||
|
|
||||||
|
from aurweb.db import session
|
||||||
|
from aurweb.models.session import Session, generate_unique_sid
|
||||||
|
|
||||||
|
if not self._login_approved(request):
|
||||||
|
return None
|
||||||
|
|
||||||
|
self.authenticated = self.valid_password(password)
|
||||||
|
if not self.authenticated:
|
||||||
|
return None
|
||||||
|
|
||||||
|
self.LastLogin = now_ts = datetime.utcnow().timestamp()
|
||||||
|
self.LastLoginIPAddress = request.client.host
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session_ts = now_ts + (
|
||||||
|
session_time if session_time
|
||||||
|
else aurweb.config.getint("options", "login_timeout")
|
||||||
|
)
|
||||||
|
|
||||||
|
sid = None
|
||||||
|
|
||||||
|
if not self.session:
|
||||||
|
sid = generate_unique_sid()
|
||||||
|
self.session = Session(UsersID=self.ID, SessionID=sid,
|
||||||
|
LastUpdateTS=session_ts)
|
||||||
|
session.add(self.session)
|
||||||
|
else:
|
||||||
|
last_updated = self.session.LastUpdateTS
|
||||||
|
if last_updated and last_updated < now_ts:
|
||||||
|
self.session.SessionID = sid = generate_unique_sid()
|
||||||
|
else:
|
||||||
|
# Session is still valid; retrieve the current SID.
|
||||||
|
sid = self.session.SessionID
|
||||||
|
|
||||||
|
self.session.LastUpdateTS = session_ts
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
request.cookies["AURSID"] = self.session.SessionID
|
||||||
|
return self.session.SessionID
|
||||||
|
|
||||||
|
def logout(self, request):
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
del request.cookies["AURSID"]
|
||||||
|
self.authenticated = False
|
||||||
|
if self.session:
|
||||||
|
session.delete(self.session)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<User(ID='%s', AccountType='%s', Username='%s')>" % (
|
return "<User(ID='%s', AccountType='%s', Username='%s')>" % (
|
||||||
self.ID, str(self.AccountType), self.Username)
|
self.ID, str(self.AccountType), self.Username)
|
||||||
|
|
80
test/test_auth.py
Normal file
80
test/test_auth.py
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from starlette.authentication import AuthenticationError
|
||||||
|
|
||||||
|
from aurweb.db import query
|
||||||
|
from aurweb.auth import BasicAuthBackend
|
||||||
|
from aurweb.models.account_type import AccountType
|
||||||
|
from aurweb.testing import setup_test_db
|
||||||
|
from aurweb.testing.models import make_session, make_user
|
||||||
|
from aurweb.testing.requests import Request
|
||||||
|
|
||||||
|
# Persistent user object, initialized in our setup fixture.
|
||||||
|
user = None
|
||||||
|
backend = None
|
||||||
|
request = None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup():
|
||||||
|
global user, backend, request
|
||||||
|
|
||||||
|
setup_test_db("Users", "Sessions")
|
||||||
|
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
account_type = query(AccountType,
|
||||||
|
AccountType.AccountType == "User").first()
|
||||||
|
user = make_user(Username="test", Email="test@example.com",
|
||||||
|
RealName="Test User", Passwd="testPassword",
|
||||||
|
AccountType=account_type)
|
||||||
|
|
||||||
|
session.add(user)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
backend = BasicAuthBackend()
|
||||||
|
request = Request()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_auth_backend_missing_sid():
|
||||||
|
# The request has no AURSID cookie, so authentication fails, and
|
||||||
|
# AnonymousUser is returned.
|
||||||
|
_, result = await backend.authenticate(request)
|
||||||
|
assert not result.is_authenticated()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_auth_backend_invalid_sid():
|
||||||
|
# Provide a fake AURSID that won't be found in the database.
|
||||||
|
# This results in our path going down the invalid sid route,
|
||||||
|
# which gives us an AnonymousUser.
|
||||||
|
request.cookies["AURSID"] = "fake"
|
||||||
|
_, result = await backend.authenticate(request)
|
||||||
|
assert not result.is_authenticated()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_auth_backend_invalid_user_id():
|
||||||
|
# Create a new session with a fake user id.
|
||||||
|
now_ts = datetime.utcnow().timestamp()
|
||||||
|
make_session(UsersID=666, SessionID="realSession",
|
||||||
|
LastUpdateTS=now_ts + 5)
|
||||||
|
|
||||||
|
# Here, we specify a real SID; but it's user is not there.
|
||||||
|
request.cookies["AURSID"] = "realSession"
|
||||||
|
with pytest.raises(AuthenticationError, match="Invalid User ID: 666"):
|
||||||
|
await backend.authenticate(request)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_basic_auth_backend():
|
||||||
|
# This time, everything matches up. We expect the user to
|
||||||
|
# equal the real_user.
|
||||||
|
now_ts = datetime.utcnow().timestamp()
|
||||||
|
make_session(UsersID=user.ID, SessionID="realSession",
|
||||||
|
LastUpdateTS=now_ts + 5)
|
||||||
|
_, result = await backend.authenticate(request)
|
||||||
|
assert result == user
|
|
@ -1,48 +1,86 @@
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
import aurweb.auth
|
||||||
import aurweb.config
|
import aurweb.config
|
||||||
|
|
||||||
from aurweb.db import query
|
from aurweb.db import query
|
||||||
from aurweb.models.account_type import AccountType
|
from aurweb.models.account_type import AccountType
|
||||||
|
from aurweb.models.ban import Ban
|
||||||
|
from aurweb.models.session import Session
|
||||||
from aurweb.models.user import User
|
from aurweb.models.user import User
|
||||||
from aurweb.testing import setup_test_db
|
from aurweb.testing import setup_test_db
|
||||||
|
from aurweb.testing.models import make_session, make_user
|
||||||
|
from aurweb.testing.requests import Request
|
||||||
|
|
||||||
|
account_type, user = None, None
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def setup():
|
def setup():
|
||||||
setup_test_db("Users")
|
|
||||||
|
|
||||||
|
|
||||||
def test_user():
|
|
||||||
""" Test creating a user and reading its columns. """
|
|
||||||
from aurweb.db import session
|
from aurweb.db import session
|
||||||
|
|
||||||
# First, grab our target AccountType.
|
global account_type, user
|
||||||
|
|
||||||
|
setup_test_db("Users", "Sessions", "Bans")
|
||||||
|
|
||||||
account_type = session.query(AccountType).filter(
|
account_type = session.query(AccountType).filter(
|
||||||
AccountType.AccountType == "User").first()
|
AccountType.AccountType == "User").first()
|
||||||
|
|
||||||
user = User(
|
user = make_user(Username="test", Email="test@example.org",
|
||||||
AccountType=account_type,
|
RealName="Test User", Passwd="testPassword",
|
||||||
RealName="Test User", Username="test",
|
AccountType=account_type)
|
||||||
Email="test@example.org", Passwd="abcd",
|
|
||||||
IRCNick="tester",
|
|
||||||
Salt="efgh", ResetKey="blahblah")
|
def test_user_login_logout():
|
||||||
session.add(user)
|
""" Test creating a user and reading its columns. """
|
||||||
session.commit()
|
from aurweb.db import session
|
||||||
|
|
||||||
|
# Assert that make_user created a valid user.
|
||||||
|
assert bool(user.ID)
|
||||||
|
|
||||||
|
# Test authentication.
|
||||||
|
assert user.valid_password("testPassword")
|
||||||
|
assert not user.valid_password("badPassword")
|
||||||
|
|
||||||
assert user in account_type.users
|
assert user in account_type.users
|
||||||
|
|
||||||
# Make sure the user was created and given an ID.
|
# Make a raw request.
|
||||||
assert bool(user.ID)
|
request = Request()
|
||||||
|
assert not user.login(request, "badPassword")
|
||||||
|
assert not user.is_authenticated()
|
||||||
|
|
||||||
|
sid = user.login(request, "testPassword")
|
||||||
|
assert sid is not None
|
||||||
|
assert user.is_authenticated()
|
||||||
|
assert "AURSID" in request.cookies
|
||||||
|
|
||||||
|
# Expect that User session relationships work right.
|
||||||
|
user_session = session.query(Session).filter(
|
||||||
|
Session.UsersID == user.ID).first()
|
||||||
|
assert user_session == user.session
|
||||||
|
assert user.session.SessionID == sid
|
||||||
|
assert user.session.User == user
|
||||||
|
|
||||||
# Search for the user via query API.
|
# Search for the user via query API.
|
||||||
result = session.query(User).filter(User.ID == user.ID).first()
|
result = session.query(User).filter(User.ID == user.ID).first()
|
||||||
|
|
||||||
# Compare the result and our original user.
|
# Compare the result and our original user.
|
||||||
|
assert result == user
|
||||||
assert result.ID == user.ID
|
assert result.ID == user.ID
|
||||||
assert result.AccountType.ID == user.AccountType.ID
|
assert result.AccountType.ID == user.AccountType.ID
|
||||||
assert result.Username == user.Username
|
assert result.Username == user.Username
|
||||||
assert result.Email == user.Email
|
assert result.Email == user.Email
|
||||||
|
|
||||||
|
# Test result authenticate methods to ensure they work the same.
|
||||||
|
assert not result.valid_password("badPassword")
|
||||||
|
assert result.valid_password("testPassword")
|
||||||
|
assert result.is_authenticated()
|
||||||
|
|
||||||
# Ensure we've got the correct account type.
|
# Ensure we've got the correct account type.
|
||||||
assert user.AccountType.ID == account_type.ID
|
assert user.AccountType.ID == account_type.ID
|
||||||
assert user.AccountType.AccountType == account_type.AccountType
|
assert user.AccountType.AccountType == account_type.AccountType
|
||||||
|
@ -51,4 +89,74 @@ def test_user():
|
||||||
assert repr(user) == f"<User(ID='{user.ID}', " + \
|
assert repr(user) == f"<User(ID='{user.ID}', " + \
|
||||||
"AccountType='User', Username='test')>"
|
"AccountType='User', Username='test')>"
|
||||||
|
|
||||||
session.delete(user)
|
# Test logout.
|
||||||
|
user.logout(request)
|
||||||
|
assert "AURSID" not in request.cookies
|
||||||
|
assert not user.is_authenticated()
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_login_twice():
|
||||||
|
request = Request()
|
||||||
|
assert user.login(request, "testPassword")
|
||||||
|
assert user.login(request, "testPassword")
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_login_banned():
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
# Add ban for the next 30 seconds.
|
||||||
|
banned_timestamp = datetime.utcnow() + timedelta(seconds=30)
|
||||||
|
ban = Ban(IPAddress="127.0.0.1", BanTS=banned_timestamp)
|
||||||
|
session.add(ban)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
request = Request()
|
||||||
|
request.client.host = "127.0.0.1"
|
||||||
|
assert not user.login(request, "testPassword")
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_login_suspended():
|
||||||
|
from aurweb.db import session
|
||||||
|
user.Suspended = True
|
||||||
|
session.commit()
|
||||||
|
assert not user.login(Request(), "testPassword")
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_user_authentication():
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
user.Salt = bcrypt.gensalt().decode()
|
||||||
|
user.Passwd = hashlib.md5(f"{user.Salt}testPassword".encode()).hexdigest()
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
assert not user.valid_password("badPassword")
|
||||||
|
assert user.valid_password("testPassword")
|
||||||
|
|
||||||
|
# Test by passing a password of None value in.
|
||||||
|
assert not user.valid_password(None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_login_with_outdated_sid():
|
||||||
|
from aurweb.db import session
|
||||||
|
|
||||||
|
# Make a session with a LastUpdateTS 5 seconds ago, causing
|
||||||
|
# user.login to update it with a new sid.
|
||||||
|
_session = make_session(UsersID=user.ID, SessionID="stub",
|
||||||
|
LastUpdateTS=datetime.utcnow().timestamp() - 5)
|
||||||
|
sid = user.login(Request(), "testPassword")
|
||||||
|
assert sid and user.is_authenticated()
|
||||||
|
assert sid != "stub"
|
||||||
|
|
||||||
|
session.delete(_session)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_update_password():
|
||||||
|
user.update_password("secondPassword")
|
||||||
|
assert not user.valid_password("testPassword")
|
||||||
|
assert user.valid_password("secondPassword")
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_minimum_passwd_length():
|
||||||
|
passwd_min_len = aurweb.config.getint("options", "passwd_min_len")
|
||||||
|
assert User.minimum_passwd_length() == passwd_min_len
|
||||||
|
|
Loading…
Add table
Reference in a new issue