aurweb/aurweb/git/serve.py
moson b35da57187
housekeep: TU rename - UI elements
Rename all UI elements and translations.

Signed-off-by: moson <moson@archlinux.org>
2023-09-10 09:35:05 +02:00

655 lines
20 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import re
import shlex
import subprocess
import sys
import time
import aurweb.config
import aurweb.db
import aurweb.exceptions
notify_cmd = aurweb.config.get("notifications", "notify-cmd")
repo_path = aurweb.config.get("serve", "repo-path")
repo_regex = aurweb.config.get("serve", "repo-regex")
git_shell_cmd = aurweb.config.get("serve", "git-shell-cmd")
git_update_cmd = aurweb.config.get("serve", "git-update-cmd")
ssh_cmdline = aurweb.config.get("serve", "ssh-cmdline")
enable_maintenance = aurweb.config.getboolean("options", "enable-maintenance")
maintenance_exc = aurweb.config.get("options", "maintenance-exceptions").split()
def pkgbase_from_name(pkgbase):
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM PackageBases WHERE Name = ?", [pkgbase])
row = cur.fetchone()
return row[0] if row else None
def pkgbase_exists(pkgbase):
return pkgbase_from_name(pkgbase) is not None
def list_repos(user):
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
cur = conn.execute(
"SELECT Name, PackagerUID FROM PackageBases " + "WHERE MaintainerUID = ?",
[userid],
)
for row in cur:
print((" " if row[1] else "*") + row[0])
conn.close()
def validate_pkgbase(pkgbase, user):
if not re.match(repo_regex, pkgbase):
raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
if pkgbase_exists(pkgbase):
raise aurweb.exceptions.PackageBaseExistsException(pkgbase)
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
conn.close()
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
def pkgbase_adopt(pkgbase, user, privileged):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
conn = aurweb.db.Connection()
cur = conn.execute(
"SELECT ID FROM PackageBases WHERE ID = ? AND " + "MaintainerUID IS NULL",
[pkgbase_id],
)
if not privileged and not cur.fetchone():
raise aurweb.exceptions.PermissionDeniedException(user)
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
cur = conn.execute(
"UPDATE PackageBases SET MaintainerUID = ? " + "WHERE ID = ?",
[userid, pkgbase_id],
)
cur = conn.execute(
"SELECT COUNT(*) FROM PackageNotifications WHERE "
+ "PackageBaseID = ? AND UserID = ?",
[pkgbase_id, userid],
)
if cur.fetchone()[0] == 0:
cur = conn.execute(
"INSERT INTO PackageNotifications "
+ "(PackageBaseID, UserID) VALUES (?, ?)",
[pkgbase_id, userid],
)
conn.commit()
subprocess.Popen((notify_cmd, "adopt", str(userid), str(pkgbase_id)))
conn.close()
def pkgbase_get_comaintainers(pkgbase):
conn = aurweb.db.Connection()
cur = conn.execute(
"SELECT UserName FROM PackageComaintainers "
+ "INNER JOIN Users "
+ "ON Users.ID = PackageComaintainers.UsersID "
+ "INNER JOIN PackageBases "
+ "ON PackageBases.ID = PackageComaintainers.PackageBaseID "
+ "WHERE PackageBases.Name = ? "
+ "ORDER BY Priority ASC",
[pkgbase],
)
return [row[0] for row in cur.fetchall()]
def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
if not privileged and not pkgbase_has_full_access(pkgbase, user):
raise aurweb.exceptions.PermissionDeniedException(user)
conn = aurweb.db.Connection()
userlist_old = set(pkgbase_get_comaintainers(pkgbase))
uids_old = set()
for olduser in userlist_old:
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [olduser])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
uids_old.add(userid)
uids_new = set()
for newuser in userlist:
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [newuser])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
uids_new.add(userid)
uids_add = uids_new - uids_old
uids_rem = uids_old - uids_new
i = 1
for userid in uids_new:
if userid in uids_add:
cur = conn.execute(
"INSERT INTO PackageComaintainers "
+ "(PackageBaseID, UsersID, Priority) "
+ "VALUES (?, ?, ?)",
[pkgbase_id, userid, i],
)
subprocess.Popen(
(notify_cmd, "comaintainer-add", str(userid), str(pkgbase_id))
)
else:
cur = conn.execute(
"UPDATE PackageComaintainers "
+ "SET Priority = ? "
+ "WHERE PackageBaseID = ? AND UsersID = ?",
[i, pkgbase_id, userid],
)
i += 1
for userid in uids_rem:
cur = conn.execute(
"DELETE FROM PackageComaintainers "
+ "WHERE PackageBaseID = ? AND UsersID = ?",
[pkgbase_id, userid],
)
subprocess.Popen(
(notify_cmd, "comaintainer-remove", str(userid), str(pkgbase_id))
)
conn.commit()
conn.close()
def pkgreq_by_pkgbase(pkgbase_id, reqtype):
conn = aurweb.db.Connection()
cur = conn.execute(
"SELECT PackageRequests.ID FROM PackageRequests "
+ "INNER JOIN RequestTypes ON "
+ "RequestTypes.ID = PackageRequests.ReqTypeID "
+ "WHERE PackageRequests.Status = 0 "
+ "AND PackageRequests.PackageBaseID = ? "
+ "AND RequestTypes.Name = ?",
[pkgbase_id, reqtype],
)
return [row[0] for row in cur.fetchall()]
def pkgreq_close(reqid, user, reason, comments, autoclose=False):
statusmap = {"accepted": 2, "rejected": 3}
if reason not in statusmap:
raise aurweb.exceptions.InvalidReasonException(reason)
status = statusmap[reason]
conn = aurweb.db.Connection()
if autoclose:
userid = None
else:
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
now = int(time.time())
conn.execute(
"UPDATE PackageRequests SET Status = ?, ClosedTS = ?, "
+ "ClosedUID = ?, ClosureComment = ? "
+ "WHERE ID = ?",
[status, now, userid, comments, reqid],
)
conn.commit()
conn.close()
if not userid:
userid = 0
subprocess.Popen(
(notify_cmd, "request-close", str(userid), str(reqid), reason)
).wait()
def pkgbase_disown(pkgbase, user, privileged):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
initialized_by_owner = pkgbase_has_full_access(pkgbase, user)
if not privileged and not initialized_by_owner:
raise aurweb.exceptions.PermissionDeniedException(user)
# TODO: Support disowning package bases via package request.
# Scan through pending orphan requests and close them.
comment = "The user {:s} disowned the package.".format(user)
for reqid in pkgreq_by_pkgbase(pkgbase_id, "orphan"):
pkgreq_close(reqid, user, "accepted", comment, True)
comaintainers = []
new_maintainer_userid = None
conn = aurweb.db.Connection()
# Make the first co-maintainer the new maintainer, unless the action was
# enforced by a Package Maintainer.
if initialized_by_owner:
comaintainers = pkgbase_get_comaintainers(pkgbase)
if len(comaintainers) > 0:
new_maintainer = comaintainers[0]
cur = conn.execute(
"SELECT ID FROM Users WHERE Username = ?", [new_maintainer]
)
new_maintainer_userid = cur.fetchone()[0]
comaintainers.remove(new_maintainer)
pkgbase_set_comaintainers(pkgbase, comaintainers, user, privileged)
cur = conn.execute(
"UPDATE PackageBases SET MaintainerUID = ? " + "WHERE ID = ?",
[new_maintainer_userid, pkgbase_id],
)
conn.commit()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
subprocess.Popen((notify_cmd, "disown", str(userid), str(pkgbase_id)))
conn.close()
def pkgbase_flag(pkgbase, user, comment):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
if len(comment) < 3:
raise aurweb.exceptions.InvalidCommentException(comment)
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
now = int(time.time())
conn.execute(
"UPDATE PackageBases SET "
+ "OutOfDateTS = ?, FlaggerUID = ?, FlaggerComment = ? "
+ "WHERE ID = ? AND OutOfDateTS IS NULL",
[now, userid, comment, pkgbase_id],
)
conn.commit()
subprocess.Popen((notify_cmd, "flag", str(userid), str(pkgbase_id)))
def pkgbase_unflag(pkgbase, user):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
if user in pkgbase_get_comaintainers(pkgbase):
conn.execute(
"UPDATE PackageBases SET OutOfDateTS = NULL " + "WHERE ID = ?", [pkgbase_id]
)
else:
conn.execute(
"UPDATE PackageBases SET OutOfDateTS = NULL "
+ "WHERE ID = ? AND (MaintainerUID = ? OR FlaggerUID = ?)",
[pkgbase_id, userid, userid],
)
conn.commit()
def pkgbase_vote(pkgbase, user):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
cur = conn.execute(
"SELECT COUNT(*) FROM PackageVotes "
+ "WHERE UsersID = ? AND PackageBaseID = ?",
[userid, pkgbase_id],
)
if cur.fetchone()[0] > 0:
raise aurweb.exceptions.AlreadyVotedException(pkgbase)
now = int(time.time())
conn.execute(
"INSERT INTO PackageVotes (UsersID, PackageBaseID, VoteTS) "
+ "VALUES (?, ?, ?)",
[userid, pkgbase_id, now],
)
conn.execute(
"UPDATE PackageBases SET NumVotes = NumVotes + 1 " + "WHERE ID = ?",
[pkgbase_id],
)
conn.commit()
def pkgbase_unvote(pkgbase, user):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
conn = aurweb.db.Connection()
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
userid = cur.fetchone()[0]
if userid == 0:
raise aurweb.exceptions.InvalidUserException(user)
cur = conn.execute(
"SELECT COUNT(*) FROM PackageVotes "
+ "WHERE UsersID = ? AND PackageBaseID = ?",
[userid, pkgbase_id],
)
if cur.fetchone()[0] == 0:
raise aurweb.exceptions.NotVotedException(pkgbase)
conn.execute(
"DELETE FROM PackageVotes WHERE UsersID = ? AND " + "PackageBaseID = ?",
[userid, pkgbase_id],
)
conn.execute(
"UPDATE PackageBases SET NumVotes = NumVotes - 1 " + "WHERE ID = ?",
[pkgbase_id],
)
conn.commit()
def pkgbase_set_keywords(pkgbase, keywords):
pkgbase_id = pkgbase_from_name(pkgbase)
if not pkgbase_id:
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
conn = aurweb.db.Connection()
conn.execute("DELETE FROM PackageKeywords WHERE PackageBaseID = ?", [pkgbase_id])
for keyword in keywords:
conn.execute(
"INSERT INTO PackageKeywords (PackageBaseID, Keyword) " + "VALUES (?, ?)",
[pkgbase_id, keyword],
)
conn.commit()
conn.close()
def pkgbase_has_write_access(pkgbase, user):
conn = aurweb.db.Connection()
cur = conn.execute(
"SELECT COUNT(*) FROM PackageBases "
+ "LEFT JOIN PackageComaintainers "
+ "ON PackageComaintainers.PackageBaseID = PackageBases.ID "
+ "INNER JOIN Users "
+ "ON Users.ID = PackageBases.MaintainerUID "
+ "OR PackageBases.MaintainerUID IS NULL "
+ "OR Users.ID = PackageComaintainers.UsersID "
+ "WHERE Name = ? AND Username = ?",
[pkgbase, user],
)
return cur.fetchone()[0] > 0
def pkgbase_has_full_access(pkgbase, user):
conn = aurweb.db.Connection()
cur = conn.execute(
"SELECT COUNT(*) FROM PackageBases "
+ "INNER JOIN Users "
+ "ON Users.ID = PackageBases.MaintainerUID "
+ "WHERE Name = ? AND Username = ?",
[pkgbase, user],
)
return cur.fetchone()[0] > 0
def log_ssh_login(user, remote_addr):
conn = aurweb.db.Connection()
now = int(time.time())
conn.execute(
"UPDATE Users SET LastSSHLogin = ?, "
+ "LastSSHLoginIPAddress = ? WHERE Username = ?",
[now, remote_addr, user],
)
conn.commit()
conn.close()
def bans_match(remote_addr):
conn = aurweb.db.Connection()
cur = conn.execute("SELECT COUNT(*) FROM Bans WHERE IPAddress = ?", [remote_addr])
return cur.fetchone()[0] > 0
def die(msg):
sys.stderr.write("{:s}\n".format(msg))
exit(1)
def die_with_help(msg):
die(msg + "\nTry `{:s} help` for a list of commands.".format(ssh_cmdline))
def warn(msg):
sys.stderr.write("warning: {:s}\n".format(msg))
def usage(cmds):
sys.stderr.write("Commands:\n")
colwidth = max([len(cmd) for cmd in cmds.keys()]) + 4
for key in sorted(cmds):
sys.stderr.write(" " + key.ljust(colwidth) + cmds[key] + "\n")
exit(0)
def checkarg_atleast(cmdargv, *argdesc):
if len(cmdargv) - 1 < len(argdesc):
msg = "missing {:s}".format(argdesc[len(cmdargv) - 1])
raise aurweb.exceptions.InvalidArgumentsException(msg)
def checkarg_atmost(cmdargv, *argdesc):
if len(cmdargv) - 1 > len(argdesc):
raise aurweb.exceptions.InvalidArgumentsException("too many arguments")
def checkarg(cmdargv, *argdesc):
checkarg_atleast(cmdargv, *argdesc)
checkarg_atmost(cmdargv, *argdesc)
def serve(action, cmdargv, user, privileged, remote_addr): # noqa: C901
if enable_maintenance:
if remote_addr not in maintenance_exc:
raise aurweb.exceptions.MaintenanceException
if bans_match(remote_addr):
raise aurweb.exceptions.BannedException
log_ssh_login(user, remote_addr)
if action == "git" and cmdargv[1] in ("upload-pack", "receive-pack"):
action = action + "-" + cmdargv[1]
del cmdargv[1]
if action == "git-upload-pack" or action == "git-receive-pack":
checkarg(cmdargv, "path")
path = cmdargv[1].rstrip("/")
if not path.startswith("/"):
path = "/" + path
if not path.endswith(".git"):
path = path + ".git"
pkgbase = path[1:-4]
if not re.match(repo_regex, pkgbase):
raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
if action == "git-receive-pack" and pkgbase_exists(pkgbase):
if not privileged and not pkgbase_has_write_access(pkgbase, user):
raise aurweb.exceptions.PermissionDeniedException(user)
if not os.access(git_update_cmd, os.R_OK | os.X_OK):
raise aurweb.exceptions.BrokenUpdateHookException(git_update_cmd)
os.environ["AUR_USER"] = user
os.environ["AUR_PKGBASE"] = pkgbase
os.environ["GIT_NAMESPACE"] = pkgbase
cmd = action + " '" + repo_path + "'"
os.execl(git_shell_cmd, git_shell_cmd, "-c", cmd)
elif action == "set-keywords":
checkarg_atleast(cmdargv, "repository name")
pkgbase_set_keywords(cmdargv[1], cmdargv[2:])
elif action == "list-repos":
checkarg(cmdargv)
list_repos(user)
elif action == "restore":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
validate_pkgbase(pkgbase, user)
os.environ["AUR_USER"] = user
os.environ["AUR_PKGBASE"] = pkgbase
os.execl(git_update_cmd, git_update_cmd, "restore")
elif action == "adopt":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
pkgbase_adopt(pkgbase, user, privileged)
elif action == "disown":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
pkgbase_disown(pkgbase, user, privileged)
elif action == "flag":
checkarg(cmdargv, "repository name", "comment")
pkgbase = cmdargv[1]
comment = cmdargv[2]
pkgbase_flag(pkgbase, user, comment)
elif action == "unflag":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
pkgbase_unflag(pkgbase, user)
elif action == "vote":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
pkgbase_vote(pkgbase, user)
elif action == "unvote":
checkarg(cmdargv, "repository name")
pkgbase = cmdargv[1]
pkgbase_unvote(pkgbase, user)
elif action == "set-comaintainers":
checkarg_atleast(cmdargv, "repository name")
pkgbase = cmdargv[1]
userlist = cmdargv[2:]
pkgbase_set_comaintainers(pkgbase, userlist, user, privileged)
elif action == "help":
cmds = {
"adopt <name>": "Adopt a package base.",
"disown <name>": "Disown a package base.",
"flag <name> <comment>": "Flag a package base out-of-date.",
"help": "Show this help message and exit.",
"list-repos": "List all your repositories.",
"restore <name>": "Restore a deleted package base.",
"set-comaintainers <name> [...]": "Set package base co-maintainers.",
"set-keywords <name> [...]": "Change package base keywords.",
"unflag <name>": "Remove out-of-date flag from a package base.",
"unvote <name>": "Remove vote from a package base.",
"vote <name>": "Vote for a package base.",
"git-receive-pack": "Internal command used with Git.",
"git-upload-pack": "Internal command used with Git.",
}
usage(cmds)
else:
msg = "invalid command: {:s}".format(action)
raise aurweb.exceptions.InvalidArgumentsException(msg)
def main():
user = os.environ.get("AUR_USER")
privileged = os.environ.get("AUR_PRIVILEGED", "0") == "1"
ssh_cmd = os.environ.get("SSH_ORIGINAL_COMMAND")
ssh_client = os.environ.get("SSH_CLIENT")
if not ssh_cmd:
die_with_help(f"Welcome to AUR, {user}! Interactive shell is disabled.")
cmdargv = shlex.split(ssh_cmd)
action = cmdargv[0]
remote_addr = ssh_client.split(" ")[0] if ssh_client else None
try:
serve(action, cmdargv, user, privileged, remote_addr)
except aurweb.exceptions.MaintenanceException:
die("The AUR is down due to maintenance. We will be back soon.")
except aurweb.exceptions.BannedException:
die("The SSH interface is disabled for your IP address.")
except aurweb.exceptions.InvalidArgumentsException as e:
die_with_help("{:s}: {}".format(action, e))
except aurweb.exceptions.AurwebException as e:
die("{:s}: {}".format(action, e))
if __name__ == "__main__":
main()