mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
change(python): rework session timing
Previously, we were just relying on the cookie expiration for sessions to expire. We were not cleaning up Session records either. Rework timing to depend on an AURREMEMBER cookie which is now emitted on login during BasicAuthBackend processing. If the SID does still have a session but it's expired, we now delete the session record before returning. Otherwise, we update the session's LastUpdateTS to the current time. In addition, stored the unauthenticated result value in a variable to reduce redundancy. Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
f8bef16d32
commit
8501bba0ac
5 changed files with 50 additions and 24 deletions
|
@ -7,7 +7,6 @@ import fastapi
|
||||||
|
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from fastapi.responses import RedirectResponse
|
from fastapi.responses import RedirectResponse
|
||||||
from sqlalchemy import and_
|
|
||||||
from starlette.authentication import AuthCredentials, AuthenticationBackend
|
from starlette.authentication import AuthCredentials, AuthenticationBackend
|
||||||
from starlette.requests import HTTPConnection
|
from starlette.requests import HTTPConnection
|
||||||
|
|
||||||
|
@ -97,18 +96,27 @@ class AnonymousUser:
|
||||||
|
|
||||||
class BasicAuthBackend(AuthenticationBackend):
|
class BasicAuthBackend(AuthenticationBackend):
|
||||||
async def authenticate(self, conn: HTTPConnection):
|
async def authenticate(self, conn: HTTPConnection):
|
||||||
|
unauthenticated = (None, AnonymousUser())
|
||||||
sid = conn.cookies.get("AURSID")
|
sid = conn.cookies.get("AURSID")
|
||||||
if not sid:
|
if not sid:
|
||||||
return (None, AnonymousUser())
|
return unauthenticated
|
||||||
|
|
||||||
now_ts = datetime.utcnow().timestamp()
|
timeout = aurweb.config.getint("options", "login_timeout")
|
||||||
record = db.query(Session).filter(
|
remembered = ("AURREMEMBER" in conn.cookies
|
||||||
and_(Session.SessionID == sid,
|
and bool(conn.cookies.get("AURREMEMBER")))
|
||||||
Session.LastUpdateTS >= now_ts)).first()
|
if remembered:
|
||||||
|
timeout = aurweb.config.getint("options",
|
||||||
|
"persistent_cookie_timeout")
|
||||||
|
|
||||||
# If no session with sid and a LastUpdateTS now or later exists.
|
# If no session with sid and a LastUpdateTS now or later exists.
|
||||||
|
now_ts = int(datetime.utcnow().timestamp())
|
||||||
|
record = db.query(Session).filter(Session.SessionID == sid).first()
|
||||||
if not record:
|
if not record:
|
||||||
return (None, AnonymousUser())
|
return unauthenticated
|
||||||
|
elif record.LastUpdateTS < (now_ts - timeout):
|
||||||
|
with db.begin():
|
||||||
|
db.delete_all([record])
|
||||||
|
return unauthenticated
|
||||||
|
|
||||||
# At this point, we cannot have an invalid user if the record
|
# At this point, we cannot have an invalid user if the record
|
||||||
# exists, due to ForeignKey constraints in the schema upheld
|
# exists, due to ForeignKey constraints in the schema upheld
|
||||||
|
|
|
@ -123,10 +123,6 @@ class User(Base):
|
||||||
for i in range(tries):
|
for i in range(tries):
|
||||||
exc = None
|
exc = None
|
||||||
now_ts = datetime.utcnow().timestamp()
|
now_ts = datetime.utcnow().timestamp()
|
||||||
session_ts = now_ts + (
|
|
||||||
session_time if session_time
|
|
||||||
else aurweb.config.getint("options", "login_timeout")
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
with db.begin():
|
with db.begin():
|
||||||
self.LastLogin = now_ts
|
self.LastLogin = now_ts
|
||||||
|
@ -135,12 +131,12 @@ class User(Base):
|
||||||
sid = generate_unique_sid()
|
sid = generate_unique_sid()
|
||||||
self.session = db.create(Session, User=self,
|
self.session = db.create(Session, User=self,
|
||||||
SessionID=sid,
|
SessionID=sid,
|
||||||
LastUpdateTS=session_ts)
|
LastUpdateTS=now_ts)
|
||||||
else:
|
else:
|
||||||
last_updated = self.session.LastUpdateTS
|
last_updated = self.session.LastUpdateTS
|
||||||
if last_updated and last_updated < now_ts:
|
if last_updated and last_updated < now_ts:
|
||||||
self.session.SessionID = generate_unique_sid()
|
self.session.SessionID = generate_unique_sid()
|
||||||
self.session.LastUpdateTS = session_ts
|
self.session.LastUpdateTS = now_ts
|
||||||
break
|
break
|
||||||
except IntegrityError as exc_:
|
except IntegrityError as exc_:
|
||||||
exc = exc_
|
exc = exc_
|
||||||
|
|
|
@ -73,6 +73,10 @@ async def login_post(request: Request,
|
||||||
response.set_cookie("AURLANG", user.LangPreference,
|
response.set_cookie("AURLANG", user.LangPreference,
|
||||||
secure=secure, httponly=secure,
|
secure=secure, httponly=secure,
|
||||||
samesite=cookies.samesite())
|
samesite=cookies.samesite())
|
||||||
|
response.set_cookie("AURREMEMBER", remember_me,
|
||||||
|
expires=expires_at,
|
||||||
|
secure=secure, httponly=secure,
|
||||||
|
samesite=cookies.samesite())
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ import pytest
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
from aurweb import db
|
from aurweb import config, db
|
||||||
from aurweb.auth import AnonymousUser, BasicAuthBackend, account_type_required, auth_required
|
from aurweb.auth import AnonymousUser, BasicAuthBackend, account_type_required, auth_required
|
||||||
from aurweb.models.account_type import USER, USER_ID
|
from aurweb.models.account_type import USER, USER_ID
|
||||||
from aurweb.models.session import Session
|
from aurweb.models.session import Session
|
||||||
|
@ -76,6 +76,28 @@ async def test_basic_auth_backend(user: User, backend: BasicAuthBackend):
|
||||||
assert result == user
|
assert result == user
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_expired_session(backend: BasicAuthBackend, user: User):
|
||||||
|
""" Login, expire the session manually, then authenticate. """
|
||||||
|
# First, build a Request with a logged in user.
|
||||||
|
request = Request()
|
||||||
|
request.user = user
|
||||||
|
sid = request.user.login(Request(), "testPassword")
|
||||||
|
request.cookies["AURSID"] = sid
|
||||||
|
|
||||||
|
# Set Session.LastUpdateTS to 20 seconds expired.
|
||||||
|
timeout = config.getint("options", "login_timeout")
|
||||||
|
now_ts = int(datetime.utcnow().timestamp())
|
||||||
|
with db.begin():
|
||||||
|
request.user.session.LastUpdateTS = now_ts - timeout - 20
|
||||||
|
|
||||||
|
# Run through authentication backend and get the session
|
||||||
|
# deleted due to its expiration.
|
||||||
|
await backend.authenticate(request)
|
||||||
|
session = db.query(Session).filter(Session.SessionID == sid).first()
|
||||||
|
assert session is None
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_auth_required_redirection_bad_referrer():
|
async def test_auth_required_redirection_bad_referrer():
|
||||||
# Create a fake route function which can be wrapped by auth_required.
|
# Create a fake route function which can be wrapped by auth_required.
|
||||||
|
|
|
@ -13,7 +13,6 @@ from aurweb.asgi import app
|
||||||
from aurweb.models.account_type import USER_ID
|
from aurweb.models.account_type import USER_ID
|
||||||
from aurweb.models.session import Session
|
from aurweb.models.session import Session
|
||||||
from aurweb.models.user import User
|
from aurweb.models.user import User
|
||||||
from aurweb.testing.requests import Request
|
|
||||||
|
|
||||||
# Some test global constants.
|
# Some test global constants.
|
||||||
TEST_USERNAME = "test"
|
TEST_USERNAME = "test"
|
||||||
|
@ -136,12 +135,11 @@ def test_secure_login(getboolean: bool, client: TestClient, user: User):
|
||||||
|
|
||||||
def test_authenticated_login(client: TestClient, user: User):
|
def test_authenticated_login(client: TestClient, user: User):
|
||||||
post_data = {
|
post_data = {
|
||||||
"user": "test",
|
"user": user.Username,
|
||||||
"passwd": "testPassword",
|
"passwd": "testPassword",
|
||||||
"next": "/"
|
"next": "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
|
||||||
with client as request:
|
with client as request:
|
||||||
# Try to login.
|
# Try to login.
|
||||||
response = request.post("/login", data=post_data,
|
response = request.post("/login", data=post_data,
|
||||||
|
@ -153,7 +151,7 @@ def test_authenticated_login(client: TestClient, user: User):
|
||||||
# when requesting GET /login as an authenticated user.
|
# when requesting GET /login as an authenticated user.
|
||||||
# Now, let's verify that we receive 403 Forbidden when we
|
# Now, let's verify that we receive 403 Forbidden when we
|
||||||
# try to get /login as an authenticated user.
|
# try to get /login as an authenticated user.
|
||||||
response = request.get("/login", cookies=cookies,
|
response = request.get("/login", cookies=response.cookies,
|
||||||
allow_redirects=False)
|
allow_redirects=False)
|
||||||
assert response.status_code == int(HTTPStatus.OK)
|
assert response.status_code == int(HTTPStatus.OK)
|
||||||
assert "Logged-in as: <strong>test</strong>" in response.text
|
assert "Logged-in as: <strong>test</strong>" in response.text
|
||||||
|
@ -200,14 +198,12 @@ def test_login_remember_me(client: TestClient, user: User):
|
||||||
|
|
||||||
cookie_timeout = aurweb.config.getint(
|
cookie_timeout = aurweb.config.getint(
|
||||||
"options", "persistent_cookie_timeout")
|
"options", "persistent_cookie_timeout")
|
||||||
expected_ts = datetime.utcnow().timestamp() + cookie_timeout
|
now_ts = int(datetime.utcnow().timestamp())
|
||||||
|
|
||||||
session = db.query(Session).filter(Session.UsersID == user.ID).first()
|
session = db.query(Session).filter(Session.UsersID == user.ID).first()
|
||||||
|
|
||||||
# Expect that LastUpdateTS was within 5 seconds of the expected_ts,
|
# Expect that LastUpdateTS is not past the cookie timeout
|
||||||
# which is equal to the current timestamp + persistent_cookie_timeout.
|
# for a remembered session.
|
||||||
assert session.LastUpdateTS > expected_ts - 5
|
assert session.LastUpdateTS > (now_ts - cookie_timeout)
|
||||||
assert session.LastUpdateTS < expected_ts + 5
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_incorrect_password_remember_me(client: TestClient, user: User):
|
def test_login_incorrect_password_remember_me(client: TestClient, user: User):
|
||||||
|
|
Loading…
Add table
Reference in a new issue