feat(fastapi): render a 500 html response when unique SID generation fails

We've seen a bug in the past where unique SID generation fails and
still ends up raising an exception.

This commit reworks how we deal with database exceptions internally,
tries for 36 iterations to set a fresh unique SID, and raises a 500
HTTPException if we were unable to.

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-12-02 23:26:42 -08:00
parent abfd41f31e
commit 806a19b91a
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
3 changed files with 85 additions and 29 deletions

View file

@ -1,12 +1,14 @@
import hashlib import hashlib
from datetime import datetime from datetime import datetime
from http import HTTPStatus
from typing import List, Set from typing import List, Set
import bcrypt import bcrypt
from fastapi import Request from fastapi import HTTPException, Request
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import backref, relationship from sqlalchemy.orm import backref, relationship
import aurweb.config import aurweb.config
@ -108,33 +110,45 @@ class User(Base):
if not self.authenticated: if not self.authenticated:
return None return None
now_ts = datetime.utcnow().timestamp() # Maximum number of iterations where we attempt to generate
session_ts = now_ts + ( # a unique SID. In cases where the Session table has
session_time if session_time # exhausted all possible values, this will catch exceptions
else aurweb.config.getint("options", "login_timeout") # instead of raising them and include details about failing
) # generation in an HTTPException.
tries = 36
sid = None exc = None
for i in range(tries):
exc = None
now_ts = datetime.utcnow().timestamp()
session_ts = now_ts + (
session_time if session_time
else aurweb.config.getint("options", "login_timeout")
)
try:
with db.begin():
self.LastLogin = now_ts
self.LastLoginIPAddress = request.client.host
if not self.session:
sid = generate_unique_sid()
self.session = db.create(Session, User=self,
SessionID=sid,
LastUpdateTS=session_ts)
else:
last_updated = self.session.LastUpdateTS
if last_updated and last_updated < now_ts:
self.session.SessionID = generate_unique_sid()
self.session.LastUpdateTS = session_ts
break
except IntegrityError as exc_:
exc = exc_
with db.begin(): if exc:
self.LastLogin = now_ts detail = ("Unable to generate a unique session ID in "
self.LastLoginIPAddress = request.client.host f"{tries} iterations.")
if not self.session: raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
sid = generate_unique_sid() detail=detail)
self.session = Session(UsersID=self.ID, SessionID=sid,
LastUpdateTS=session_ts)
db.add(self.session)
else:
last_updated = self.session.LastUpdateTS
if last_updated and last_updated < now_ts:
self.session.SessionID = sid = generate_unique_sid()
else:
# Session is still valid; retrieve the current SID.
sid = self.session.SessionID
self.session.LastUpdateTS = session_ts
request.cookies["AURSID"] = self.session.SessionID
return self.session.SessionID return self.session.SessionID
def has_credential(self, credential: Set[int], def has_credential(self, credential: Set[int],
@ -142,8 +156,7 @@ class User(Base):
from aurweb.auth.creds import has_credential from aurweb.auth.creds import has_credential
return has_credential(self, credential, approved) return has_credential(self, credential, approved)
def logout(self, request): def logout(self, request: Request):
del request.cookies["AURSID"]
self.authenticated = False self.authenticated = False
if self.session: if self.session:
with db.begin(): with db.begin():

View file

@ -283,3 +283,48 @@ def test_login_bad_referer(client: TestClient):
response = request.post("/login", data=post_data, headers=BAD_REFERER) response = request.post("/login", data=post_data, headers=BAD_REFERER)
assert response.status_code == int(HTTPStatus.BAD_REQUEST) assert response.status_code == int(HTTPStatus.BAD_REQUEST)
assert "AURSID" not in response.cookies assert "AURSID" not in response.cookies
def test_generate_unique_sid_exhausted(client: TestClient, user: User):
"""
In this test, we mock up generate_unique_sid() to infinitely return
the same SessionID given to `user`. Within that mocking, we try
to login as `user2` and expect the internal server error rendering
by our error handler.
This exercises the bad path of /login, where we can't find a unique
SID to assign the user.
"""
now = int(datetime.utcnow().timestamp())
with db.begin():
# Create a second user; we'll login with this one.
user2 = db.create(User, Username="test2", Email="test2@example.org",
ResetKey="testReset", Passwd="testPassword",
AccountTypeID=USER_ID)
# Create a session with ID == "testSession" for `user`.
db.create(Session, User=user, SessionID="testSession",
LastUpdateTS=now)
# Mock out generate_unique_sid; always return "testSession" which
# causes us to eventually error out and raise an internal error.
def mock_generate_sid():
return "testSession"
# Login as `user2`; we expect an internal server error response
# with a relevent detail.
post_data = {
"user": user2.Username,
"passwd": "testPassword",
"next": "/",
}
generate_unique_sid_ = "aurweb.models.session.generate_unique_sid"
with mock.patch(generate_unique_sid_, mock_generate_sid):
with client as request:
# Set cookies = {} to remove any previous login kept by TestClient.
response = request.post("/login", data=post_data, cookies={})
assert response.status_code == int(HTTPStatus.INTERNAL_SERVER_ERROR)
expected = "Unable to generate a unique session ID"
assert expected in response.text
assert "500 - Internal Server Error" in response.text

View file

@ -62,7 +62,6 @@ def test_user_login_logout(user: User):
sid = user.login(request, "testPassword") sid = user.login(request, "testPassword")
assert sid is not None assert sid is not None
assert user.is_authenticated() assert user.is_authenticated()
assert "AURSID" in request.cookies
# Expect that User session relationships work right. # Expect that User session relationships work right.
user_session = db.query(Session, user_session = db.query(Session,
@ -92,7 +91,6 @@ def test_user_login_logout(user: User):
# Test logout. # Test logout.
user.logout(request) user.logout(request)
assert "AURSID" not in request.cookies
assert not user.is_authenticated() assert not user.is_authenticated()