mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
change(fastapi): rework /rpc (get)
This reworks the base implementation of the RPC to use a class called RPC for handling of requests. Took a bit of a different approach than PHP in terms of exposed methods, but it does end up achieving the same goal, with one additional error: "Request type '{type}' is not yet implemented." For FastAPI development, we'll stick with: - If the supplied 'type' argument has an alias mapping in RPC.ALIASES, we convert the type argument over to its alias before doing anything. Example: 'info' is aliased to 'multiinfo', so when a user requests type=info, it is converted to type=multiinfo. - If the type does not exist in RPC.EXPOSED_TYPES, the following error is produced: "No request type/data specified." - If the type **does** exist in RPC.EXPOSED_TYPES, but does not have an implemented `RPC._handle_{type}_type` function, the following error is produced: "Request type '{type}' is not yet implemented." Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
30ab45f459
commit
7c4fb539d8
3 changed files with 227 additions and 255 deletions
|
@ -1,48 +1,57 @@
|
|||
from typing import List, Optional
|
||||
from urllib.parse import unquote
|
||||
|
||||
from fastapi import APIRouter, Query, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from aurweb.rpc import RPC
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def arg_legacy_gen(request):
|
||||
# '[]' characters in the path randomly kept getting transformed to (what
|
||||
# appears to be) their HTML-formatted variants, so we keep that behavior
|
||||
# just in case.
|
||||
arguments = request.url.query.replace("%5B%5D", "[]").split("&")
|
||||
arguments.reverse()
|
||||
def parse_args(request: Request):
|
||||
""" Handle legacy logic of 'arg' and 'arg[]' query parameter handling.
|
||||
|
||||
temp_args = []
|
||||
When 'arg' appears as the last argument given to the query string,
|
||||
that argument is used by itself as one single argument, regardless
|
||||
of any more 'arg' or 'arg[]' parameters supplied before it.
|
||||
|
||||
for i in arguments:
|
||||
# We only want to deal with 'arg' and 'arg[]' strings, so only take those.
|
||||
if i.split("=")[0] in ("arg", "arg[]"):
|
||||
temp_args += [i]
|
||||
When 'arg[]' appears as the last argument given to the query string,
|
||||
we iterate from last to first and build a list of arguments until
|
||||
we hit an 'arg'.
|
||||
|
||||
returned_arguments = []
|
||||
argument_bracketed = False
|
||||
TODO: This handling should be addressed in v6 of the RPC API. This
|
||||
was most likely a bi-product of legacy handling of versions 1-4
|
||||
which we no longer support.
|
||||
|
||||
for i in temp_args:
|
||||
# Split argument on first occurance of '='.
|
||||
current_argument = i.split("=")
|
||||
:param request: FastAPI request
|
||||
:returns: List of deduced arguments
|
||||
"""
|
||||
# Create a list of (key, value) pairs of the given 'arg' and 'arg[]'
|
||||
# query parameters from last to first.
|
||||
query = list(reversed(unquote(request.url.query).split("&")))
|
||||
parts = [
|
||||
e.split("=", 1) for e in query if e.startswith(("arg=", "arg[]="))
|
||||
]
|
||||
|
||||
argument_name = current_argument[0]
|
||||
argument_value = "".join(current_argument[1:])
|
||||
args = []
|
||||
if parts:
|
||||
# If we found 'arg' and/or 'arg[]' arguments, we begin processing
|
||||
# the set of arguments depending on the last key found.
|
||||
last = parts[0][0]
|
||||
|
||||
# Process argument.
|
||||
if argument_name == "arg[]":
|
||||
returned_arguments += [argument_value]
|
||||
argument_bracketed = True
|
||||
if last == "arg":
|
||||
# If the last key was 'arg', then it is our sole argument.
|
||||
args.append(parts[0][1])
|
||||
else:
|
||||
# Otherwise, it must be 'arg[]', so traverse backward
|
||||
# until we reach a non-'arg[]' key.
|
||||
for key, value in parts:
|
||||
if key != last:
|
||||
break
|
||||
args.append(value)
|
||||
|
||||
elif argument_name == "arg":
|
||||
# Only set this argument if 'arg[]' hasen't previously been found.
|
||||
if not argument_bracketed:
|
||||
returned_arguments = [argument_value]
|
||||
break
|
||||
|
||||
return returned_arguments
|
||||
return args
|
||||
|
||||
|
||||
@router.get("/rpc")
|
||||
|
@ -51,51 +60,7 @@ async def rpc(request: Request,
|
|||
type: Optional[str] = Query(None),
|
||||
arg: Optional[str] = Query(None),
|
||||
args: Optional[List[str]] = Query(None, alias="arg[]")):
|
||||
# Defaults for returned data
|
||||
returned_data = {}
|
||||
|
||||
returned_data["version"] = v
|
||||
returned_data["results"] = []
|
||||
returned_data["resultcount"] = 0
|
||||
|
||||
# Default the type field to "error", until we determine that
|
||||
# we're not erroneous (below).
|
||||
returned_data["type"] = "error"
|
||||
|
||||
# Ensure valid version was passed
|
||||
if v is None:
|
||||
returned_data["error"] = "Please specify an API version."
|
||||
return returned_data
|
||||
elif v != 5:
|
||||
returned_data["error"] = "Invalid version specified."
|
||||
return returned_data
|
||||
else:
|
||||
# We got past initial error cases; set the type to what
|
||||
# the user gave us.
|
||||
returned_data["type"] = type
|
||||
|
||||
# Take arguments from either 'args' or 'args[]' and put them into 'argument_list'.
|
||||
argument_list = []
|
||||
|
||||
# In the PHP implementation, aurweb uses the last 'arg' value or all the
|
||||
# last 'arg[]' values when both 'arg' and 'arg[]' are part of the query
|
||||
# request. We thus preserve that behavior here for legacy purposes.
|
||||
if arg is not None and args is not None:
|
||||
argument_list = arg_legacy_gen(request)
|
||||
elif arg is not None:
|
||||
argument_list = [arg]
|
||||
elif args is not None:
|
||||
argument_list = args
|
||||
else:
|
||||
# Abort because no package arguments were passed.
|
||||
returned_data["type"] = "error"
|
||||
returned_data["error"] = "No request type/data specified."
|
||||
return returned_data
|
||||
|
||||
# Process and return data
|
||||
returned_data = RPC(v=v,
|
||||
type=type,
|
||||
argument_list=argument_list,
|
||||
returned_data=returned_data)
|
||||
|
||||
return returned_data
|
||||
# Prepare output list of arguments.
|
||||
arguments = parse_args(request)
|
||||
return JSONResponse(RPC().handle(v=v, type=type, args=arguments))
|
||||
|
|
350
aurweb/rpc.py
350
aurweb/rpc.py
|
@ -1,11 +1,14 @@
|
|||
from collections import defaultdict
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import and_
|
||||
|
||||
import aurweb.config as config
|
||||
|
||||
from aurweb import db, models
|
||||
from aurweb import db, models, util
|
||||
from aurweb.models import dependency_type, relation_type
|
||||
|
||||
# Define dependency types.
|
||||
# Define dependency type mappings from ID to RPC-compatible keys.
|
||||
DEP_TYPES = {
|
||||
dependency_type.DEPENDS_ID: "Depends",
|
||||
dependency_type.MAKEDEPENDS_ID: "MakeDepends",
|
||||
|
@ -13,7 +16,7 @@ DEP_TYPES = {
|
|||
dependency_type.OPTDEPENDS_ID: "OptDepends"
|
||||
}
|
||||
|
||||
# Define relationship types.
|
||||
# Define relationship type mappings from ID to RPC-compatible keys.
|
||||
REL_TYPES = {
|
||||
relation_type.CONFLICTS_ID: "Conflicts",
|
||||
relation_type.PROVIDES_ID: "Provides",
|
||||
|
@ -21,195 +24,186 @@ REL_TYPES = {
|
|||
}
|
||||
|
||||
|
||||
# Define functions for request types.
|
||||
def add_deps(current_array, db_dep):
|
||||
if db_dep.count() > 0:
|
||||
# Create lists for all dependency types.
|
||||
for i in DEP_TYPES.values():
|
||||
current_array[i] = []
|
||||
|
||||
# Generate each dependency item in list.
|
||||
for i in db_dep.all():
|
||||
dep_string = i.DepName
|
||||
|
||||
# Add relationship version restrictor (i.e. '<=5') if it exists.
|
||||
if i.DepCondition is not None:
|
||||
dep_string += i.DepCondition
|
||||
|
||||
# Add item to list.
|
||||
current_deptype = DEP_TYPES.get(i.DepTypeID)
|
||||
current_array[current_deptype] += [dep_string]
|
||||
|
||||
# Remove any dependency lists that are empty.
|
||||
for i in DEP_TYPES.values():
|
||||
if current_array[i] == []:
|
||||
current_array.pop(i)
|
||||
|
||||
return current_array
|
||||
class RPCError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def add_rels(current_array, db_rel):
|
||||
if db_rel.count() > 0:
|
||||
# Create lists for all relationship types.
|
||||
for i in REL_TYPES.values():
|
||||
current_array[i] = []
|
||||
class RPC:
|
||||
""" RPC API handler class.
|
||||
|
||||
# Generate each relationship item in list.
|
||||
for i in db_rel.all():
|
||||
rel_string = i.RelName
|
||||
There are various pieces to RPC's process, and encapsulating them
|
||||
inside of a class means that external users do not abuse the
|
||||
RPC implementation to achieve goals. We call type handlers
|
||||
by taking a reference to the callback named "_handle_{type}_type(...)",
|
||||
and if the handler does not exist, we return a not implemented
|
||||
error to the API user.
|
||||
|
||||
# Add relationship version restrictor (i.e. '<=5') if it exists.
|
||||
if i.RelCondition is not None:
|
||||
rel_string += i.RelCondition
|
||||
EXPOSED_VERSIONS holds the set of versions that the API
|
||||
officially supports.
|
||||
|
||||
# Add item to list.
|
||||
current_reltype = REL_TYPES.get(i.RelTypeID)
|
||||
current_array[current_reltype] += [rel_string]
|
||||
EXPOSED_TYPES holds the set of types that the API officially
|
||||
supports.
|
||||
|
||||
# Remove any relationship lists that are empty.
|
||||
for i in REL_TYPES.values():
|
||||
if current_array[i] == []:
|
||||
current_array.pop(i)
|
||||
ALIASES holds an alias mapping of type -> type strings.
|
||||
|
||||
return current_array
|
||||
We should focus on privatizing implementation helpers and
|
||||
focusing on performance in the code used.
|
||||
"""
|
||||
|
||||
# A set of RPC versions supported by this API.
|
||||
EXPOSED_VERSIONS = {5}
|
||||
|
||||
def run_info(returned_data, package_name, snapshot_uri):
|
||||
# Get package name.
|
||||
db_package = db.query(models.Package).filter(
|
||||
models.Package.Name == package_name
|
||||
)
|
||||
|
||||
if db_package.count() == 0:
|
||||
return returned_data
|
||||
|
||||
db_package = db_package.first()
|
||||
|
||||
# Get name of package under PackageBaseID.
|
||||
db_package_baseid = db.query(models.PackageBase).filter(
|
||||
models.PackageBase.ID == db_package.PackageBaseID
|
||||
).first()
|
||||
|
||||
# Get maintainer info.
|
||||
db_package_maintainer = db.query(models.User).filter(
|
||||
models.User.ID == db_package_baseid.MaintainerUID
|
||||
).first()
|
||||
|
||||
current_array = {}
|
||||
returned_data["resultcount"] = returned_data["resultcount"] + 1
|
||||
|
||||
# Data from the Packages table.
|
||||
current_array["ID"] = db_package.ID
|
||||
current_array["Name"] = db_package.Name
|
||||
current_array["PackageBaseID"] = db_package.PackageBaseID
|
||||
current_array["Version"] = db_package.Version
|
||||
current_array["Description"] = db_package.Description
|
||||
current_array["URL"] = db_package.URL
|
||||
|
||||
# PackageBase table.
|
||||
current_array["PackageBase"] = db_package_baseid.Name
|
||||
current_array["NumVotes"] = db_package_baseid.NumVotes
|
||||
current_array["Popularity"] = db_package_baseid.Popularity
|
||||
current_array["OutOfDate"] = db_package_baseid.OutOfDateTS
|
||||
current_array["FirstSubmitted"] = db_package_baseid.SubmittedTS
|
||||
current_array["LastModified"] = db_package_baseid.ModifiedTS
|
||||
|
||||
# User table.
|
||||
try:
|
||||
current_array["Maintainer"] = db_package_maintainer.Username
|
||||
except AttributeError:
|
||||
current_array["Maintainer"] = None
|
||||
|
||||
# Generate and add snapshot_uri.
|
||||
current_array["URLPath"] = snapshot_uri.replace("%s", package_name)
|
||||
|
||||
# Add package votes.
|
||||
current_array["NumVotes"] = db.query(models.PackageVote).count()
|
||||
|
||||
# Generate dependency listing.
|
||||
db_dep = db.query(models.PackageDependency).filter(
|
||||
models.PackageDependency.PackageID == db_package.ID)
|
||||
current_array = add_deps(current_array, db_dep)
|
||||
|
||||
# Generate relationship listing.
|
||||
db_rel = db.query(models.PackageRelation).filter(
|
||||
models.PackageRelation.PackageID == db_package.ID)
|
||||
current_array = add_rels(current_array, db_rel)
|
||||
|
||||
# License table.
|
||||
current_array["License"] = []
|
||||
|
||||
licenses = db.query(models.PackageLicense).filter(
|
||||
models.PackageLicense.PackageID == db_package.ID)
|
||||
for i in licenses:
|
||||
current_array["License"] += [i.License.Name]
|
||||
|
||||
# Keywords table.
|
||||
current_array["Keywords"] = []
|
||||
|
||||
keywords = db.query(models.PackageKeyword).filter(
|
||||
models.PackageKeyword.PackageBaseID == db_package_baseid.ID)
|
||||
for i in keywords:
|
||||
current_array["Keywords"] += [i.Keyword]
|
||||
|
||||
# Add current array to returned results.
|
||||
returned_data["results"] += [current_array]
|
||||
return returned_data
|
||||
|
||||
|
||||
def run_suggest_pkgbase(returned_data, arg, snapshot_uri):
|
||||
results = db.query(models.PackageBase).filter(
|
||||
and_(models.PackageBase.PackagerUID.isnot(None),
|
||||
models.PackageBase.Name.like(f"%{arg}%"))
|
||||
).order_by(models.PackageBase.Name.asc()).limit(20)
|
||||
return [result.Name for result in results]
|
||||
|
||||
|
||||
def RPC(**function_args):
|
||||
# Get arguments.
|
||||
#
|
||||
# We'll use 'v' in the future when we add v6.
|
||||
# v = function_args.gea name used for an individual person, place, or
|
||||
# organization, spelled with initial capital letters, e.g., Larry,
|
||||
# Mexico, and Boston Red Sox.t("v")
|
||||
type = function_args.get("type")
|
||||
args = function_args.get("argument_list")
|
||||
returned_data = function_args.get("returned_data")
|
||||
|
||||
# Get Snapshot URI
|
||||
snapshot_uri = config.get("options", "snapshot_uri")
|
||||
|
||||
# Set request type to run.
|
||||
type_actions = {
|
||||
"info": run_info,
|
||||
"multiinfo": run_info,
|
||||
"suggest-pkgbase": run_suggest_pkgbase
|
||||
# A set of RPC types supported by this API.
|
||||
EXPOSED_TYPES = {
|
||||
"info", "multiinfo",
|
||||
"search", "msearch",
|
||||
"suggest", "suggest-pkgbase"
|
||||
}
|
||||
|
||||
# This if statement should always be executed, as we checked if the
|
||||
# specified type was valid in aurweb/routers/rpc.py.
|
||||
if type in type_actions:
|
||||
run_request = type_actions.get(type)
|
||||
# A mapping of aliases.
|
||||
ALIASES = {"info": "multiinfo"}
|
||||
|
||||
# If type is 'info', overwrite type to 'multiinfo' to match the
|
||||
# behavior of the PHP implementation.
|
||||
if type == "info":
|
||||
returned_data["type"] = "multiinfo"
|
||||
def _verify_inputs(self, v: int, type: str, args: List[str] = []):
|
||||
if v is None:
|
||||
raise RPCError("Please specify an API version.")
|
||||
|
||||
# Remove duplicate arguments if type is 'multiinfo' so we don't
|
||||
# fetch results for a package multiple times.
|
||||
if returned_data["type"] == "multiinfo":
|
||||
args = set(args)
|
||||
if v not in RPC.EXPOSED_VERSIONS:
|
||||
raise RPCError("Invalid version specified.")
|
||||
|
||||
for i in args:
|
||||
returned_data = run_request(returned_data, i, snapshot_uri)
|
||||
if type is None or not len(args):
|
||||
raise RPCError("No request type/data specified.")
|
||||
|
||||
elif type is None:
|
||||
returned_data["type"] = "error"
|
||||
returned_data["error"] = "No request type/data specified."
|
||||
if type not in RPC.EXPOSED_TYPES:
|
||||
raise RPCError("Incorrect request type specified.")
|
||||
|
||||
else:
|
||||
returned_data["type"] = "error"
|
||||
returned_data["error"] = "Incorrect request type specified."
|
||||
try:
|
||||
getattr(self, f"_handle_{type.replace('-', '_')}_type")
|
||||
except AttributeError:
|
||||
raise RPCError(f"Request type '{type}' is not yet implemented.")
|
||||
|
||||
return returned_data
|
||||
def _get_json_data(self, package: models.Package):
|
||||
""" Produce dictionary data of one Package that can be JSON-serialized.
|
||||
|
||||
:param package: Package instance
|
||||
:returns: JSON-serializable dictionary
|
||||
"""
|
||||
|
||||
# Produce RPC API compatible Popularity: If zero, it's an integer
|
||||
# 0, otherwise, it's formatted to the 6th decimal place.
|
||||
pop = package.PackageBase.Popularity
|
||||
pop = 0 if not pop else float(util.number_format(pop, 6))
|
||||
|
||||
snapshot_uri = config.get("options", "snapshot_uri")
|
||||
data = defaultdict(list)
|
||||
data.update({
|
||||
"ID": package.ID,
|
||||
"Name": package.Name,
|
||||
"PackageBaseID": package.PackageBaseID,
|
||||
"PackageBase": package.PackageBase.Name,
|
||||
# Maintainer should be set following this update if one exists.
|
||||
"Maintainer": None,
|
||||
"Version": package.Version,
|
||||
"Description": package.Description,
|
||||
"URL": package.URL,
|
||||
"URLPath": snapshot_uri % package.Name,
|
||||
"NumVotes": package.PackageBase.NumVotes,
|
||||
"Popularity": pop,
|
||||
"OutOfDate": package.PackageBase.OutOfDateTS,
|
||||
"FirstSubmitted": package.PackageBase.SubmittedTS,
|
||||
"LastModified": package.PackageBase.ModifiedTS,
|
||||
"License": [
|
||||
lic.License.Name for lic in package.package_licenses
|
||||
],
|
||||
"Keywords": [
|
||||
keyword.Keyword for keyword in package.PackageBase.keywords
|
||||
]
|
||||
})
|
||||
|
||||
if package.PackageBase.Maintainer is not None:
|
||||
# We do have a maintainer: set the Maintainer key.
|
||||
data["Maintainer"] = package.PackageBase.Maintainer.Username
|
||||
|
||||
# Walk through all related PackageDependencies and produce
|
||||
# the appropriate dict entries.
|
||||
if depends := package.package_dependencies:
|
||||
for dep in depends:
|
||||
if dep.DepTypeID in DEP_TYPES:
|
||||
key = DEP_TYPES.get(dep.DepTypeID)
|
||||
|
||||
display = dep.DepName
|
||||
if dep.DepCondition:
|
||||
display += dep.DepCondition
|
||||
|
||||
data[key].append(display)
|
||||
|
||||
# Walk through all related PackageRelations and produce
|
||||
# the appropriate dict entries.
|
||||
if relations := package.package_relations:
|
||||
for rel in relations:
|
||||
if rel.RelTypeID in REL_TYPES:
|
||||
key = REL_TYPES.get(rel.RelTypeID)
|
||||
|
||||
display = rel.RelName
|
||||
if rel.RelCondition:
|
||||
display += rel.RelCondition
|
||||
|
||||
data[key].append(display)
|
||||
|
||||
return data
|
||||
|
||||
def _handle_multiinfo_type(self, args: List[str] = []):
|
||||
args = set(args)
|
||||
packages = db.query(models.Package).filter(
|
||||
models.Package.Name.in_(args))
|
||||
return [self._get_json_data(pkg) for pkg in packages]
|
||||
|
||||
def _handle_suggest_pkgbase_type(self, args: List[str] = []):
|
||||
records = db.query(models.PackageBase).filter(
|
||||
and_(models.PackageBase.PackagerUID.isnot(None),
|
||||
models.PackageBase.Name.like(f"%{args[0]}%"))
|
||||
).order_by(models.PackageBase.Name.asc()).limit(20)
|
||||
return [record.Name for record in records]
|
||||
|
||||
def handle(self, v: int = 0, type: str = None, args: List[str] = []):
|
||||
""" Request entrypoint. A router should pass v, type and args
|
||||
to this function and expect an output dictionary to be returned.
|
||||
|
||||
:param v: RPC version argument
|
||||
:param type: RPC type argument
|
||||
:param args: Deciphered list of arguments based on arg/arg[] inputs
|
||||
"""
|
||||
# Convert type aliased types.
|
||||
if type in RPC.ALIASES:
|
||||
type = RPC.ALIASES.get(type)
|
||||
|
||||
# Prepare our output data dictionary with some basic keys.
|
||||
data = {"version": v, "type": type}
|
||||
|
||||
# Run some verification on our given arguments.
|
||||
try:
|
||||
self._verify_inputs(v, type, args)
|
||||
except RPCError as exc:
|
||||
data.update({
|
||||
"results": [],
|
||||
"resultcount": 0,
|
||||
"type": "error",
|
||||
"error": str(exc)
|
||||
})
|
||||
return data
|
||||
|
||||
# Get a handle to our callback and trap an RPCError with
|
||||
# an empty list of results based on callback's execution.
|
||||
callback = getattr(self, f"_handle_{type.replace('-', '_')}_type")
|
||||
results = callback(args)
|
||||
|
||||
# These types are special: we produce a different kind of
|
||||
# successful JSON output: a list of results.
|
||||
if type in ("suggest", "suggest-pkgbase"):
|
||||
return results
|
||||
|
||||
# Return JSON output.
|
||||
data.update({
|
||||
"resultcount": len(results),
|
||||
"results": results
|
||||
})
|
||||
return data
|
||||
|
|
|
@ -3,6 +3,7 @@ import pytest
|
|||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from aurweb import db, scripts
|
||||
from aurweb.asgi import app
|
||||
from aurweb.db import begin, create, query
|
||||
from aurweb.models.account_type import AccountType
|
||||
|
@ -173,6 +174,9 @@ def setup():
|
|||
PackageBase=pkgbase1,
|
||||
VoteTS=5000)
|
||||
|
||||
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
|
||||
scripts.popupdate.run_single(conn, pkgbase1)
|
||||
|
||||
|
||||
def test_rpc_singular_info():
|
||||
# Define expected response.
|
||||
|
@ -284,7 +288,7 @@ def test_rpc_no_dependencies():
|
|||
'Description': 'Wubby wubby on wobba wuubu',
|
||||
'URL': 'https://example.com/',
|
||||
'PackageBase': 'chungy-chungus',
|
||||
'NumVotes': 3,
|
||||
'NumVotes': 0,
|
||||
'Popularity': 0.0,
|
||||
'OutOfDate': None,
|
||||
'Maintainer': 'user1',
|
||||
|
@ -428,3 +432,12 @@ def test_rpc_suggest_pkgbase():
|
|||
response = make_request("/rpc?v=5&type=suggest-pkgbase&arg=chungy")
|
||||
data = response.json()
|
||||
assert data == ["chungy-chungus"]
|
||||
|
||||
|
||||
def test_rpc_unimplemented_types():
|
||||
unimplemented = ["search", "msearch", "suggest"]
|
||||
for type in unimplemented:
|
||||
response = make_request(f"/rpc?v=5&type={type}&arg=big")
|
||||
data = response.json()
|
||||
expected = f"Request type '{type}' is not yet implemented."
|
||||
assert data.get("error") == expected
|
||||
|
|
Loading…
Add table
Reference in a new issue