add aurweb.cache, a redis caching utility module

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-08-06 22:44:19 -07:00
parent 968ed736c1
commit 9e73936c4e
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
2 changed files with 94 additions and 0 deletions

20
aurweb/cache.py Normal file
View file

@ -0,0 +1,20 @@
from redis import Redis
from sqlalchemy import orm
async def db_count_cache(redis: Redis, key: str, query: orm.Query,
expire: int = None) -> int:
""" Store and retrieve a query.count() via redis cache.
:param redis: Redis handle
:param key: Redis key
:param query: SQLAlchemy ORM query
:param expire: Optional expiration in seconds
:return: query.count()
"""
result = redis.get(key)
if result is None:
redis.set(key, (result := int(query.count())))
if expire:
redis.expire(key, expire)
return int(result)

74
test/test_cache.py Normal file
View file

@ -0,0 +1,74 @@
import pytest
from aurweb import cache, db
from aurweb.models.account_type import USER_ID
from aurweb.models.user import User
from aurweb.testing import setup_test_db
@pytest.fixture(autouse=True)
def setup():
setup_test_db(
User.__tablename__
)
class StubRedis:
""" A class which acts as a RedisConnection without using Redis. """
cache = dict()
expires = dict()
def get(self, key, *args):
if "key" not in self.cache:
self.cache[key] = None
return self.cache[key]
def set(self, key, *args):
self.cache[key] = list(args)[0]
def expire(self, key, *args):
self.expires[key] = list(args)[0]
async def execute(self, command, key, *args):
f = getattr(self, command)
return f(key, *args)
@pytest.fixture
def redis():
yield StubRedis()
@pytest.mark.asyncio
async def test_db_count_cache(redis):
db.create(User, Username="user1",
Email="user1@example.org",
Passwd="testPassword",
AccountTypeID=USER_ID)
query = db.query(User)
# Now, perform several checks that db_count_cache matches query.count().
# We have no cached value yet.
assert await cache.db_count_cache(redis, "key1", query) == query.count()
# It's cached now.
assert await cache.db_count_cache(redis, "key1", query) == query.count()
@pytest.mark.asyncio
async def test_db_count_cache_expires(redis):
db.create(User, Username="user1",
Email="user1@example.org",
Passwd="testPassword",
AccountTypeID=USER_ID)
query = db.query(User)
# Cache a query with an expire.
value = await cache.db_count_cache(redis, "key1", query, 100)
assert value == query.count()
assert redis.expires["key1"] == 100