Mirror of https://gitlab.archlinux.org/archlinux/aurweb.git, synced 2025-02-03 10:43:03 +01:00
feat: Implement statistics class & additional metrics
The new module/class helps us construct queries and count records to expose various statistics on the homepage. We also utilize it for some new Prometheus metrics (package and user gauges). Record counts are cached with Redis.

Signed-off-by: moson <moson@archlinux.org>
parent 347c2ce721 · commit 44c158b8c2

7 changed files with 143 additions and 89 deletions
aurweb/cache.py
@@ -1,20 +1,15 @@
 import pickle
 
-from prometheus_client import Counter
 from sqlalchemy import orm
 
 from aurweb import config
 from aurweb.aur_redis import redis_connection
+from aurweb.prometheus import SEARCH_REQUESTS
 
 _redis = redis_connection()
 
-# Prometheus metrics
-SEARCH_REQUESTS = Counter(
-    "search_requests", "Number of search requests by cache hit/miss", ["cache"]
-)
-
 
-async def db_count_cache(key: str, query: orm.Query, expire: int = None) -> int:
+def db_count_cache(key: str, query: orm.Query, expire: int = None) -> int:
     """Store and retrieve a query.count() via redis cache.
 
     :param key: Redis key
@@ -30,7 +25,7 @@ async def db_count_cache(key: str, query: orm.Query, expire: int = None) -> int:
     return int(result)
 
 
-async def db_query_cache(key: str, query: orm.Query, expire: int = None) -> list:
+def db_query_cache(key: str, query: orm.Query, expire: int = None) -> list:
     """Store and retrieve query results via redis cache.
 
     :param key: Redis key
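With `async` dropped, both helpers are plain synchronous functions. A minimal sketch of the cached-count pattern they provide (assumes a configured database and Redis; the key name is borrowed from the homepage code):

    from aurweb import config, db, models
    from aurweb.cache import db_count_cache

    # Counts PackageBase rows; the result is stored in Redis under the given
    # key and reused until it expires, so repeat page loads skip the COUNT query.
    expire = config.getint("cache", "expiry_time")
    query = db.query(models.PackageBase)
    package_count = db_count_cache("package_count", query, expire=expire)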
aurweb/prometheus.py
@@ -1,6 +1,6 @@
 from typing import Any, Callable, Optional
 
-from prometheus_client import Counter
+from prometheus_client import Counter, Gauge
 from prometheus_fastapi_instrumentator import Instrumentator
 from prometheus_fastapi_instrumentator.metrics import Info
 from starlette.routing import Match, Route
@@ -11,10 +11,26 @@ logger = aur_logging.get_logger(__name__)
 _instrumentator = Instrumentator()
 
 
+# Custom metrics
+SEARCH_REQUESTS = Counter(
+    "aur_search_requests", "Number of search requests by cache hit/miss", ["cache"]
+)
+USERS = Gauge(
+    "aur_users", "Number of AUR users by type", ["type"], multiprocess_mode="livemax"
+)
+PACKAGES = Gauge(
+    "aur_packages",
+    "Number of AUR packages by state",
+    ["state"],
+    multiprocess_mode="livemax",
+)
+
+
 def instrumentator():
     return _instrumentator
 
 
+# FastAPI metrics
 # Taken from https://github.com/stephenhillier/starlette_exporter
 # Their license is included in LICENSES/starlette_exporter.
 # The code has been modified to remove child route checks
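Both gauges are declared with multiprocess_mode="livemax", which matters because aurweb serves requests from several worker processes: each process records its own sample, and in prometheus_client's multiprocess mode "livemax" exposes the maximum value across the processes that are still alive. A small sketch of how such labeled gauges are fed (the numbers are made up):

    from aurweb.prometheus import PACKAGES, USERS

    # Each label value is its own time series; set() overwrites the sample.
    USERS.labels("tu").set(42)
    USERS.labels("user").set(95000)
    PACKAGES.labels("orphan").set(1200)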
aurweb/routers/html.py
@@ -17,11 +17,10 @@ from sqlalchemy import case, or_
 import aurweb.config
 import aurweb.models.package_request
 from aurweb import aur_logging, cookies, db, models, time, util
-from aurweb.cache import db_count_cache
 from aurweb.exceptions import handle_form_exceptions
-from aurweb.models.account_type import TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID
 from aurweb.models.package_request import PENDING_ID
 from aurweb.packages.util import query_notified, query_voted, updated_packages
+from aurweb.statistics import Statistics, update_prometheus_metrics
 from aurweb.templates import make_context, render_template
 
 logger = aur_logging.get_logger(__name__)
@@ -87,68 +86,12 @@ async def index(request: Request):
     context = make_context(request, "Home")
     context["ssh_fingerprints"] = util.get_ssh_fingerprints()
 
-    bases = db.query(models.PackageBase)
-
     cache_expire = aurweb.config.getint("cache", "expiry_time")
 
     # Package statistics.
-    context["package_count"] = await db_count_cache(
-        "package_count", bases, expire=cache_expire
-    )
-
-    query = bases.filter(models.PackageBase.MaintainerUID.is_(None))
-    context["orphan_count"] = await db_count_cache(
-        "orphan_count", query, expire=cache_expire
-    )
-
-    query = db.query(models.User)
-    context["user_count"] = await db_count_cache(
-        "user_count", query, expire=cache_expire
-    )
-
-    query = query.filter(
-        or_(
-            models.User.AccountTypeID == TRUSTED_USER_ID,
-            models.User.AccountTypeID == TRUSTED_USER_AND_DEV_ID,
-        )
-    )
-    context["trusted_user_count"] = await db_count_cache(
-        "trusted_user_count", query, expire=cache_expire
-    )
-
-    # Current timestamp.
-    now = time.utcnow()
-
-    seven_days = 86400 * 7  # Seven days worth of seconds.
-    seven_days_ago = now - seven_days
-
-    one_hour = 3600
-    updated = bases.filter(
-        models.PackageBase.ModifiedTS - models.PackageBase.SubmittedTS >= one_hour
-    )
-
-    query = bases.filter(models.PackageBase.SubmittedTS >= seven_days_ago)
-    context["seven_days_old_added"] = await db_count_cache(
-        "seven_days_old_added", query, expire=cache_expire
-    )
-
-    query = updated.filter(models.PackageBase.ModifiedTS >= seven_days_ago)
-    context["seven_days_old_updated"] = await db_count_cache(
-        "seven_days_old_updated", query, expire=cache_expire
-    )
-
-    year = seven_days * 52  # Fifty two weeks worth: one year.
-    year_ago = now - year
-    query = updated.filter(models.PackageBase.ModifiedTS >= year_ago)
-    context["year_old_updated"] = await db_count_cache(
-        "year_old_updated", query, expire=cache_expire
-    )
-
-    query = bases.filter(
-        models.PackageBase.ModifiedTS - models.PackageBase.SubmittedTS < 3600
-    )
-    context["never_updated"] = await db_count_cache(
-        "never_updated", query, expire=cache_expire
-    )
+    stats = Statistics(cache_expire)
+    for counter in stats.HOMEPAGE_COUNTERS:
+        context[counter] = stats.get_count(counter)
 
     # Get the 15 most recently updated packages.
     context["package_updates"] = updated_packages(15, cache_expire)
@@ -193,7 +136,7 @@ async def index(request: Request):
     )
 
     archive_time = aurweb.config.getint("options", "request_archive_time")
-    start = now - archive_time
+    start = time.utcnow() - archive_time
 
     # Package requests created by request.user.
     context["package_requests"] = (
@@ -269,6 +212,9 @@ async def metrics(request: Request):
             status_code=HTTPStatus.SERVICE_UNAVAILABLE,
         )
 
+    # update prometheus gauges for packages and users
+    update_prometheus_metrics()
+
     registry = CollectorRegistry()
     multiprocess.MultiProcessCollector(registry)
     data = generate_latest(registry)
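Note that `start` switches to `time.utcnow()` because the local `now` variable disappeared with the inlined statistics code. The /metrics route, in turn, now refreshes the user and package gauges on every scrape before aggregating the multiprocess registry. A condensed sketch of the resulting flow (the hypothetical render_metrics stands in for the route body; the real handler first returns SERVICE_UNAVAILABLE when multiprocess collection is not configured, as the hunk above shows):

    from fastapi import Response
    from prometheus_client import (
        CONTENT_TYPE_LATEST,
        CollectorRegistry,
        generate_latest,
        multiprocess,
    )

    from aurweb.statistics import update_prometheus_metrics

    def render_metrics() -> Response:
        # Recompute the aur_users/aur_packages gauges, then aggregate
        # samples from all worker processes into a fresh registry.
        update_prometheus_metrics()
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
        data = generate_latest(registry)
        return Response(data, media_type=CONTENT_TYPE_LATEST)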
aurweb/routers/packages.py
@@ -91,9 +91,7 @@ async def packages_get(
     # increase the amount of time required to collect a count.
     # we use redis for caching the results of the query
     cache_expire = config.getint("cache", "expiry_time")
-    num_packages = await db_count_cache(
-        hash_query(search.query), search.query, cache_expire
-    )
+    num_packages = db_count_cache(hash_query(search.query), search.query, cache_expire)
 
     # Apply user-specified sort column and ordering.
     search.sort_by(sort_by, sort_order)
@@ -118,7 +116,7 @@ async def packages_get(
     results = results.limit(per_page).offset(offset)
 
     # we use redis for caching the results of the query
-    packages = await db_query_cache(hash_query(results), results, cache_expire)
+    packages = db_query_cache(hash_query(results), results, cache_expire)
 
     context["packages"] = packages
     context["packages_count"] = num_packages
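Search counts and search results are cached under a key derived from the query itself, so each distinct filter/sort/pagination combination gets its own Redis entry. hash_query is an existing aurweb helper; the following is only a plausible sketch of the idea, not its actual implementation:

    import hashlib

    from sqlalchemy import orm

    def hash_query_sketch(query: orm.Query) -> str:
        # A stable key from the compiled SQL text: identical queries
        # share one cache slot, different queries get different slots.
        sql = str(query.statement.compile(compile_kwargs={"literal_binds": True}))
        return hashlib.sha1(sql.encode()).hexdigest()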
aurweb/statistics.py  (new file, 102 lines)
@@ -0,0 +1,102 @@
+from aurweb import config, db, time
+from aurweb.cache import db_count_cache
+from aurweb.models import PackageBase, User
+from aurweb.models.account_type import TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID, USER_ID
+from aurweb.prometheus import PACKAGES, USERS
+
+
+class Statistics:
+    HOMEPAGE_COUNTERS = [
+        "package_count",
+        "orphan_count",
+        "seven_days_old_added",
+        "seven_days_old_updated",
+        "year_old_updated",
+        "never_updated",
+        "user_count",
+        "trusted_user_count",
+    ]
+    PROMETHEUS_USER_COUNTERS = [
+        ("trusted_user_count", "tu"),
+        ("regular_user_count", "user"),
+    ]
+    PROMETHEUS_PACKAGE_COUNTERS = [
+        ("orphan_count", "orphan"),
+        ("never_updated", "not_updated"),
+        ("updated_packages", "updated"),
+    ]
+
+    seven_days = 86400 * 7
+    one_hour = 3600
+    year = seven_days * 52
+
+    def __init__(self, cache_expire: int = None) -> "Statistics":
+        self.expiry_time = cache_expire
+        self.now = time.utcnow()
+        self.seven_days_ago = self.now - self.seven_days
+        self.year_ago = self.now - self.year
+        self.user_query = db.query(User)
+        self.bases_query = db.query(PackageBase)
+        self.updated_query = db.query(PackageBase).filter(
+            PackageBase.ModifiedTS - PackageBase.SubmittedTS >= self.one_hour
+        )
+
+    def get_count(self, counter: str) -> int:
+        query = None
+        match counter:
+            case "package_count":
+                query = self.bases_query
+            case "orphan_count":
+                query = self.bases_query.filter(PackageBase.MaintainerUID.is_(None))
+            case "seven_days_old_added":
+                query = self.bases_query.filter(
+                    PackageBase.SubmittedTS >= self.seven_days_ago
+                )
+            case "seven_days_old_updated":
+                query = self.updated_query.filter(
+                    PackageBase.ModifiedTS >= self.seven_days_ago
+                )
+            case "year_old_updated":
+                query = self.updated_query.filter(
+                    PackageBase.ModifiedTS >= self.year_ago
+                )
+            case "never_updated":
+                query = self.bases_query.filter(
+                    PackageBase.ModifiedTS - PackageBase.SubmittedTS < self.one_hour
+                )
+            case "updated_packages":
+                query = self.bases_query.filter(
+                    PackageBase.ModifiedTS - PackageBase.SubmittedTS > self.one_hour,
+                    ~PackageBase.MaintainerUID.is_(None),
+                )
+            case "user_count":
+                query = self.user_query
+            case "trusted_user_count":
+                query = self.user_query.filter(
+                    User.AccountTypeID.in_(
+                        (
+                            TRUSTED_USER_ID,
+                            TRUSTED_USER_AND_DEV_ID,
+                        )
+                    )
+                )
+            case "regular_user_count":
+                query = self.user_query.filter(User.AccountTypeID == USER_ID)
+            case _:
+                return -1
+
+        return db_count_cache(counter, query, expire=self.expiry_time)
+
+
+def update_prometheus_metrics():
+    cache_expire = config.getint("cache", "expiry_time")
+    stats = Statistics(cache_expire)
+    # Users gauge
+    for counter, utype in stats.PROMETHEUS_USER_COUNTERS:
+        count = stats.get_count(counter)
+        USERS.labels(utype).set(count)
+
+    # Packages gauge
+    for counter, state in stats.PROMETHEUS_PACKAGE_COUNTERS:
+        count = stats.get_count(counter)
+        PACKAGES.labels(state).set(count)
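With the module in place, any named counter is one call away, and unknown names deliberately return -1 instead of raising. A usage sketch (the expiry value is assumed):

    from aurweb.statistics import Statistics, update_prometheus_metrics

    stats = Statistics(cache_expire=300)
    orphans = stats.get_count("orphan_count")    # Redis-cached count
    bogus = stats.get_count("no_such_counter")   # unmatched names yield -1

    # Recompute the aur_users/aur_packages gauges, e.g. before a scrape.
    update_prometheus_metrics()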
test/test_cache.py
@@ -31,15 +31,14 @@ def clear_fakeredis_cache():
     cache._redis.flushall()
 
 
-@pytest.mark.asyncio
-async def test_db_count_cache(user):
+def test_db_count_cache(user):
     query = db.query(User)
 
     # We have no cached value yet.
     assert cache._redis.get("key1") is None
 
     # Add to cache
-    assert await cache.db_count_cache("key1", query) == query.count()
+    assert cache.db_count_cache("key1", query) == query.count()
 
     # It's cached now.
     assert cache._redis.get("key1") is not None
@@ -48,35 +47,34 @@ async def test_db_count_cache(user):
     assert cache._redis.ttl("key1") == -1
 
     # Cache a query with an expire.
-    value = await cache.db_count_cache("key2", query, 100)
+    value = cache.db_count_cache("key2", query, 100)
     assert value == query.count()
 
     assert cache._redis.ttl("key2") == 100
 
 
-@pytest.mark.asyncio
-async def test_db_query_cache(user):
+def test_db_query_cache(user):
     query = db.query(User)
 
     # We have no cached value yet.
     assert cache._redis.get("key1") is None
 
     # Add to cache
-    await cache.db_query_cache("key1", query)
+    cache.db_query_cache("key1", query)
 
     # It's cached now.
     assert cache._redis.get("key1") is not None
 
     # Modify our user and make sure we got a cached value
     user.Username = "changed"
-    cached = await cache.db_query_cache("key1", query)
+    cached = cache.db_query_cache("key1", query)
     assert cached[0].Username != query.all()[0].Username
 
     # It does not expire
     assert cache._redis.ttl("key1") == -1
 
     # Cache a query with an expire.
-    value = await cache.db_query_cache("key2", query, 100)
+    value = cache.db_query_cache("key2", query, 100)
     assert len(value) == query.count()
     assert value[0].Username == query.all()[0].Username
 
@@ -90,7 +88,7 @@ async def test_db_query_cache(user):
 
     with mock.patch("aurweb.config.getint", side_effect=mock_max_search_entries):
         # Try to add another entry (we already have 2)
-        await cache.db_query_cache("key3", query)
+        cache.db_query_cache("key3", query)
 
         # Make sure it was not added because it exceeds our max.
         assert cache._redis.get("key3") is None
test/test_metrics.py
@@ -26,11 +26,10 @@ def user() -> User:
     yield user
 
 
-@pytest.mark.asyncio
-async def test_search_cache_metrics(user: User):
+def test_search_cache_metrics(user: User):
     # Fire off 3 identical queries for caching
     for _ in range(3):
-        await db_query_cache("key", db.query(User))
+        db_query_cache("key", db.query(User))
 
     # Get metrics
     metrics = str(generate_latest(REGISTRY))
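Since the test fires three identical queries, the first one misses the cache and the remaining two hit it, which the rendered metrics should reflect under the counter's new aur_ name. A sketch of the kind of assertion this enables (sample lines assume a cold cache; counters gain a _total suffix in the text exposition format):

    assert 'aur_search_requests_total{cache="miss"} 1.0' in metrics
    assert 'aur_search_requests_total{cache="hit"} 2.0' in metrics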