style: Run pre-commit

This commit is contained in:
Joakim Saario 2022-08-21 22:08:29 +02:00
parent b47882b114
commit 9c6c13b78a
No known key found for this signature in database
GPG key ID: D8B76D271B7BD453
235 changed files with 7180 additions and 5628 deletions

View file

@ -8,12 +8,23 @@ from aurweb.models.ssh_pub_key import get_fingerprint
from aurweb.util import strtobool
def simple(U: str = str(), E: str = str(), H: bool = False,
BE: str = str(), R: str = str(), HP: str = str(),
I: str = str(), K: str = str(), J: bool = False,
CN: bool = False, UN: bool = False, ON: bool = False,
S: bool = False, user: models.User = None,
**kwargs) -> None:
def simple(
U: str = str(),
E: str = str(),
H: bool = False,
BE: str = str(),
R: str = str(),
HP: str = str(),
I: str = str(),
K: str = str(),
J: bool = False,
CN: bool = False,
UN: bool = False,
ON: bool = False,
S: bool = False,
user: models.User = None,
**kwargs,
) -> None:
now = time.utcnow()
with db.begin():
user.Username = U or user.Username
@ -31,22 +42,26 @@ def simple(U: str = str(), E: str = str(), H: bool = False,
user.OwnershipNotify = strtobool(ON)
def language(L: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs) -> None:
def language(
L: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs,
) -> None:
if L and L != user.LangPreference:
with db.begin():
user.LangPreference = L
context["language"] = L
def timezone(TZ: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs) -> None:
def timezone(
TZ: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs,
) -> None:
if TZ and TZ != user.Timezone:
with db.begin():
user.Timezone = TZ
@ -67,8 +82,7 @@ def ssh_pubkey(PK: str = str(), user: models.User = None, **kwargs) -> None:
with db.begin():
# Delete any existing keys we can't find.
to_remove = user.ssh_pub_keys.filter(
~SSHPubKey.Fingerprint.in_(fprints))
to_remove = user.ssh_pub_keys.filter(~SSHPubKey.Fingerprint.in_(fprints))
db.delete_all(to_remove)
# For each key, if it does not yet exist, create it.
@ -79,24 +93,27 @@ def ssh_pubkey(PK: str = str(), user: models.User = None, **kwargs) -> None:
).exists()
if not db.query(exists).scalar():
# No public key exists, create one.
db.create(models.SSHPubKey, UserID=user.ID,
PubKey=" ".join([prefix, key]),
Fingerprint=fprints[i])
db.create(
models.SSHPubKey,
UserID=user.ID,
PubKey=" ".join([prefix, key]),
Fingerprint=fprints[i],
)
def account_type(T: int = None,
user: models.User = None,
**kwargs) -> None:
def account_type(T: int = None, user: models.User = None, **kwargs) -> None:
if T is not None and (T := int(T)) != user.AccountTypeID:
with db.begin():
user.AccountTypeID = T
def password(P: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs) -> None:
def password(
P: str = str(),
request: Request = None,
user: models.User = None,
context: dict[str, Any] = {},
**kwargs,
) -> None:
if P and not user.valid_password(P):
# Remove the fields we consumed for passwords.
context["P"] = context["C"] = str()

View file

@ -25,42 +25,44 @@ def invalid_fields(E: str = str(), U: str = str(), **kwargs) -> None:
raise ValidationError(["Missing a required field."])
def invalid_suspend_permission(request: Request = None,
user: models.User = None,
S: str = "False",
**kwargs) -> None:
def invalid_suspend_permission(
request: Request = None, user: models.User = None, S: str = "False", **kwargs
) -> None:
if not request.user.is_elevated() and strtobool(S) != bool(user.Suspended):
raise ValidationError([
"You do not have permission to suspend accounts."])
raise ValidationError(["You do not have permission to suspend accounts."])
def invalid_username(request: Request = None, U: str = str(),
_: l10n.Translator = None,
**kwargs) -> None:
def invalid_username(
request: Request = None, U: str = str(), _: l10n.Translator = None, **kwargs
) -> None:
if not util.valid_username(U):
username_min_len = config.getint("options", "username_min_len")
username_max_len = config.getint("options", "username_max_len")
raise ValidationError([
"The username is invalid.",
raise ValidationError(
[
_("It must be between %s and %s characters long") % (
username_min_len, username_max_len),
"Start and end with a letter or number",
"Can contain only one period, underscore or hyphen.",
"The username is invalid.",
[
_("It must be between %s and %s characters long")
% (username_min_len, username_max_len),
"Start and end with a letter or number",
"Can contain only one period, underscore or hyphen.",
],
]
])
)
def invalid_password(P: str = str(), C: str = str(),
_: l10n.Translator = None, **kwargs) -> None:
def invalid_password(
P: str = str(), C: str = str(), _: l10n.Translator = None, **kwargs
) -> None:
if P:
if not util.valid_password(P):
username_min_len = config.getint(
"options", "username_min_len")
raise ValidationError([
_("Your password must be at least %s characters.") % (
username_min_len)
])
username_min_len = config.getint("options", "username_min_len")
raise ValidationError(
[
_("Your password must be at least %s characters.")
% (username_min_len)
]
)
elif not C:
raise ValidationError(["Please confirm your new password."])
elif P != C:
@ -71,15 +73,18 @@ def is_banned(request: Request = None, **kwargs) -> None:
host = request.client.host
exists = db.query(models.Ban, models.Ban.IPAddress == host).exists()
if db.query(exists).scalar():
raise ValidationError([
"Account registration has been disabled for your "
"IP address, probably due to sustained spam attacks. "
"Sorry for the inconvenience."
])
raise ValidationError(
[
"Account registration has been disabled for your "
"IP address, probably due to sustained spam attacks. "
"Sorry for the inconvenience."
]
)
def invalid_user_password(request: Request = None, passwd: str = str(),
**kwargs) -> None:
def invalid_user_password(
request: Request = None, passwd: str = str(), **kwargs
) -> None:
if request.user.is_authenticated():
if not request.user.valid_password(passwd):
raise ValidationError(["Invalid password."])
@ -97,8 +102,9 @@ def invalid_backup_email(BE: str = str(), **kwargs) -> None:
def invalid_homepage(HP: str = str(), **kwargs) -> None:
if HP and not util.valid_homepage(HP):
raise ValidationError([
"The home page is invalid, please specify the full HTTP(s) URL."])
raise ValidationError(
["The home page is invalid, please specify the full HTTP(s) URL."]
)
def invalid_pgp_key(K: str = str(), **kwargs) -> None:
@ -106,8 +112,9 @@ def invalid_pgp_key(K: str = str(), **kwargs) -> None:
raise ValidationError(["The PGP key fingerprint is invalid."])
def invalid_ssh_pubkey(PK: str = str(), user: models.User = None,
_: l10n.Translator = None, **kwargs) -> None:
def invalid_ssh_pubkey(
PK: str = str(), user: models.User = None, _: l10n.Translator = None, **kwargs
) -> None:
if not PK:
return
@ -119,15 +126,23 @@ def invalid_ssh_pubkey(PK: str = str(), user: models.User = None,
for prefix, key in keys:
fingerprint = get_fingerprint(f"{prefix} {key}")
exists = db.query(models.SSHPubKey).filter(
and_(models.SSHPubKey.UserID != user.ID,
models.SSHPubKey.Fingerprint == fingerprint)
).exists()
exists = (
db.query(models.SSHPubKey)
.filter(
and_(
models.SSHPubKey.UserID != user.ID,
models.SSHPubKey.Fingerprint == fingerprint,
)
)
.exists()
)
if db.query(exists).scalar():
raise ValidationError([
_("The SSH public key, %s%s%s, is already in use.") % (
"<strong>", fingerprint, "</strong>")
])
raise ValidationError(
[
_("The SSH public key, %s%s%s, is already in use.")
% ("<strong>", fingerprint, "</strong>")
]
)
def invalid_language(L: str = str(), **kwargs) -> None:
@ -140,60 +155,78 @@ def invalid_timezone(TZ: str = str(), **kwargs) -> None:
raise ValidationError(["Timezone is not currently supported."])
def username_in_use(U: str = str(), user: models.User = None,
_: l10n.Translator = None, **kwargs) -> None:
exists = db.query(models.User).filter(
and_(models.User.ID != user.ID,
models.User.Username == U)
).exists()
def username_in_use(
U: str = str(), user: models.User = None, _: l10n.Translator = None, **kwargs
) -> None:
exists = (
db.query(models.User)
.filter(and_(models.User.ID != user.ID, models.User.Username == U))
.exists()
)
if db.query(exists).scalar():
# If the username already exists...
raise ValidationError([
_("The username, %s%s%s, is already in use.") % (
"<strong>", U, "</strong>")
])
raise ValidationError(
[
_("The username, %s%s%s, is already in use.")
% ("<strong>", U, "</strong>")
]
)
def email_in_use(E: str = str(), user: models.User = None,
_: l10n.Translator = None, **kwargs) -> None:
exists = db.query(models.User).filter(
and_(models.User.ID != user.ID,
models.User.Email == E)
).exists()
def email_in_use(
E: str = str(), user: models.User = None, _: l10n.Translator = None, **kwargs
) -> None:
exists = (
db.query(models.User)
.filter(and_(models.User.ID != user.ID, models.User.Email == E))
.exists()
)
if db.query(exists).scalar():
# If the email already exists...
raise ValidationError([
_("The address, %s%s%s, is already in use.") % (
"<strong>", E, "</strong>")
])
raise ValidationError(
[
_("The address, %s%s%s, is already in use.")
% ("<strong>", E, "</strong>")
]
)
def invalid_account_type(T: int = None, request: Request = None,
user: models.User = None,
_: l10n.Translator = None,
**kwargs) -> None:
def invalid_account_type(
T: int = None,
request: Request = None,
user: models.User = None,
_: l10n.Translator = None,
**kwargs,
) -> None:
if T is not None and (T := int(T)) != user.AccountTypeID:
name = ACCOUNT_TYPE_NAME.get(T, None)
has_cred = request.user.has_credential(creds.ACCOUNT_CHANGE_TYPE)
if name is None:
raise ValidationError(["Invalid account type provided."])
elif not has_cred:
raise ValidationError([
"You do not have permission to change account types."])
raise ValidationError(
["You do not have permission to change account types."]
)
elif T > request.user.AccountTypeID:
# If the chosen account type is higher than the editor's account
# type, the editor doesn't have permission to set the new type.
error = _("You do not have permission to change "
"this user's account type to %s.") % name
error = (
_(
"You do not have permission to change "
"this user's account type to %s."
)
% name
)
raise ValidationError([error])
logger.debug(f"Trusted User '{request.user.Username}' has "
f"modified '{user.Username}' account's type to"
f" {name}.")
logger.debug(
f"Trusted User '{request.user.Username}' has "
f"modified '{user.Username}' account's type to"
f" {name}."
)
def invalid_captcha(captcha_salt: str = None, captcha: str = None,
**kwargs) -> None:
def invalid_captcha(captcha_salt: str = None, captcha: str = None, **kwargs) -> None:
if captcha_salt and captcha_salt not in get_captcha_salts():
raise ValidationError(["This CAPTCHA has expired. Please try again."])