diff --git a/aurweb/api.py b/aurweb/api.py new file mode 100644 index 00000000..3e0356a3 --- /dev/null +++ b/aurweb/api.py @@ -0,0 +1,439 @@ +from collections import defaultdict +from typing import Any + +from sqlalchemy import Float, and_, cast, literal +from sqlalchemy.orm import Query, aliased +from sqlalchemy.sql.expression import func + +from aurweb import config, db, models +from aurweb.exceptions import APIError +from aurweb.models import ( + DependencyType, + Group, + Package, + PackageBase, + PackageComaintainer, + PackageDependency, + PackageGroup, + PackageKeyword, + PackageLicense, + PackageRelation, + RelationType, + User, +) +from aurweb.models.dependency_type import ( + CHECKDEPENDS_ID, + DEPENDS_ID, + MAKEDEPENDS_ID, + OPTDEPENDS_ID, +) +from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID + +VALID_VERSIONS = {6} + +VALID_TYPES = { + "search", + "info", + "suggest", + "suggest-pkgbase", +} + +VALID_BYS_SEARCH = { + "name", + "name-desc", +} + +VALID_BYS_INFO = { + "name", + "maintainer", + "submitter", + "depends", + "makedepends", + "optdepends", + "checkdepends", + "provides", + "conflicts", + "replaces", + "groups", + "keywords", + "comaintainers", +} + +VALID_MODES = {"contains", "starts-with"} + +TYPE_MAPPING = { + "depends": "Depends", + "makedepends": "MakeDepends", + "checkdepends": "CheckDepends", + "optdepends": "OptDepends", + "conflicts": "Conflicts", + "provides": "Provides", + "replaces": "Replaces", +} + +DEPENDS_MAPPING = { + "depends": DEPENDS_ID, + "makedepends": MAKEDEPENDS_ID, + "optdepends": OPTDEPENDS_ID, + "checkdepends": CHECKDEPENDS_ID, +} + +RELATIONS_MAPPING = { + "provides": PROVIDES_ID, + "replaces": REPLACES_ID, + "conflicts": CONFLICTS_ID, +} + + +class API: + """ + API handler class. + + Parameters: + :param version: API version + :param type: Type of request + :param by: The field that is being used to filter our query + :param mode: The search mode (only applicable for the search type) + :param arg: A list of keywords that is used to filter our records + """ + + def __init__(self, version: str, type: str, by: str, mode: str, arg: list[str]): + try: + self.version = int(version) + except ValueError: + self.version = None + self.type = type + self.by = by + self.mode = mode + self.arg = arg + + # set default for empty "by" parameter + if type == "search" and (by == "" or by is None): + self.by = "name-desc" + elif by == "" or by is None: + self.by = "name" + + # set default for empty "mode" parameter + if mode == "" or mode is None: + self.mode = "contains" + + # base query + self.query = self._get_basequery() + + def get_results(self) -> dict[str, Any]: + """ + Returns a dictionary with our data. + """ + + # input validation + try: + self._validate_parameters() + except APIError as ex: + return self.error(str(ex)) + + data = { + "resultcount": 0, + "results": [], + "type": self.type, + "version": self.version, + } + + # get results according to type + try: + if self.type == "search": + data["results"] = self._search() + if self.type.startswith("suggest"): + data["suggest"] = self._suggest() + if self.type == "info": + data["results"] = self._info() + except APIError as ex: + return self.error(str(ex)) + + data["resultcount"] = len(data["results"]) + + return data + + def error(self, message: str) -> dict[str, Any]: + """ + Returns a dictionary with an empty result set and error message. + """ + return { + "error": message, + "resultcount": 0, + "results": [], + "type": "error", + "version": self.version, + } + + def _validate_parameters(self): + if self.version not in VALID_VERSIONS: + raise APIError("Invalid version specified.") + if self.type not in VALID_TYPES: + raise APIError("Incorrect request type specified.") + if (self.type == "search" and self.by not in VALID_BYS_SEARCH) or ( + self.type == "info" and self.by not in VALID_BYS_INFO + ): + raise APIError("Incorrect by field specified.") + if self.mode not in VALID_MODES: + raise APIError("Incorrect mode specified.") + if self.type == "search" and (len(self.arg) == 0 or len(self.arg[0]) < 2): + raise APIError("Query arg too small.") + if self.type == "info" and self.by != "maintainer" and len(self.arg) == 0: + raise APIError("Query arg too small.") + + def _search(self) -> list[dict[str, Any]]: + for keyword in self.arg[0].split(" "): + # define search expression according to "mode" + expression = f"{keyword}%" if self.mode == "starts-with" else f"%{keyword}%" + + # name or name/desc search + if self.by == "name": + self.query = self.query.filter(Package.Name.like(expression)) + else: + self.query = self.query.filter( + (Package.Name.like(expression)) + | (Package.Description.like(expression)) + ) + + return self._run_queries() + + def _suggest(self) -> list[dict[str, Any]]: + if len(self.arg) == 0: + self.arg.append("") + query = ( + db.query(Package.Name) + .join(PackageBase) + .filter( + (PackageBase.PackagerUID.isnot(None)) + & Package.Name.like(f"{self.arg[0]}%") + ) + .order_by(Package.Name) + ) + + if self.type == "suggest-pkgbase": + query = ( + db.query(PackageBase.Name) + .filter( + (PackageBase.PackagerUID.isnot(None)) + & PackageBase.Name.like(f"{self.arg[0]}%") + ) + .order_by(PackageBase.Name) + ) + + data = query.limit(20).all() + + # "suggest" returns an array of strings + return [rec[0] for rec in data] + + def _info(self) -> list[dict[str, Any]]: # noqa: C901 + # get unique list of arguments + args = set(self.arg) + + # subquery for submitter and comaintainer queries + users = db.query(User.ID).filter(User.Username.in_(args)) + + # Define joins and filters for our "by" parameter + if self.by == "name": + self.query = self.query.filter(Package.Name.in_(args)) + elif self.by == "maintainer": + if len(args) == 0 or self.arg[0] == "": + self.query = self.query.filter(PackageBase.MaintainerUID.is_(None)) + else: + self.query = self.query.filter(PackageBase.MaintainerUID.in_(users)) + elif self.by == "submitter": + self.query = self.query.filter(PackageBase.SubmitterUID.in_(users)) + elif self.by in ["depends", "makedepends", "optdepends", "checkdepends"]: + self.query = self.query.join(PackageDependency).filter( + (PackageDependency.DepTypeID == DEPENDS_MAPPING.get(self.by, 0)) + & (PackageDependency.DepName.in_(args)) + ) + elif self.by in ["provides", "conflicts", "replaces"]: + self.query = self.query.join(PackageRelation).filter( + (PackageRelation.RelTypeID == RELATIONS_MAPPING.get(self.by, 0)) + & (PackageRelation.RelName.in_(args)) + ) + + # A package always provides itself, so we have to include it. + # Union query is the fastest way of doing this. + if self.by == "provides": + itself = self._get_basequery().filter(Package.Name.in_(args)) + self.query = self.query.union(itself) + elif self.by == "groups": + self.query = ( + self.query.join(PackageGroup).join(Group).filter(Group.Name.in_(args)) + ) + elif self.by == "keywords": + self.query = ( + self.query.join(PackageKeyword) + .filter(PackageKeyword.Keyword.in_(args)) + .distinct() + ) + elif self.by == "comaintainers": + self.query = ( + self.query.join( + PackageComaintainer, + PackageBase.ID == PackageComaintainer.PackageBaseID, + ) + .filter(PackageComaintainer.UsersID.in_(users)) + .distinct() + ) + + return self._run_queries() + + def _run_queries(self) -> list[dict[str, Any]]: + max_results = config.getint("options", "max_rpc_results") + + # get basic package data + base_data = self.query.limit(max_results + 1).all() + packages = [dict(row._asdict()) for row in base_data] + + # return error if we exceed max results + if len(base_data) > max_results: + raise APIError("Too many package results.") + + # get list of package IDs for our subquery + ids = {pkg.ID for pkg in base_data} + + # get data from related tables + sub_data = self._get_subqueries(ids).all() + + # store extended information in dict for later lookup + extra_info = defaultdict(lambda: defaultdict(list)) + for record in sub_data: + type_ = TYPE_MAPPING.get(record.Type, record.Type) + + name = record.Name + if record.Cond: + name += record.Cond + + extra_info[record.ID][type_].append(name) + + # add extended info to each package + for pkg in packages: + pkg.update(extra_info.get(pkg["ID"], [])) + pkg.pop("ID") # remove ID from our results + + return packages + + def _get_basequery(self) -> Query: + snapshot_uri = config.get("options", "snapshot_uri") + Submitter = aliased(User) + return ( + db.query(Package) + .join(PackageBase) + .join( + User, + User.ID == PackageBase.MaintainerUID, + isouter=True, + ) + .join( + Submitter, + Submitter.ID == PackageBase.SubmitterUID, + isouter=True, + ) + .with_entities( + Package.ID, + Package.Name, + Package.Description, + Package.Version, + PackageBase.Name.label("PackageBase"), + Package.URL, + func.replace(snapshot_uri, "%s", PackageBase.Name).label("URLPath"), + User.Username.label("Maintainer"), + Submitter.Username.label("Submitter"), + PackageBase.SubmittedTS.label("FirstSubmitted"), + PackageBase.ModifiedTS.label("LastModified"), + PackageBase.OutOfDateTS.label("OutOfDate"), + PackageBase.NumVotes, + cast(PackageBase.Popularity, Float).label("Popularity"), + ) + ) + + def _get_subqueries(self, ids: set[int]) -> Query: + subqueries = [ + # PackageDependency + db.query(PackageDependency) + .join(DependencyType) + .filter(PackageDependency.PackageID.in_(ids)) + .with_entities( + PackageDependency.PackageID.label("ID"), + DependencyType.Name.label("Type"), + PackageDependency.DepName.label("Name"), + PackageDependency.DepCondition.label("Cond"), + ) + .distinct() # A package could have the same dependency multiple times + .order_by("Name"), + # PackageRelation + db.query(PackageRelation) + .join(RelationType) + .filter(PackageRelation.PackageID.in_(ids)) + .with_entities( + PackageRelation.PackageID.label("ID"), + RelationType.Name.label("Type"), + PackageRelation.RelName.label("Name"), + PackageRelation.RelCondition.label("Cond"), + ) + .distinct() # A package could have the same relation multiple times + .order_by("Name"), + # Groups + db.query(PackageGroup) + .join( + Group, + and_( + PackageGroup.GroupID == Group.ID, + PackageGroup.PackageID.in_(ids), + ), + ) + .with_entities( + PackageGroup.PackageID.label("ID"), + literal("Groups").label("Type"), + Group.Name.label("Name"), + literal(str()).label("Cond"), + ) + .order_by("Name"), + # Licenses + db.query(PackageLicense) + .join(models.License, PackageLicense.LicenseID == models.License.ID) + .filter(PackageLicense.PackageID.in_(ids)) + .with_entities( + PackageLicense.PackageID.label("ID"), + literal("License").label("Type"), + models.License.Name.label("Name"), + literal(str()).label("Cond"), + ) + .order_by("Name"), + # Keywords + db.query(PackageKeyword) + .join( + Package, + and_( + Package.PackageBaseID == PackageKeyword.PackageBaseID, + Package.ID.in_(ids), + ), + ) + .with_entities( + Package.ID.label("ID"), + literal("Keywords").label("Type"), + PackageKeyword.Keyword.label("Name"), + literal(str()).label("Cond"), + ) + .order_by("Name"), + # Co-Maintainer + db.query(PackageComaintainer) + .join(User, User.ID == PackageComaintainer.UsersID) + .join( + Package, + Package.PackageBaseID == PackageComaintainer.PackageBaseID, + ) + .with_entities( + Package.ID, + literal("CoMaintainers").label("Type"), + User.Username.label("Name"), + literal(str()).label("Cond"), + ) + .distinct() # A package could have the same co-maintainer multiple times + .order_by("Name"), + ] + + # Union all subqueries into one statement. + return subqueries[0].union_all(*subqueries[1:]) diff --git a/aurweb/exceptions.py b/aurweb/exceptions.py index e24eb607..66eca9b9 100644 --- a/aurweb/exceptions.py +++ b/aurweb/exceptions.py @@ -85,6 +85,10 @@ class RPCError(AurwebException): pass +class APIError(AurwebException): + pass + + class ValidationError(AurwebException): def __init__(self, data: Any, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/aurweb/routers/__init__.py b/aurweb/routers/__init__.py index ccd70662..a34f17bc 100644 --- a/aurweb/routers/__init__.py +++ b/aurweb/routers/__init__.py @@ -5,6 +5,7 @@ See https://fastapi.tiangolo.com/tutorial/bigger-applications/ """ from . import ( accounts, + api, auth, html, package_maintainer, @@ -23,6 +24,7 @@ to a fastapi.APIRouter. """ APP_ROUTES = [ accounts, + api, auth, html, packages, diff --git a/aurweb/routers/api.py b/aurweb/routers/api.py new file mode 100644 index 00000000..92d5b935 --- /dev/null +++ b/aurweb/routers/api.py @@ -0,0 +1,120 @@ +""" +API routing module + +Available routes: + +- GET /api/v{version}/{type}/{arg} +- GET /api/v{version}/{type}/{by}/{arg} +- GET /api/v{version}/{type}/{by}/{mode}/{arg} + +- GET /api/v{version}/{type} (query params: by, mode, arg) + +- POST /api/v{version}/{type} (form values: by, mode, arg) + +""" + +from http import HTTPStatus +from typing import Any + +import orjson +from fastapi import APIRouter, Request, Response +from fastapi.responses import JSONResponse + +from aurweb.api import API +from aurweb.ratelimit import check_ratelimit +from aurweb.util import remove_empty + +router = APIRouter() + + +# path based +@router.get("/api/v{version}/{type}/{arg}") +async def api_version_type_arg(request: Request, version: str, type: str, arg: str): + return handle_request( + request=request, version=version, type=type, by="", mode="", arg=list([arg]) + ) + + +@router.get("/api/v{version}/{type}/{by}/{arg}") +async def api_version_type_by_arg( + request: Request, version: str, type: str, by: str, arg: str +): + return handle_request( + request=request, version=version, type=type, by=by, mode="", arg=list([arg]) + ) + + +@router.get("/api/v{version}/{type}/{by}/{mode}/{arg}") +async def api_version_type_by_mode_arg( + request: Request, version: str, type: str, by: str, mode: str, arg: str +): + return handle_request( + request=request, version=version, type=type, by=by, mode=mode, arg=list([arg]) + ) + + +# query string +@router.get("/api/v{version}/{type}") +async def api_version_type(request: Request, version: str, type: str): + params = request.query_params + by = params.get("by") + mode = params.get("mode") + arg = params.getlist("arg") + + return handle_request( + request=request, version=version, type=type, by=by, mode=mode, arg=arg + ) + + +# form data (POST) +@router.post("/api/v{version}/{type}") +async def api_post_version_type(request: Request, version: str, type: str): + form = await request.form() + by = form.get("by") + mode = form.get("mode") + arg = form.getlist("arg") + + return handle_request( + request=request, version=version, type=type, by=by, mode=mode, arg=arg + ) + + +def handle_request( + request: Request, version: str, type: str, by: str, mode: str, arg: list[str] +): + """ + Middleware for checking rate-limits + + All routers should initiate requests through this function. + """ + api = API(version=version, type=type, by=by, mode=mode, arg=arg) + + # rate limit check + if check_ratelimit(request): + return JSONResponse( + api.error("Rate limit reached"), + status_code=int(HTTPStatus.TOO_MANY_REQUESTS), + ) + + # run query and return results + return compose_response(api.get_results()) + + +def compose_response(result: dict[str, Any]) -> Response: + """ + Converts our data into JSON format and generates a response. + We also check for any errors and set the http status accordingly. + """ + + status = HTTPStatus.OK + if result.get("error") is not None: + status = HTTPStatus.BAD_REQUEST + + # suggest calls are returned as plain JSON string array + if result.get("suggest") is not None: + return JSONResponse(result["suggest"]) + + # remove null values from results and compose JSON data + result["results"] = remove_empty(result["results"]) + data = orjson.dumps(result) + return Response(content=data, status_code=status, media_type="application/json") diff --git a/aurweb/util.py b/aurweb/util.py index 3410e4d8..532a5854 100644 --- a/aurweb/util.py +++ b/aurweb/util.py @@ -208,3 +208,16 @@ def hash_query(query: Query): return sha1( str(query.statement.compile(compile_kwargs={"literal_binds": True})).encode() ).hexdigest() + + +def remove_empty(value): + """ + Recursively remove all None values from dictionaries and lists, + and return the result as a new dictionary or list. + """ + if isinstance(value, list): + return [remove_empty(x) for x in value if x is not None] + elif isinstance(value, dict): + return {key: remove_empty(val) for key, val in value.items() if val is not None} + else: + return value diff --git a/doc/api.md b/doc/api.md new file mode 100644 index 00000000..4cf82f0a --- /dev/null +++ b/doc/api.md @@ -0,0 +1,306 @@ +# aurweb v6 /api + +Specification for version 6 of the metadata REST-API + +## Endpoints + +Endpoints are classified in 3 categories: + +* Search -> Search for packages +* Info -> Lookup information for package (exact keyword) +* Suggest -> Search for package names (max. 20 results) + +### Search + +Performs a search query according to the `by` parameter. + +* GET`/api/v6/search/{arg}` + GET`/api/v6/search/{by}/{arg}` + GET`/api/v6/search/{by}/{mode}/{arg}` + * `arg` (path; mandatory) + * `by` (path; optional) + * `mode` (path; optional) + +#### Parameters + +* `by` + + The field that is utilized in the search query + If not specified, searching is performend with ***name-desc*** + * Type: `string` + * Allowed values + * `name` + * `name-desc` + * Default: `name-desc` + +* `mode` + + The search-mode that is being used to query packages + If not specified, searching is performend with ***contains*** + * Type: `string` + * Allowed values + * `contains` + * `starts-with` + * Default: `contains` + +* `arg` + + The argument/search-term(s) for the search query + Multiple terms/words can be supplied (space separated) + to perform and *AND*-like query + + * Type: `string` + +#### Response + +Data is returned in JSON format. +Empty fields are ommitted in the output. + +`200 - OK` + +* PackageData + + ```json + { + "resultcount": 1, + "results": [ + { + "Name": "my-pkg", + "Description": "Package description", + "Version": "1.7.5-1", + "PackageBase": "my-pkg", + "URL": "https://example.com", + "URLPath": "/cgit/aur.git/snapshot/my-pkg.tar.gz", + "Maintainer": "someone", + "Submitter": "someone", + "FirstSubmitted": 1648375227, + "LastModified": 1666386881, + "OutOfDate": 1648375227, + "NumVotes": 10, + "Popularity": 6.463867, + "License": [ + "MIT", + "GPL3" + ], + "Depends": [ + "some-pkg", + "another-pkg" + ], + "MakeDepends": [ + "some-pkg", + "another-pkg" + ], + "OptDepends": [ + "some-pkg", + "another-pkg" + ], + "CheckDepends": [ + "some-pkg", + "another-pkg" + ], + "Provides": [ + "some-pkg", + "another-pkg" + ], + "Conflicts": [ + "some-pkg", + "another-pkg" + ], + "Replaces": [ + "some-pkg", + "another-pkg" + ], + "Groups": [ + "some-grp", + "another-grp" + ], + "Keywords": [ + "some-keyword", + "another-keyword" + ], + "CoMaintainers": [ + "someone", + "another-one" + ] + } + ], + "type": "search", + "version": 6 + } + ``` + +`400 - Bad Request` + +* Error + + ```json + { + "error": "Incorrect by field specified", + "resultcount": 1, + "results": [], + "type": "error", + "version": 6 + } + ``` + +### Info + +Returns a list of detailed package data for one or more packages + +#### Single lookup +* GET`/api/v6/info/{arg}` + GET`/api/v6/info/{by}/{arg}` + * `arg` (path; mandatory) + * `by` (path; optional) + +#### Multi lookup +* GET`/api/v6/info?arg=xyz&arg=abc` + GET`/api/v6/info?by=provides&arg=xyz&arg=abc` + * `arg` (query-string; mandatory; one or more) + * `by` (query-string; optional) + +* POST`/api/v6/info` + * BODY (`application/x-www-form-urlencoded`): + ```xml + arg=one&arg=two&by=provides + ``` + +#### Parameters + +* `by` + + The field is being utilized in the lookup query + If not specified, a lookup is performend with ***name*** + * Type: `string` + * Allowed values + * `name` + * `depends` + * `checkdepends` + * `optdepends` + * `makedepends` + * `maintainer` + * `submitter` + * `provides` + * `conflicts` + * `replaces` + * `keywords` + * `groups` + * `comaintainers` + * Default: `name` + +* `arg` + + One or more keywords + + * Type: `string` or `string-array` (depending on the endpoint) + +#### Response + +Data is returned in JSON format. +Empty fields are ommitted in the output. + +`200 - OK` + +* PackageData + + See `Search` type + +`400 - Bad Request` + +* Error + + See `Search` type + +### Suggest + +Returns a list of package names starting with the supplied argument. +Mostly used for auto-completion fields when typing. + +#### Starts-with search + +* GET`/api/v6/suggest/{arg}` + * `arg` (path; mandatory) + +* GET`/api/v6/suggest-pkgbase/{arg}` + * `arg` (path; mandatory) + + +#### Response + +Data is returned in JSON format. + +`200 - OK` + +* PackageNames + + ```json + [ + "pkg1", + "pkg2", + "pkg3", + ] + ``` + +#### Parameters + +* `arg` + + Search term (starts-with) + + * Type: `string` + +## Examples + +Below you'll find some basic examples for the different types of requests. + +### Search + +* packages containing `firefox` in the package name or description + + GET`/api/v6/search/firefox` + GET`/api/v6/search/name-desc/firefox` + GET`/api/v6/search/name-desc/contains/firefox` + +* packages whose name start with `firefox` + + GET`/api/v6/search/name/starts-with/firefox` + +* packages containing both `fire` and `fox` in the name + + GET`/api/v6/search/name/fire+fox` + (note that `+` is a URL-encoded whitespace character) + +### Info + +* a package with the name `firefox` + + GET`/api/v6/info/firefox` + +* packages providing `firefox` + + GET`/api/v6/info/provides/firefox` + GET`/api/v6/info?by=provides&arg=firefox` + +* packages co-maintained by `someuser` + + GET`/api/v6/info/comaintainers/someuser` + GET`/api/v6/info?by=comaintainers&arg=someuser` + +* packages with name `firefox` or `chromium` + + GET`/api/v6/info?by=name&arg=firefox&arg=chromium` + + POST`/api/v6/info` + ``` + arg=firefox&arg=chromium&by=name + ``` + +### Suggest + +* packages starting with `fire` + + GET`/api/v6/suggest/fire` + +* packages whose pkgbase starts with `fire` + + GET`/api/v6/suggest-pkgbase/fire` diff --git a/test/test_api.py b/test/test_api.py new file mode 100644 index 00000000..1f3a9705 --- /dev/null +++ b/test/test_api.py @@ -0,0 +1,1220 @@ +from http import HTTPStatus +from unittest import mock + +import orjson +import pytest +from fastapi.testclient import TestClient +from redis.client import Pipeline + +import aurweb.models.dependency_type as dt +import aurweb.models.relation_type as rt +from aurweb import asgi, config, db, scripts, time +from aurweb.aur_redis import redis_connection +from aurweb.models.account_type import USER_ID +from aurweb.models.group import Group +from aurweb.models.license import License +from aurweb.models.package import Package +from aurweb.models.package_base import PackageBase +from aurweb.models.package_comaintainer import PackageComaintainer +from aurweb.models.package_dependency import PackageDependency +from aurweb.models.package_group import PackageGroup +from aurweb.models.package_keyword import PackageKeyword +from aurweb.models.package_license import PackageLicense +from aurweb.models.package_relation import PackageRelation +from aurweb.models.package_vote import PackageVote +from aurweb.models.user import User + + +@pytest.fixture +def client() -> TestClient: + yield TestClient(app=asgi.app) + + +@pytest.fixture +def user(db_test) -> User: + with db.begin(): + user = db.create( + User, + Username="test", + Email="test@example.org", + RealName="Test User 1", + Passwd=str(), + AccountTypeID=USER_ID, + ) + yield user + + +@pytest.fixture +def user2() -> User: + with db.begin(): + user = db.create( + User, + Username="user2", + Email="user2@example.org", + RealName="Test User 2", + Passwd=str(), + AccountTypeID=USER_ID, + ) + yield user + + +@pytest.fixture +def user3() -> User: + with db.begin(): + user = db.create( + User, + Username="user3", + Email="user3@example.org", + RealName="Test User 3", + Passwd=str(), + AccountTypeID=USER_ID, + ) + yield user + + +@pytest.fixture +def packages(user: User, user2: User, user3: User) -> list[Package]: + output = [] + + # Create package records used in our tests. + with db.begin(): + pkgbase = db.create( + PackageBase, + Name="big-chungus", + Maintainer=user, + Packager=user, + Submitter=user2, + SubmittedTS=1672214227, + ModifiedTS=1672214227, + ) + pkg = db.create( + Package, + PackageBase=pkgbase, + Name=pkgbase.Name, + Description="Bunny bunny around bunny", + URL="https://example.com/", + Version="1.0.0", + ) + output.append(pkg) + + pkgbase = db.create( + PackageBase, + Name="chungy-chungus", + Maintainer=user, + Packager=user, + Submitter=user2, + SubmittedTS=1672214227, + ModifiedTS=1672214227, + ) + pkg = db.create( + Package, + PackageBase=pkgbase, + Name=pkgbase.Name, + Description="Wubby wubby on wobba wuubu", + URL="https://example.com/", + Version="2.0.0", + ) + output.append(pkg) + + pkgbase = db.create( + PackageBase, Name="gluggly-chungus", Maintainer=user, Packager=user + ) + pkg = db.create( + Package, + PackageBase=pkgbase, + Name=pkgbase.Name, + Description="glurrba glurrba gur globba", + URL="https://example.com/", + ) + output.append(pkg) + + pkgbase = db.create( + PackageBase, Name="fugly-chungus", Maintainer=user, Packager=user + ) + + desc = "A Package belonging to a PackageBase with another name." + pkg = db.create( + Package, + PackageBase=pkgbase, + Name="other-pkg", + Description=desc, + URL="https://example.com", + ) + output.append(pkg) + + pkgbase = db.create(PackageBase, Name="woogly-chungus") + pkg = db.create( + Package, + PackageBase=pkgbase, + Name=pkgbase.Name, + Description="wuggla woblabeloop shemashmoop", + URL="https://example.com/", + ) + output.append(pkg) + + # Setup a few more related records on the first package: + # a license, group, some keywords, comaintainer and some votes. + with db.begin(): + lic = db.create(License, Name="GPL") + db.create(PackageLicense, Package=output[0], License=lic) + + grp = db.create(Group, Name="testgroup") + db.create(PackageGroup, Package=output[0], Group=grp) + + db.create( + PackageComaintainer, + PackageBase=output[0].PackageBase, + User=user2, + Priority=1, + ) + + for keyword in ["big-chungus", "smol-chungus", "sizeable-chungus"]: + db.create( + PackageKeyword, PackageBase=output[0].PackageBase, Keyword=keyword + ) + + now = time.utcnow() + for user_ in [user, user2, user3]: + db.create( + PackageVote, User=user_, PackageBase=output[0].PackageBase, VoteTS=now + ) + scripts.popupdate.run_single(output[0].PackageBase) + + yield output + + +@pytest.fixture +def depends(packages: list[Package]) -> list[PackageDependency]: + output = [] + + with db.begin(): + dep = db.create( + PackageDependency, + Package=packages[0], + DepTypeID=dt.DEPENDS_ID, + DepName="chungus-depends", + ) + output.append(dep) + + dep = db.create( + PackageDependency, + Package=packages[1], + DepTypeID=dt.DEPENDS_ID, + DepName="chungy-depends", + ) + output.append(dep) + + dep = db.create( + PackageDependency, + Package=packages[0], + DepTypeID=dt.OPTDEPENDS_ID, + DepName="chungus-optdepends", + DepCondition="=50", + ) + output.append(dep) + + dep = db.create( + PackageDependency, + Package=packages[0], + DepTypeID=dt.MAKEDEPENDS_ID, + DepName="chungus-makedepends", + ) + output.append(dep) + + dep = db.create( + PackageDependency, + Package=packages[0], + DepTypeID=dt.CHECKDEPENDS_ID, + DepName="chungus-checkdepends", + ) + output.append(dep) + + yield output + + +@pytest.fixture +def relations(packages: list[Package]) -> list[PackageRelation]: + output = [] + + with db.begin(): + rel = db.create( + PackageRelation, + Package=packages[0], + RelTypeID=rt.CONFLICTS_ID, + RelName="chungus-conflicts", + ) + output.append(rel) + + rel = db.create( + PackageRelation, + Package=packages[1], + RelTypeID=rt.CONFLICTS_ID, + RelName="chungy-conflicts", + ) + output.append(rel) + + rel = db.create( + PackageRelation, + Package=packages[0], + RelTypeID=rt.PROVIDES_ID, + RelName="chungus-provides", + RelCondition="<=200", + ) + output.append(rel) + + rel = db.create( + PackageRelation, + Package=packages[0], + RelTypeID=rt.REPLACES_ID, + RelName="chungus-replaces", + RelCondition="<=200", + ) + output.append(rel) + + # Finally, yield the packages. + yield output + + +@pytest.fixture +def comaintainer( + user2: User, user3: User, packages: list[Package] +) -> list[PackageComaintainer]: + output = [] + + with db.begin(): + comaintainer = db.create( + PackageComaintainer, + User=user2, + PackageBase=packages[0].PackageBase, + Priority=1, + ) + output.append(comaintainer) + + comaintainer = db.create( + PackageComaintainer, + User=user3, + PackageBase=packages[0].PackageBase, + Priority=1, + ) + output.append(comaintainer) + + # Finally, yield the packages. + yield output + + +@pytest.fixture(autouse=True) +def setup(db_test): + # Create some extra package relationships. + pass + + +@pytest.fixture +def pipeline(): + redis = redis_connection() + pipeline = redis.pipeline() + + # The 'testclient' host is used when requesting the app + # via fastapi.testclient.TestClient. + pipeline.delete("ratelimit-ws:testclient") + pipeline.delete("ratelimit:testclient") + pipeline.execute() + + yield pipeline + + +expected_single = { + "version": 6, + "results": [ + { + "Name": "big-chungus", + "Version": "1.0.0", + "Description": "Bunny bunny around bunny", + "URL": "https://example.com/", + "PackageBase": "big-chungus", + "NumVotes": 3, + "Popularity": 3.0, + "Maintainer": "test", + "Submitter": "user2", + "FirstSubmitted": 1672214227, + "LastModified": 1672214227, + "URLPath": "/cgit/aur.git/snapshot/big-chungus.tar.gz", + "Depends": ["chungus-depends"], + "OptDepends": ["chungus-optdepends=50"], + "MakeDepends": ["chungus-makedepends"], + "CheckDepends": ["chungus-checkdepends"], + "Conflicts": ["chungus-conflicts"], + "CoMaintainers": ["user2", "user3"], + "Provides": ["chungus-provides<=200"], + "Replaces": ["chungus-replaces<=200"], + "License": ["GPL"], + "Keywords": ["big-chungus", "sizeable-chungus", "smol-chungus"], + "Groups": ["testgroup"], + } + ], + "resultcount": 1, + "type": "info", +} + +expected_multi = { + "version": 6, + "results": [ + expected_single["results"][0], + { + "Name": "chungy-chungus", + "Version": "2.0.0", + "Description": "Wubby wubby on wobba wuubu", + "URL": "https://example.com/", + "PackageBase": "chungy-chungus", + "NumVotes": 0, + "Popularity": 0.0, + "Maintainer": "test", + "Submitter": "user2", + "FirstSubmitted": 1672214227, + "LastModified": 1672214227, + "URLPath": "/cgit/aur.git/snapshot/chungy-chungus.tar.gz", + "Depends": ["chungy-depends"], + "Conflicts": ["chungy-conflicts"], + }, + ], + "resultcount": 2, + "type": "info", +} + + +def test_api_info_get_query( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "name" + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_get_path( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/info/big-chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with path params + with client as request: + resp = request.get( + "/api/v6/info/name/big-chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_post( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: POST with form param + with client as request: + resp = request.post( + "/api/v6/info", + data={ + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_get_multiple( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "name" + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "arg": ["big-chungus", "chungy-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_post_multiple( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: POST with form param + with client as request: + resp = request.post( + "/api/v6/info", + data={ + "arg": ["big-chungus", "chungy-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_depends( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "depends" + with client as request: + resp = request.get( + "/api/v6/info/depends/chungus-depends", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "depends", multiple + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "by": "depends", + "arg": ["chungus-depends", "chungy-depends"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_makedepends( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "makedepends" + with client as request: + resp = request.get( + "/api/v6/info/makedepends/chungus-makedepends", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_optdepends( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "optdepends" + with client as request: + resp = request.get( + "/api/v6/info/optdepends/chungus-optdepends", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_checkdepends( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "checkdepends" + with client as request: + resp = request.get( + "/api/v6/info/checkdepends/chungus-checkdepends", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_provides( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "provides" + with client as request: + resp = request.get( + "/api/v6/info/provides/chungus-provides", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "provides", multiple + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "by": "provides", + "arg": ["big-chungus", "chungy-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_conflicts( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "conflicts" + with client as request: + resp = request.get( + "/api/v6/info/conflicts/chungus-conflicts", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "conflicts", multiple + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "by": "conflicts", + "arg": ["chungus-conflicts", "chungy-conflicts"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_replaces( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "replaces" + with client as request: + resp = request.get( + "/api/v6/info/replaces/chungus-replaces", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_submitter( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "submitter" + with client as request: + resp = request.get( + "/api/v6/info/submitter/user2", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_multi + + +def test_api_info_maintainer( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "maintainer" + with client as request: + resp = request.get( + "/api/v6/info/maintainer/test", + ) + + response_data = orjson.loads(resp.text) + assert response_data["resultcount"] == 4 + + +def test_api_info_keywords( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "keywords" + with client as request: + resp = request.get( + "/api/v6/info/keywords/big-chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "keywords", multiple + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "by": "keywords", + "arg": ["big-chungus", "sizeable-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_groups( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "groups" + with client as request: + resp = request.get( + "/api/v6/info/groups/testgroup", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_comaintainers( + client: TestClient, + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with query param by "comaintainers" + with client as request: + resp = request.get( + "/api/v6/info/comaintainers/user2", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_info_none( + client: TestClient, + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with non existent pkg + with client as request: + resp = request.get( + "/api/v6/info/nonsense", + ) + + response_data = orjson.loads(resp.text) + assert len(response_data["results"]) == 0 + assert response_data["resultcount"] == 0 + + +def test_api_info_orphans( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET by maintainer without arg + with client as request: + resp = request.get( + "/api/v6/info", + params={ + "by": "maintainer", + }, + ) + + response_data = orjson.loads(resp.text) + assert len(response_data["results"]) == 1 + assert response_data["resultcount"] == 1 + + # Make request: GET by maintainer with empty arg + with client as request: + resp = request.get("/api/v6/info", params={"by": "maintainer", "arg": ""}) + + response_data = orjson.loads(resp.text) + assert len(response_data["results"]) == 1 + assert response_data["resultcount"] == 1 + + +def test_api_search_get_query( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with query param by "name-desc" + with client as request: + resp = request.get( + "/api/v6/search", + params={ + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "name" + with client as request: + resp = request.get( + "/api/v6/search", + params={ + "by": "name", + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with query param by "name" with "mode" + with client as request: + resp = request.get( + "/api/v6/search", + params={ + "mode": "contains", + "by": "name", + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_get_path( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/search/big-chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + # Make request: GET with path params + with client as request: + resp = request.get( + "/api/v6/search/name/big-chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_post( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: POST with form param + with client as request: + resp = request.post( + "/api/v6/search", + data={ + "by": "name", + "arg": ["big-chungus"], + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_description( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/search/name-desc/bunny", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_multiterm( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with multiple terms + with client as request: + resp = request.get( + "/api/v6/search/name-desc/big%20around", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_starts_with( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/search/name/starts-with/big", + ) + + response_data = orjson.loads(resp.text) + assert response_data == expected_single + + +def test_api_search_all( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "search" + + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/search/name/contains/chungus", + ) + + response_data = orjson.loads(resp.text) + assert response_data["resultcount"] == 4 + + +def test_api_suggest( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/suggest/big", + ) + + response_data = orjson.loads(resp.text) + assert response_data[0] == "big-chungus" + + # Make request: GET with query param + with client as request: + resp = request.get( + "/api/v6/suggest", + params={ + "arg": "big", + }, + ) + + response_data = orjson.loads(resp.text) + assert response_data[0] == "big-chungus" + + +def test_api_suggest_empty( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # set expected "type" + expected_single["type"] = "suggest" + + # Make request: GET without arg + with client as request: + resp = request.get( + "/api/v6/suggest", + ) + + response_data = orjson.loads(resp.text) + assert response_data[0] == "big-chungus" + assert len(response_data) == 4 + + +def test_api_suggest_pkgbase( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with path param + with client as request: + resp = request.get( + "/api/v6/suggest-pkgbase/big", + ) + + response_data = orjson.loads(resp.text) + assert response_data[0] == "big-chungus" + + # Make request: GET with query param + with client as request: + resp = request.get( + "/api/v6/suggest-pkgbase", + params={ + "arg": "big", + }, + ) + + response_data = orjson.loads(resp.text) + + # Validate response + assert response_data[0] == "big-chungus" + + +def test_api_error_wrong_type( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with wrong type + with client as request: + resp = request.get( + "/api/v6/nonsense/bla", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Incorrect request type specified." + + +def test_api_error_wrong_version( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with wrong version + with client as request: + resp = request.get( + "/api/vnonsense/info/bla", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Invalid version specified." + + +def test_api_error_wrong_by( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with wrong by field + with client as request: + resp = request.get( + "/api/v6/info/nonsense/bla", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Incorrect by field specified." + + +def test_api_error_wrong_mode( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with wrong mode + with client as request: + resp = request.get( + "/api/v6/search/name/nonsense/bla", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Incorrect mode specified." + + +def test_api_error_arg_too_small( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + # Make request: GET with no arg + with client as request: + resp = request.get( + "/api/v6/search", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Query arg too small." + + # Make request: GET with single character arg + with client as request: + resp = request.get( + "/api/v6/search/a", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Query arg too small." + + +def test_api_error_too_many_results( + client: TestClient, + packages: list[Package], + depends: list[PackageDependency], + relations: list[PackageRelation], + comaintainer: list[PackageComaintainer], +): + config_getint = config.getint + + def mock_config(section: str, key: str): + if key == "max_rpc_results": + return 1 + return config_getint(section, key) + + # Make request: GET with too many results + with mock.patch("aurweb.config.getint", side_effect=mock_config): + with client as request: + resp = request.get( + "/api/v6/search/chungus", + ) + + response_data = orjson.loads(resp.text) + assert resp.status_code == int(HTTPStatus.BAD_REQUEST) + assert response_data["error"] == "Too many package results." + + +def test_api_error_ratelimit( + client: TestClient, + pipeline: Pipeline, + packages: list[Package], +): + config_getint = config.getint + + def mock_config(section: str, key: str): + if key == "request_limit": + return 4 + return config_getint(section, key) + + with mock.patch("aurweb.config.getint", side_effect=mock_config): + for _ in range(4): + # The first 4 requests should be good. + with client as request: + resp = request.get( + "/api/v6/suggest", + ) + assert resp.status_code == int(HTTPStatus.OK) + + # The fifth request should be banned. + with client as request: + resp = request.get( + "/api/v6/suggest", + ) + assert resp.status_code == int(HTTPStatus.TOO_MANY_REQUESTS) + + # Delete the cached records. + pipeline.delete("ratelimit-ws:testclient") + pipeline.delete("ratelimit:testclient") + one, two = pipeline.execute() + assert one and two + + # The new request should be good. + with client as request: + resp = request.get( + "/api/v6/suggest", + ) + assert resp.status_code == int(HTTPStatus.OK)