From 806a19b91a3f2e254e1956243a2fff89b123ff4f Mon Sep 17 00:00:00 2001 From: Kevin Morris Date: Thu, 2 Dec 2021 23:26:42 -0800 Subject: [PATCH] feat(fastapi): render a 500 html response when unique SID generation fails We've seen a bug in the past where unique SID generation fails and still ends up raising an exception. This commit reworks how we deal with database exceptions internally, tries for 36 iterations to set a fresh unique SID, and raises a 500 HTTPException if we were unable to. Signed-off-by: Kevin Morris --- aurweb/models/user.py | 67 ++++++++++++++++++++++++---------------- test/test_auth_routes.py | 45 +++++++++++++++++++++++++++ test/test_user.py | 2 -- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/aurweb/models/user.py b/aurweb/models/user.py index f0724202..dcf5f519 100644 --- a/aurweb/models/user.py +++ b/aurweb/models/user.py @@ -1,12 +1,14 @@ import hashlib from datetime import datetime +from http import HTTPStatus from typing import List, Set import bcrypt -from fastapi import Request +from fastapi import HTTPException, Request from sqlalchemy import or_ +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import backref, relationship import aurweb.config @@ -108,33 +110,45 @@ class User(Base): if not self.authenticated: return None - now_ts = datetime.utcnow().timestamp() - session_ts = now_ts + ( - session_time if session_time - else aurweb.config.getint("options", "login_timeout") - ) + # Maximum number of iterations where we attempt to generate + # a unique SID. In cases where the Session table has + # exhausted all possible values, this will catch exceptions + # instead of raising them and include details about failing + # generation in an HTTPException. + tries = 36 - sid = None + exc = None + for i in range(tries): + exc = None + now_ts = datetime.utcnow().timestamp() + session_ts = now_ts + ( + session_time if session_time + else aurweb.config.getint("options", "login_timeout") + ) + try: + with db.begin(): + self.LastLogin = now_ts + self.LastLoginIPAddress = request.client.host + if not self.session: + sid = generate_unique_sid() + self.session = db.create(Session, User=self, + SessionID=sid, + LastUpdateTS=session_ts) + else: + last_updated = self.session.LastUpdateTS + if last_updated and last_updated < now_ts: + self.session.SessionID = generate_unique_sid() + self.session.LastUpdateTS = session_ts + break + except IntegrityError as exc_: + exc = exc_ - with db.begin(): - self.LastLogin = now_ts - self.LastLoginIPAddress = request.client.host - if not self.session: - sid = generate_unique_sid() - self.session = Session(UsersID=self.ID, SessionID=sid, - LastUpdateTS=session_ts) - db.add(self.session) - else: - last_updated = self.session.LastUpdateTS - if last_updated and last_updated < now_ts: - self.session.SessionID = sid = generate_unique_sid() - else: - # Session is still valid; retrieve the current SID. - sid = self.session.SessionID + if exc: + detail = ("Unable to generate a unique session ID in " + f"{tries} iterations.") + raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=detail) - self.session.LastUpdateTS = session_ts - - request.cookies["AURSID"] = self.session.SessionID return self.session.SessionID def has_credential(self, credential: Set[int], @@ -142,8 +156,7 @@ class User(Base): from aurweb.auth.creds import has_credential return has_credential(self, credential, approved) - def logout(self, request): - del request.cookies["AURSID"] + def logout(self, request: Request): self.authenticated = False if self.session: with db.begin(): diff --git a/test/test_auth_routes.py b/test/test_auth_routes.py index a8d0db11..3455a019 100644 --- a/test/test_auth_routes.py +++ b/test/test_auth_routes.py @@ -283,3 +283,48 @@ def test_login_bad_referer(client: TestClient): response = request.post("/login", data=post_data, headers=BAD_REFERER) assert response.status_code == int(HTTPStatus.BAD_REQUEST) assert "AURSID" not in response.cookies + + +def test_generate_unique_sid_exhausted(client: TestClient, user: User): + """ + In this test, we mock up generate_unique_sid() to infinitely return + the same SessionID given to `user`. Within that mocking, we try + to login as `user2` and expect the internal server error rendering + by our error handler. + + This exercises the bad path of /login, where we can't find a unique + SID to assign the user. + """ + now = int(datetime.utcnow().timestamp()) + with db.begin(): + # Create a second user; we'll login with this one. + user2 = db.create(User, Username="test2", Email="test2@example.org", + ResetKey="testReset", Passwd="testPassword", + AccountTypeID=USER_ID) + + # Create a session with ID == "testSession" for `user`. + db.create(Session, User=user, SessionID="testSession", + LastUpdateTS=now) + + # Mock out generate_unique_sid; always return "testSession" which + # causes us to eventually error out and raise an internal error. + def mock_generate_sid(): + return "testSession" + + # Login as `user2`; we expect an internal server error response + # with a relevent detail. + post_data = { + "user": user2.Username, + "passwd": "testPassword", + "next": "/", + } + generate_unique_sid_ = "aurweb.models.session.generate_unique_sid" + with mock.patch(generate_unique_sid_, mock_generate_sid): + with client as request: + # Set cookies = {} to remove any previous login kept by TestClient. + response = request.post("/login", data=post_data, cookies={}) + assert response.status_code == int(HTTPStatus.INTERNAL_SERVER_ERROR) + + expected = "Unable to generate a unique session ID" + assert expected in response.text + assert "500 - Internal Server Error" in response.text diff --git a/test/test_user.py b/test/test_user.py index 52cdc89e..2c8dd847 100644 --- a/test/test_user.py +++ b/test/test_user.py @@ -62,7 +62,6 @@ def test_user_login_logout(user: User): sid = user.login(request, "testPassword") assert sid is not None assert user.is_authenticated() - assert "AURSID" in request.cookies # Expect that User session relationships work right. user_session = db.query(Session, @@ -92,7 +91,6 @@ def test_user_login_logout(user: User): # Test logout. user.logout(request) - assert "AURSID" not in request.cookies assert not user.is_authenticated()