aurweb/aurweb/rpc.py
Kevin Morris abe8c0630c
fix(rpc): improve type=info performance
Now, we use an equivalent query to PHP's query, yet we grab
every piece of data we need for all packages asked for in one
database query.

At this time, local benchmarks have shown a slight performance
improvement when compared to PHP.

fastapi 262 requests/sec
php 250 requests/sec

Extras:

- Moved RPCError to the aurweb.exceptions module

Signed-off-by: Kevin Morris <kevr@0cost.org>
2021-11-17 02:30:06 -08:00

361 lines
13 KiB
Python

from collections import defaultdict
from typing import Any, Callable, Dict, List, NewType, Optional, Union

from sqlalchemy import and_, literal

import aurweb.config as config

from aurweb import db, defaults, models, util
from aurweb.exceptions import RPCError
from aurweb.packages.search import RPCSearch
# Maps lowercase dependency/relation type names to the capitalized
# keys under which they are listed in RPC info results. Names that
# are not present here are expected to be used verbatim by callers.
TYPE_MAPPING = {
    "depends": "Depends",
    "makedepends": "MakeDepends",
    "checkdepends": "CheckDepends",
    "optdepends": "OptDepends",
    "conflicts": "Conflicts",
    "provides": "Provides",
    "replaces": "Replaces",
}
# Type of the callables which turn a single Package record into a
# JSON-serializable dictionary.
DataGenerator = NewType(
    "DataGenerator",
    Callable[[models.Package], Dict[str, Any]],
)
class RPC:
    """ RPC API handler class.

    There are various pieces to RPC's process, and encapsulating them
    inside of a class means that external users do not abuse the
    RPC implementation to achieve goals. We call type handlers
    by taking a reference to the callback named "_handle_{type}_type(...)",
    and if the handler does not exist, we return an error to the API user.

    EXPOSED_VERSIONS holds the set of versions that the API
    officially supports.

    EXPOSED_TYPES holds the set of types that the API officially
    supports.

    TYPE_ALIASES holds an alias mapping of type -> type strings.

    EXPOSED_BYS holds the set of `by` values that the API officially
    supports.

    BY_ALIASES holds an alias mapping of by -> by strings.

    We should focus on privatizing implementation helpers and
    focusing on performance in the code used.
    """

    # A set of RPC versions supported by this API.
    EXPOSED_VERSIONS = {5}

    # A set of RPC types supported by this API.
    EXPOSED_TYPES = {
        "info", "multiinfo",
        "search", "msearch",
        "suggest", "suggest-pkgbase"
    }

    # A mapping of type aliases.
    TYPE_ALIASES = {"info": "multiinfo"}

    # A set of RPC `by` fields supported by this API.
    EXPOSED_BYS = {
        "name-desc", "name", "maintainer",
        "depends", "makedepends", "optdepends", "checkdepends"
    }

    # A mapping of by aliases.
    BY_ALIASES = {"name-desc": "nd", "name": "n", "maintainer": "m"}

    def __init__(self, version: int = 0, type: Optional[str] = None) -> None:
        """ Construct an RPC handler for one request.

        :param version: RPC version argument
        :param type: RPC type argument; replaced by its TYPE_ALIASES
                     value if it has one
        """
        self.version = version
        self.type = RPC.TYPE_ALIASES.get(type, type)

        # Package ID -> extra info mapping. It is populated by
        # _handle_multiinfo_type; defining it here keeps
        # _get_info_json_data usable before any multiinfo request ran.
        self.extra_info = {}

    def error(self, message: str) -> Dict[str, Any]:
        """ Produce an RPC error response.

        :param message: Error message given to the API user
        :returns: JSON-serializable error dictionary
        """
        return {
            "version": self.version,
            "results": [],
            "resultcount": 0,
            "type": "error",
            "error": message
        }

    def _verify_inputs(self, by: str = defaults.RPC_SEARCH_BY,
                       args: Optional[List[str]] = None) -> None:
        """ Verify the version, by and type inputs of this request.

        :param by: RPC by argument; must be one of EXPOSED_BYS
        :param args: Request arguments (not inspected here)
        :raises RPCError: If version, by or type is missing or unsupported
        """
        if self.version is None:
            raise RPCError("Please specify an API version.")

        if self.version not in RPC.EXPOSED_VERSIONS:
            raise RPCError("Invalid version specified.")

        if by not in RPC.EXPOSED_BYS:
            raise RPCError("Incorrect by field specified.")

        if self.type is None:
            raise RPCError("No request type/data specified.")

        if self.type not in RPC.EXPOSED_TYPES:
            raise RPCError("Incorrect request type specified.")

    def _enforce_args(self, args: Optional[List[str]]) -> None:
        """ Require at least one request argument.

        :param args: Request arguments
        :raises RPCError: If args is empty or None
        """
        if not args:
            raise RPCError("No request type/data specified.")

    def _get_json_data(self, package: models.Package) -> Dict[str, Any]:
        """ Produce dictionary data of one Package that can be JSON-serialized.

        :param package: Package instance
        :returns: JSON-serializable dictionary; it is a defaultdict(list),
                  so keys which were never set read as empty lists
        """

        # Produce RPC API compatible Popularity: If zero, it's an integer
        # 0, otherwise, it's formatted to the 6th decimal place.
        pop = package.PackageBase.Popularity
        pop = 0 if not pop else float(util.number_format(pop, 6))

        # The configured snapshot_uri is a %-style format string which
        # receives the package name.
        snapshot_uri = config.get("options", "snapshot_uri")

        data = defaultdict(list)
        data.update({
            "ID": package.ID,
            "Name": package.Name,
            "PackageBaseID": package.PackageBaseID,
            "PackageBase": package.PackageBase.Name,
            # Maintainer should be set following this update if one exists.
            "Maintainer": None,
            "Version": package.Version,
            "Description": package.Description,
            "URL": package.URL,
            "URLPath": snapshot_uri % package.Name,
            "NumVotes": package.PackageBase.NumVotes,
            "Popularity": pop,
            "OutOfDate": package.PackageBase.OutOfDateTS,
            "FirstSubmitted": package.PackageBase.SubmittedTS,
            "LastModified": package.PackageBase.ModifiedTS
        })

        if package.PackageBase.Maintainer is not None:
            # We do have a maintainer: set the Maintainer key.
            data["Maintainer"] = package.PackageBase.Maintainer.Username

        return data

    def _get_info_json_data(self, package: models.Package) -> Dict[str, Any]:
        """ Produce info-type dictionary data of one Package.

        This extends _get_json_data's output with License and Keywords
        lists and whatever self.extra_info holds for the package's ID.

        :param package: Package instance
        :returns: JSON-serializable dictionary
        """
        data = self._get_json_data(package)

        # All info results have _at least_ an empty list of
        # License and Keywords.
        data.update({
            "License": [],
            "Keywords": []
        })

        # If we actually got extra_info records, update data with
        # them for this particular package.
        if self.extra_info:
            data.update(self.extra_info.get(package.ID, {}))

        return data

    def _assemble_json_data(self, packages: List[models.Package],
                            data_generator: DataGenerator) \
            -> List[Dict[str, Any]]:
        """
        Assemble JSON data out of a list of packages.

        :param packages: A list of Package instances or a Package ORM query
        :param data_generator: Generator callable of single-Package JSON data
        :returns: A list with one data_generator result per package
        """
        output = []
        for pkg in packages:
            # Refresh the record through the db module before reading it.
            db.refresh(pkg)
            output.append(data_generator(pkg))
        return output

    def _handle_multiinfo_type(self, args: Optional[List[str]] = None,
                               **kwargs) -> List[Dict[str, Any]]:
        """ Handle the multiinfo (and aliased info) request type.

        :param args: Package names to look up
        :param kwargs: Ignored; accepts the keywords shared by all handlers
        :returns: A list of info dictionaries, one per matched package
        :raises RPCError: If no args were given
        """
        self._enforce_args(args)
        args = set(args)

        # Store our extra information in a class-wise dictionary,
        # which contains package id -> extra info dict mappings.
        # It is reset here so that nothing leaks in from an earlier call.
        self.extra_info = defaultdict(lambda: defaultdict(list))

        # Materialize the matched packages once; they are needed both
        # for collecting ids and for assembling the final output.
        packages = db.query(models.Package).join(models.PackageBase).filter(
            models.Package.Name.in_(args)).all()
        if not packages:
            # Nothing matched, so there is no extra info to look up.
            return []

        ids = {pkg.ID for pkg in packages}

        # Aliases for 80-width.
        Package = models.Package
        PackageKeyword = models.PackageKeyword

        # Every subquery yields the same (ID, Type, Name, Cond) columns
        # so that they can be combined into a single query.
        subqueries = [
            # PackageDependency
            db.query(
                models.PackageDependency
            ).join(models.DependencyType).filter(
                models.PackageDependency.PackageID.in_(ids)
            ).with_entities(
                models.PackageDependency.PackageID.label("ID"),
                models.DependencyType.Name.label("Type"),
                models.PackageDependency.DepName.label("Name"),
                models.PackageDependency.DepCondition.label("Cond")
            ).distinct().order_by("ID"),

            # PackageRelation
            db.query(
                models.PackageRelation
            ).join(models.RelationType).filter(
                models.PackageRelation.PackageID.in_(ids)
            ).with_entities(
                models.PackageRelation.PackageID.label("ID"),
                models.RelationType.Name.label("Type"),
                models.PackageRelation.RelName.label("Name"),
                models.PackageRelation.RelCondition.label("Cond")
            ).distinct().order_by("ID"),

            # Groups
            db.query(models.PackageGroup).join(
                models.Group,
                and_(models.PackageGroup.GroupID == models.Group.ID,
                     models.PackageGroup.PackageID.in_(ids))
            ).with_entities(
                models.PackageGroup.PackageID.label("ID"),
                literal("Groups").label("Type"),
                models.Group.Name.label("Name"),
                literal(str()).label("Cond")
            ).distinct().order_by("ID"),

            # Licenses
            db.query(models.PackageLicense).join(
                models.License,
                models.PackageLicense.LicenseID == models.License.ID
            ).filter(
                models.PackageLicense.PackageID.in_(ids)
            ).with_entities(
                models.PackageLicense.PackageID.label("ID"),
                literal("License").label("Type"),
                models.License.Name.label("Name"),
                literal(str()).label("Cond")
            ).distinct().order_by("ID"),

            # Keywords
            db.query(models.PackageKeyword).join(
                models.Package,
                and_(Package.PackageBaseID == PackageKeyword.PackageBaseID,
                     Package.ID.in_(ids))
            ).with_entities(
                models.Package.ID.label("ID"),
                literal("Keywords").label("Type"),
                models.PackageKeyword.Keyword.label("Name"),
                literal(str()).label("Cond")
            ).distinct().order_by("ID")
        ]

        # Union all subqueries together.
        query = subqueries[0].union_all(*subqueries[1:])

        for record in query:
            # Translate the record's type into its result key, falling
            # back to the type itself when it has no mapping.
            type_ = TYPE_MAPPING.get(record.Type, record.Type)

            # A non-empty condition is appended directly to the name.
            name = record.Name
            if record.Cond:
                name += record.Cond

            self.extra_info[record.ID][type_].append(name)

        return self._assemble_json_data(packages, self._get_info_json_data)

    def _handle_search_type(self, by: str = defaults.RPC_SEARCH_BY,
                            args: Optional[List[str]] = None) \
            -> List[Dict[str, Any]]:
        """ Handle the search request type.

        Only the first element of args is used as the search term.

        :param by: Search-by field given to RPCSearch.search_by
        :param args: Request arguments
        :returns: A list of package dictionaries, limited to the
                  configured max_rpc_results
        :raises RPCError: If by is not "m" and the search term is
                          missing or shorter than two characters
        """
        # If `by` isn't maintainer and we don't have any args, raise an error.
        # In maintainer's case, return all orphans if there are no args,
        # so we need args to pass through to the handler without errors.
        if by != "m" and not args:
            raise RPCError("No request type/data specified.")

        arg = args[0] if args else str()
        if by != "m" and len(arg) < 2:
            raise RPCError("Query arg too small.")

        search = RPCSearch()
        search.search_by(by, arg)

        max_results = config.getint("options", "max_rpc_results")
        results = search.results().limit(max_results)

        return self._assemble_json_data(results, self._get_json_data)

    def _handle_msearch_type(self, args: Optional[List[str]] = None,
                             **kwargs) -> List[Dict[str, Any]]:
        """ Handle the msearch request type: a search with by="m".

        :param args: Request arguments
        :param kwargs: Ignored; accepts the keywords shared by all handlers
        :returns: A list of package dictionaries
        """
        return self._handle_search_type(by="m", args=args)

    def _handle_suggest_type(self, args: Optional[List[str]] = None,
                             **kwargs) -> List[str]:
        """ Handle the suggest request type.

        :param args: Request arguments; only the first one is used
        :param kwargs: Ignored; accepts the keywords shared by all handlers
        :returns: Up to 20 names, in ascending order, of packages whose
                  name contains the argument and whose package base has
                  a PackagerUID
        """
        if not args:
            return []

        arg = args[0]
        packages = db.query(models.Package.Name).join(
            models.PackageBase
        ).filter(
            and_(models.PackageBase.PackagerUID.isnot(None),
                 models.Package.Name.like(f"%{arg}%"))
        ).order_by(models.Package.Name.asc()).limit(20)
        return [pkg.Name for pkg in packages]

    def _handle_suggest_pkgbase_type(self, args: Optional[List[str]] = None,
                                     **kwargs) -> List[str]:
        """ Handle the suggest-pkgbase request type.

        :param args: Request arguments; only the first one is used
        :param kwargs: Ignored; accepts the keywords shared by all handlers
        :returns: Up to 20 names, in ascending order, of package bases
                  whose name contains the argument and which have a
                  PackagerUID
        """
        if not args:
            return []

        packages = db.query(models.PackageBase.Name).filter(
            and_(models.PackageBase.PackagerUID.isnot(None),
                 models.PackageBase.Name.like(f"%{args[0]}%"))
        ).order_by(models.PackageBase.Name.asc()).limit(20)
        return [pkg.Name for pkg in packages]

    def _is_suggestion(self) -> bool:
        """ Tell whether this request's type is one of the suggest types. """
        return self.type.startswith("suggest")

    def _handle_callback(self, by: str, args: List[str]) \
            -> Union[List[Dict[str, Any]], List[str]]:
        """ Dispatch this request to the handler of its type.

        :param by: RPC by argument
        :param args: Request arguments
        :returns: The handler's results
        :raises RPCError: If no handler exists for the type, or if the
                          handler itself raises one
        """
        # Get a handle to our callback; dashes in the type are replaced
        # with underscores to form the handler's method name.
        handler_name = f"_handle_{self.type.replace('-', '_')}_type"
        callback = getattr(self, handler_name, None)
        if callback is None:
            # Report a missing handler as an API error rather than
            # letting an AttributeError escape to the caller.
            raise RPCError("Incorrect request type specified.")

        return callback(by=by, args=args)

    def handle(self, by: str = defaults.RPC_SEARCH_BY,
               args: Optional[List[str]] = None) \
            -> Union[List[str], Dict[str, Any]]:
        """ Request entrypoint. A router should construct an RPC with the
        request's version and type, pass by and args to this function
        and expect JSON-serializable output to be returned.

        :param by: RPC by argument
        :param args: Deciphered list of arguments based on arg/arg[] inputs
        :returns: A list of results for suggest types; otherwise, a
                  dictionary holding either the results or an error
        """
        # Prepare our output data dictionary with some basic keys.
        data = {"version": self.version, "type": self.type}

        # Run some verification on our given arguments.
        try:
            self._verify_inputs(by=by, args=args)
        except RPCError as exc:
            return self.error(str(exc))

        # Convert by to its aliased value if it has one.
        by = RPC.BY_ALIASES.get(by, by)

        # Process the requested handler.
        try:
            results = self._handle_callback(by, args)
        except RPCError as exc:
            return self.error(str(exc))

        # These types are special: we produce a different kind of
        # successful JSON output: a list of results.
        if self._is_suggestion():
            return results

        # Return JSON output.
        data.update({
            "resultcount": len(results),
            "results": results
        })
        return data