diff --git a/aurweb/models/user.py b/aurweb/models/user.py index f0724202..dcf5f519 100644 --- a/aurweb/models/user.py +++ b/aurweb/models/user.py @@ -1,12 +1,14 @@ import hashlib from datetime import datetime +from http import HTTPStatus from typing import List, Set import bcrypt -from fastapi import Request +from fastapi import HTTPException, Request from sqlalchemy import or_ +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import backref, relationship import aurweb.config @@ -108,33 +110,45 @@ class User(Base): if not self.authenticated: return None - now_ts = datetime.utcnow().timestamp() - session_ts = now_ts + ( - session_time if session_time - else aurweb.config.getint("options", "login_timeout") - ) + # Maximum number of iterations where we attempt to generate + # a unique SID. In cases where the Session table has + # exhausted all possible values, this will catch exceptions + # instead of raising them and include details about failing + # generation in an HTTPException. + tries = 36 - sid = None + exc = None + for i in range(tries): + exc = None + now_ts = datetime.utcnow().timestamp() + session_ts = now_ts + ( + session_time if session_time + else aurweb.config.getint("options", "login_timeout") + ) + try: + with db.begin(): + self.LastLogin = now_ts + self.LastLoginIPAddress = request.client.host + if not self.session: + sid = generate_unique_sid() + self.session = db.create(Session, User=self, + SessionID=sid, + LastUpdateTS=session_ts) + else: + last_updated = self.session.LastUpdateTS + if last_updated and last_updated < now_ts: + self.session.SessionID = generate_unique_sid() + self.session.LastUpdateTS = session_ts + break + except IntegrityError as exc_: + exc = exc_ - with db.begin(): - self.LastLogin = now_ts - self.LastLoginIPAddress = request.client.host - if not self.session: - sid = generate_unique_sid() - self.session = Session(UsersID=self.ID, SessionID=sid, - LastUpdateTS=session_ts) - db.add(self.session) - else: - last_updated = self.session.LastUpdateTS - if last_updated and last_updated < now_ts: - self.session.SessionID = sid = generate_unique_sid() - else: - # Session is still valid; retrieve the current SID. - sid = self.session.SessionID + if exc: + detail = ("Unable to generate a unique session ID in " + f"{tries} iterations.") + raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=detail) - self.session.LastUpdateTS = session_ts - - request.cookies["AURSID"] = self.session.SessionID return self.session.SessionID def has_credential(self, credential: Set[int], @@ -142,8 +156,7 @@ class User(Base): from aurweb.auth.creds import has_credential return has_credential(self, credential, approved) - def logout(self, request): - del request.cookies["AURSID"] + def logout(self, request: Request): self.authenticated = False if self.session: with db.begin(): diff --git a/test/test_auth_routes.py b/test/test_auth_routes.py index a8d0db11..3455a019 100644 --- a/test/test_auth_routes.py +++ b/test/test_auth_routes.py @@ -283,3 +283,48 @@ def test_login_bad_referer(client: TestClient): response = request.post("/login", data=post_data, headers=BAD_REFERER) assert response.status_code == int(HTTPStatus.BAD_REQUEST) assert "AURSID" not in response.cookies + + +def test_generate_unique_sid_exhausted(client: TestClient, user: User): + """ + In this test, we mock up generate_unique_sid() to infinitely return + the same SessionID given to `user`. Within that mocking, we try + to login as `user2` and expect the internal server error rendering + by our error handler. + + This exercises the bad path of /login, where we can't find a unique + SID to assign the user. + """ + now = int(datetime.utcnow().timestamp()) + with db.begin(): + # Create a second user; we'll login with this one. + user2 = db.create(User, Username="test2", Email="test2@example.org", + ResetKey="testReset", Passwd="testPassword", + AccountTypeID=USER_ID) + + # Create a session with ID == "testSession" for `user`. + db.create(Session, User=user, SessionID="testSession", + LastUpdateTS=now) + + # Mock out generate_unique_sid; always return "testSession" which + # causes us to eventually error out and raise an internal error. + def mock_generate_sid(): + return "testSession" + + # Login as `user2`; we expect an internal server error response + # with a relevent detail. + post_data = { + "user": user2.Username, + "passwd": "testPassword", + "next": "/", + } + generate_unique_sid_ = "aurweb.models.session.generate_unique_sid" + with mock.patch(generate_unique_sid_, mock_generate_sid): + with client as request: + # Set cookies = {} to remove any previous login kept by TestClient. + response = request.post("/login", data=post_data, cookies={}) + assert response.status_code == int(HTTPStatus.INTERNAL_SERVER_ERROR) + + expected = "Unable to generate a unique session ID" + assert expected in response.text + assert "500 - Internal Server Error" in response.text diff --git a/test/test_user.py b/test/test_user.py index 52cdc89e..2c8dd847 100644 --- a/test/test_user.py +++ b/test/test_user.py @@ -62,7 +62,6 @@ def test_user_login_logout(user: User): sid = user.login(request, "testPassword") assert sid is not None assert user.is_authenticated() - assert "AURSID" in request.cookies # Expect that User session relationships work right. user_session = db.query(Session, @@ -92,7 +91,6 @@ def test_user_login_logout(user: User): # Test logout. user.logout(request) - assert "AURSID" not in request.cookies assert not user.is_authenticated()