fix(fastapi): only elevated users are allowed to suspend accounts

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-11-08 18:18:41 -08:00
parent 446a082352
commit 85ebc72e8a
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
5 changed files with 72 additions and 9 deletions

View file

@ -51,6 +51,9 @@ class AnonymousUser:
LangPreference = aurweb.config.get("options", "default_lang") LangPreference = aurweb.config.get("options", "default_lang")
Timezone = aurweb.config.get("options", "default_timezone") Timezone = aurweb.config.get("options", "default_timezone")
Suspended = 0
InactivityTS = 0
# A stub ssh_pub_key relationship. # A stub ssh_pub_key relationship.
ssh_pub_key = None ssh_pub_key = None

View file

@ -143,6 +143,10 @@ def process_account_form(request: Request, user: models.User, args: dict):
if not email or not username: if not email or not username:
return (False, ["Missing a required field."]) return (False, ["Missing a required field."])
inactive = args.get("J", False)
if not request.user.is_elevated() and inactive != bool(user.InactivityTS):
return (False, ["You do not have permission to suspend accounts."])
username_min_len = aurweb.config.getint("options", "username_min_len") username_min_len = aurweb.config.getint("options", "username_min_len")
username_max_len = aurweb.config.getint("options", "username_max_len") username_max_len = aurweb.config.getint("options", "username_max_len")
if not util.valid_username(args.get("U")): if not util.valid_username(args.get("U")):
@ -528,7 +532,8 @@ async def account_edit_post(request: Request,
user.Homepage = HP or user.Homepage user.Homepage = HP or user.Homepage
user.IRCNick = I or user.IRCNick user.IRCNick = I or user.IRCNick
user.PGPKey = K or user.PGPKey user.PGPKey = K or user.PGPKey
user.InactivityTS = datetime.utcnow().timestamp() if J else 0 user.Suspended = J
user.InactivityTS = int(datetime.utcnow().timestamp()) * int(J)
# If we update the language, update the cookie as well. # If we update the language, update the cookie as well.
if L and L != user.LangPreference: if L and L != user.LangPreference:

View file

@ -879,6 +879,10 @@ msgstr ""
msgid "Account suspended" msgid "Account suspended"
msgstr "" msgstr ""
#: aurweb/routers/accounts.py
msgid "You do not have permission to suspend accounts."
msgstr ""
#: lib/acctfuncs.inc.php #: lib/acctfuncs.inc.php
#, php-format #, php-format
msgid "" msgid ""

View file

@ -42,14 +42,16 @@
"account is inactive." | tr }}</em> "account is inactive." | tr }}</em>
</p> </p>
<p> {% if request.user.is_elevated() %}
<label for="id_inactive">{% trans %}Inactive{% endtrans %}:</label> <p>
<input id="id_inactive" type="checkbox" name="J" <label for="id_inactive">{% trans %}Inactive{% endtrans %}:</label>
{% if inactive %} <input id="id_inactive" type="checkbox" name="J"
checked="checked" {% if inactive %}
{% endif %} checked="checked"
> {% endif %}
</p> >
</p>
{% endif %}
{% if request.user.has_credential("CRED_ACCOUNT_CHANGE_TYPE") %} {% if request.user.has_credential("CRED_ACCOUNT_CHANGE_TYPE") %}
<p> <p>

View file

@ -780,6 +780,55 @@ def test_post_account_edit_error_invalid_password():
assert "Invalid password." in content assert "Invalid password." in content
def test_post_account_edit_inactivity_unauthorized():
cookies = {"AURSID": user.login(Request(), "testPassword")}
post_data = {
"U": "test",
"E": "test@example.org",
"J": True,
"passwd": "testPassword"
}
with client as request:
resp = request.post(f"/account/{user.Username}/edit", data=post_data,
cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You do not have permission to suspend accounts."
assert errors[0].text.strip() == expected
def test_post_account_edit_inactivity():
with db.begin():
user.AccountTypeID = TRUSTED_USER_ID
assert not user.Suspended
cookies = {"AURSID": user.login(Request(), "testPassword")}
post_data = {
"U": "test",
"E": "test@example.org",
"J": True,
"passwd": "testPassword"
}
with client as request:
resp = request.post(f"/account/{user.Username}/edit", data=post_data,
cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# Make sure the user record got updated correctly.
assert user.Suspended
assert user.InactivityTS > 0
post_data.update({"J": False})
with client as request:
resp = request.post(f"/account/{user.Username}/edit", data=post_data,
cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
assert not user.Suspended
assert user.InactivityTS == 0
def test_post_account_edit_error_unauthorized(): def test_post_account_edit_error_unauthorized():
request = Request() request = Request()
sid = user.login(request, "testPassword") sid = user.login(request, "testPassword")