mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
git-serve: Use Python exceptions for error handling
Make it easier to reuse the helper functions provided by git-serve from another Python script by throwing exceptions instead of terminating the program on errors. Signed-off-by: Lukas Fleischer <lfleischer@archlinux.org>
This commit is contained in:
parent
6d8edafe77
commit
8914a41db9
2 changed files with 131 additions and 79 deletions
53
aurweb/exceptions.py
Normal file
53
aurweb/exceptions.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
class AurwebException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class MaintenanceException(AurwebException):
|
||||
pass
|
||||
|
||||
|
||||
class PermissionDeniedException(AurwebException):
|
||||
def __init__(self, user):
|
||||
msg = 'permission denied: {:s}'.format(user)
|
||||
super(PermissionDeniedException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidUserException(AurwebException):
|
||||
def __init__(self, user):
|
||||
msg = 'unknown user: {:s}'.format(user)
|
||||
super(InvalidUserException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidPackageBaseException(AurwebException):
|
||||
def __init__(self, pkgbase):
|
||||
msg = 'package base not found: {:s}'.format(pkgbase)
|
||||
super(InvalidPackageBaseException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidRepositoryNameException(AurwebException):
|
||||
def __init__(self, pkgbase):
|
||||
msg = 'invalid repository name: {:s}'.format(pkgbase)
|
||||
super(InvalidRepositoryNameException, self).__init__(msg)
|
||||
|
||||
|
||||
class PackageBaseExistsException(AurwebException):
|
||||
def __init__(self, pkgbase):
|
||||
msg = 'package base already exists: {:s}'.format(pkgbase)
|
||||
super(PackageBaseExistsException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidReasonException(AurwebException):
|
||||
def __init__(self, reason):
|
||||
msg = 'invalid reason: {:s}'.format(reason)
|
||||
super(InvalidReasonException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidCommentException(AurwebException):
|
||||
def __init__(self, comment):
|
||||
msg = 'comment is too short: {:s}'.format(comment)
|
||||
super(InvalidCommentException, self).__init__(msg)
|
||||
|
||||
|
||||
class InvalidArgumentsException(AurwebException):
|
||||
def __init__(self, msg):
|
||||
super(InvalidArgumentsException, self).__init__(msg)
|
|
@ -9,6 +9,7 @@ import time
|
|||
|
||||
import aurweb.config
|
||||
import aurweb.db
|
||||
import aurweb.exceptions
|
||||
|
||||
notify_cmd = aurweb.config.get('notifications', 'notify-cmd')
|
||||
|
||||
|
@ -40,7 +41,7 @@ def list_repos(user):
|
|||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
cur = conn.execute("SELECT Name, PackagerUID FROM PackageBases " +
|
||||
"WHERE MaintainerUID = ?", [userid])
|
||||
|
@ -51,16 +52,16 @@ def list_repos(user):
|
|||
|
||||
def create_pkgbase(pkgbase, user):
|
||||
if not re.match(repo_regex, pkgbase):
|
||||
die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
|
||||
if pkgbase_exists(pkgbase):
|
||||
die('{:s}: package base already exists: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.PackageBaseExistsException(pkgbase)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
now = int(time.time())
|
||||
cur = conn.execute("INSERT INTO PackageBases (Name, SubmittedTS, " +
|
||||
|
@ -79,19 +80,19 @@ def create_pkgbase(pkgbase, user):
|
|||
def pkgbase_adopt(pkgbase, user, privileged):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
cur = conn.execute("SELECT ID FROM PackageBases WHERE ID = ? AND " +
|
||||
"MaintainerUID IS NULL", [pkgbase_id])
|
||||
if not privileged and not cur.fetchone():
|
||||
die('{:s}: permission denied: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.PermissionDeniedException(user)
|
||||
|
||||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
cur = conn.execute("UPDATE PackageBases SET MaintainerUID = ? " +
|
||||
"WHERE ID = ?", [userid, pkgbase_id])
|
||||
|
@ -127,10 +128,10 @@ def pkgbase_get_comaintainers(pkgbase):
|
|||
def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
|
||||
if not privileged and not pkgbase_has_full_access(pkgbase, user):
|
||||
die('{:s}: permission denied: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.PermissionDeniedException(user)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
|
@ -142,7 +143,7 @@ def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
|
|||
[olduser])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
uids_old.add(userid)
|
||||
|
||||
uids_new = set()
|
||||
|
@ -151,7 +152,7 @@ def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
|
|||
[newuser])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
uids_new.add(userid)
|
||||
|
||||
uids_add = uids_new - uids_old
|
||||
|
@ -196,10 +197,10 @@ def pkgreq_by_pkgbase(pkgbase_id, reqtype):
|
|||
return [row[0] for row in cur.fetchall()]
|
||||
|
||||
|
||||
def pkgreq_close(reqid, reason, comments, autoclose=False):
|
||||
def pkgreq_close(reqid, user, reason, comments, autoclose=False):
|
||||
statusmap = {'accepted': 2, 'rejected': 3}
|
||||
if reason not in statusmap:
|
||||
die('{:s}: invalid reason: {:s}'.format(action, reason))
|
||||
raise aurweb.exceptions.InvalidReasonException(reason)
|
||||
status = statusmap[reason]
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
@ -210,7 +211,7 @@ def pkgreq_close(reqid, reason, comments, autoclose=False):
|
|||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
conn.execute("UPDATE PackageRequests SET Status = ?, ClosureComment = ? " +
|
||||
"WHERE ID = ?", [status, comments, reqid])
|
||||
|
@ -224,18 +225,18 @@ def pkgreq_close(reqid, reason, comments, autoclose=False):
|
|||
def pkgbase_disown(pkgbase, user, privileged):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
|
||||
initialized_by_owner = pkgbase_has_full_access(pkgbase, user)
|
||||
if not privileged and not initialized_by_owner:
|
||||
die('{:s}: permission denied: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.PermissionDeniedException(user)
|
||||
|
||||
# TODO: Support disowning package bases via package request.
|
||||
|
||||
# Scan through pending orphan requests and close them.
|
||||
comment = 'The user {:s} disowned the package.'.format(user)
|
||||
for reqid in pkgreq_by_pkgbase(pkgbase_id, 'orphan'):
|
||||
pkgreq_close(reqid, 'accepted', comment, True)
|
||||
pkgreq_close(reqid, user, 'accepted', comment, True)
|
||||
|
||||
comaintainers = []
|
||||
new_maintainer_userid = None
|
||||
|
@ -262,7 +263,7 @@ def pkgbase_disown(pkgbase, user, privileged):
|
|||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
subprocess.Popen((notify_cmd, 'disown', str(pkgbase_id), str(userid)))
|
||||
|
||||
|
@ -272,14 +273,16 @@ def pkgbase_disown(pkgbase, user, privileged):
|
|||
def pkgbase_flag(pkgbase, user, comment):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
if len(comment) < 3:
|
||||
raise aurweb.exceptions.InvalidCommentException(comment)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
now = int(time.time())
|
||||
conn.execute("UPDATE PackageBases SET " +
|
||||
|
@ -295,14 +298,14 @@ def pkgbase_flag(pkgbase, user, comment):
|
|||
def pkgbase_unflag(pkgbase, user):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
|
||||
userid = cur.fetchone()[0]
|
||||
if userid == 0:
|
||||
die('{:s}: unknown user: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.InvalidUserException(user)
|
||||
|
||||
if user in pkgbase_get_comaintainers(pkgbase):
|
||||
conn.execute("UPDATE PackageBases SET OutOfDateTS = NULL " +
|
||||
|
@ -318,7 +321,7 @@ def pkgbase_unflag(pkgbase, user):
|
|||
def pkgbase_set_keywords(pkgbase, keywords):
|
||||
pkgbase_id = pkgbase_from_name(pkgbase)
|
||||
if not pkgbase_id:
|
||||
die('{:s}: package base not found: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
|
||||
|
||||
conn = aurweb.db.Connection()
|
||||
|
||||
|
@ -377,29 +380,33 @@ def usage(cmds):
|
|||
exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
user = os.environ.get('AUR_USER')
|
||||
privileged = (os.environ.get('AUR_PRIVILEGED', '0') == '1')
|
||||
ssh_cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
|
||||
ssh_client = os.environ.get('SSH_CLIENT')
|
||||
def checkarg_atleast(cmdargv, *argdesc):
|
||||
if len(cmdargv) - 1 < len(argdesc):
|
||||
msg = 'missing {:s}'.format(argdesc[len(cmdargv) - 1])
|
||||
raise aurweb.exceptions.InvalidArgumentsException(msg)
|
||||
|
||||
if not ssh_cmd:
|
||||
die_with_help("Interactive shell is disabled.")
|
||||
cmdargv = shlex.split(ssh_cmd)
|
||||
action = cmdargv[0]
|
||||
remote_addr = ssh_client.split(' ')[0] if ssh_client else None
|
||||
|
||||
def checkarg_atmost(cmdargv, *argdesc):
|
||||
if len(cmdargv) - 1 > len(argdesc):
|
||||
raise aurweb.exceptions.InvalidArgumentsException('too many arguments')
|
||||
|
||||
|
||||
def checkarg(cmdargv, *argdesc):
|
||||
checkarg_atleast(cmdargv, *argdesc)
|
||||
checkarg_atmost(cmdargv, *argdesc)
|
||||
|
||||
|
||||
def serve(action, cmdargv, user, privileged, remote_addr):
|
||||
if enable_maintenance:
|
||||
if remote_addr not in maintenance_exc:
|
||||
die("The AUR is down due to maintenance. We will be back soon.")
|
||||
raise aurweb.exceptions.MaintenanceException
|
||||
|
||||
if action == 'git' and cmdargv[1] in ('upload-pack', 'receive-pack'):
|
||||
action = action + '-' + cmdargv[1]
|
||||
del cmdargv[1]
|
||||
|
||||
if action == 'git-upload-pack' or action == 'git-receive-pack':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing path".format(action))
|
||||
checkarg(cmdargv, 'path')
|
||||
|
||||
path = cmdargv[1].rstrip('/')
|
||||
if not path.startswith('/'):
|
||||
|
@ -408,11 +415,11 @@ def main():
|
|||
path = path + '.git'
|
||||
pkgbase = path[1:-4]
|
||||
if not re.match(repo_regex, pkgbase):
|
||||
die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
|
||||
raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
|
||||
|
||||
if action == 'git-receive-pack' and pkgbase_exists(pkgbase):
|
||||
if not privileged and not pkgbase_has_write_access(pkgbase, user):
|
||||
die('{:s}: permission denied: {:s}'.format(action, user))
|
||||
raise aurweb.exceptions.PermissionDeniedException(user)
|
||||
|
||||
os.environ["AUR_USER"] = user
|
||||
os.environ["AUR_PKGBASE"] = pkgbase
|
||||
|
@ -420,79 +427,48 @@ def main():
|
|||
cmd = action + " '" + repo_path + "'"
|
||||
os.execl(git_shell_cmd, git_shell_cmd, '-c', cmd)
|
||||
elif action == 'set-keywords':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
pkgbase_set_keywords(cmdargv[1], cmdargv[2:])
|
||||
elif action == 'list-repos':
|
||||
if len(cmdargv) > 1:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv)
|
||||
list_repos(user)
|
||||
elif action == 'setup-repo':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) > 2:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
warn('{:s} is deprecated. '
|
||||
'Use `git push` to create new repositories.'.format(action))
|
||||
create_pkgbase(cmdargv[1], user)
|
||||
elif action == 'restore':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) > 2:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
if not re.match(repo_regex, pkgbase):
|
||||
die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
|
||||
|
||||
if pkgbase_exists(pkgbase):
|
||||
die('{:s}: package base exists: {:s}'.format(action, pkgbase))
|
||||
create_pkgbase(pkgbase, user)
|
||||
|
||||
os.environ["AUR_USER"] = user
|
||||
os.environ["AUR_PKGBASE"] = pkgbase
|
||||
os.execl(git_update_cmd, git_update_cmd, 'restore')
|
||||
elif action == 'adopt':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) > 2:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
pkgbase_adopt(pkgbase, user, privileged)
|
||||
elif action == 'disown':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) > 2:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
pkgbase_disown(pkgbase, user, privileged)
|
||||
elif action == 'flag':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) < 3:
|
||||
die_with_help("{:s}: missing comment".format(action))
|
||||
if len(cmdargv) > 3:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name', 'comment')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
comment = cmdargv[2]
|
||||
if len(comment) < 3:
|
||||
die_with_help("{:s}: comment is too short".format(action))
|
||||
|
||||
pkgbase_flag(pkgbase, user, comment)
|
||||
elif action == 'unflag':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
if len(cmdargv) > 2:
|
||||
die_with_help("{:s}: too many arguments".format(action))
|
||||
checkarg(cmdargv, 'repository name')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
pkgbase_unflag(pkgbase, user)
|
||||
elif action == 'set-comaintainers':
|
||||
if len(cmdargv) < 2:
|
||||
die_with_help("{:s}: missing repository name".format(action))
|
||||
checkarg_atleast(cmdargv, 'repository name')
|
||||
|
||||
pkgbase = cmdargv[1]
|
||||
userlist = cmdargv[2:]
|
||||
|
@ -514,7 +490,30 @@ def main():
|
|||
}
|
||||
usage(cmds)
|
||||
else:
|
||||
die_with_help("invalid command: {:s}".format(action))
|
||||
msg = 'invalid command: {:s}'.format(action)
|
||||
raise aurweb.exceptions.InvalidArgumentsException(msg)
|
||||
|
||||
|
||||
def main():
|
||||
user = os.environ.get('AUR_USER')
|
||||
privileged = (os.environ.get('AUR_PRIVILEGED', '0') == '1')
|
||||
ssh_cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
|
||||
ssh_client = os.environ.get('SSH_CLIENT')
|
||||
|
||||
if not ssh_cmd:
|
||||
die_with_help("Interactive shell is disabled.")
|
||||
cmdargv = shlex.split(ssh_cmd)
|
||||
action = cmdargv[0]
|
||||
remote_addr = ssh_client.split(' ')[0] if ssh_client else None
|
||||
|
||||
try:
|
||||
serve(action, cmdargv, user, privileged, remote_addr)
|
||||
except aurweb.exceptions.MaintenanceException:
|
||||
die("The AUR is down due to maintenance. We will be back soon.")
|
||||
except aurweb.exceptions.InvalidArgumentsException as e:
|
||||
die_with_help('{:s}: {}'.format(action, e))
|
||||
except aurweb.exceptions.AurwebException as e:
|
||||
die('{:s}: {}'.format(action, e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Add table
Reference in a new issue