diff --git a/aurweb/asgi.py b/aurweb/asgi.py index 5f0ad01d..cf878294 100644 --- a/aurweb/asgi.py +++ b/aurweb/asgi.py @@ -18,7 +18,7 @@ from aurweb.auth import BasicAuthBackend from aurweb.db import get_engine, query from aurweb.models.accepted_term import AcceptedTerm from aurweb.models.term import Term -from aurweb.routers import accounts, auth, errors, html, sso, trusted_user +from aurweb.routers import accounts, auth, errors, html, rss, sso, trusted_user # Setup the FastAPI app. app = FastAPI(exception_handlers=errors.exceptions) @@ -50,6 +50,7 @@ async def app_startup(): app.include_router(auth.router) app.include_router(accounts.router) app.include_router(trusted_user.router) + app.include_router(rss.router) # Initialize the database engine and ORM. get_engine() diff --git a/aurweb/routers/rss.py b/aurweb/routers/rss.py new file mode 100644 index 00000000..50127dd2 --- /dev/null +++ b/aurweb/routers/rss.py @@ -0,0 +1,84 @@ +from datetime import datetime + +from fastapi import APIRouter, Request +from fastapi.responses import Response +from feedgen.feed import FeedGenerator + +from aurweb import db, util +from aurweb.models.package import Package +from aurweb.models.package_base import PackageBase + +router = APIRouter() + + +def make_rss_feed(request: Request, packages: list, + date_attr: str): + """ Create an RSS Feed string for some packages. + + :param request: A FastAPI request + :param packages: A list of packages to add to the RSS feed + :param date_attr: The date attribute (DB column) to use + :return: RSS Feed string + """ + + feed = FeedGenerator() + feed.title("AUR Newest Packages") + feed.description("The latest and greatest packages in the AUR") + base = f"{request.url.scheme}://{request.url.netloc}" + feed.link(href=base, rel="alternate") + feed.link(href=f"{base}/rss", rel="self") + feed.image(title="AUR Newest Packages", + url=f"{base}/css/archnavbar/aurlogo.png", + link=base, + description="AUR Newest Packages Feed") + + for pkg in packages: + entry = feed.add_entry(order="append") + entry.title(pkg.Name) + entry.link(href=f"{base}/packages/{pkg.Name}", rel="alternate") + entry.link(href=f"{base}/rss", rel="self", type="application/rss+xml") + entry.description(pkg.Description or str()) + + attr = getattr(pkg.PackageBase, date_attr) + dt = util.timestamp_to_datetime(attr) + dt = util.as_timezone(dt, request.user.Timezone) + entry.pubDate(dt.strftime("%Y-%m-%d %H:%M:%S%z")) + + entry.source(f"{base}") + if pkg.PackageBase.Maintainer: + entry.author(author={"name": pkg.PackageBase.Maintainer.Username}) + entry.guid(f"{pkg.Name} - {attr}") + + return feed.rss_str() + + +@router.get("/rss/") +async def rss(request: Request): + packages = db.query(Package).join(PackageBase).order_by( + PackageBase.SubmittedTS.desc()).limit(100) + feed = make_rss_feed(request, packages, "SubmittedTS") + + response = Response(feed, media_type="application/rss+xml") + package = packages.first() + if package: + dt = datetime.utcfromtimestamp(package.PackageBase.SubmittedTS) + modified = dt.strftime("%a, %d %m %Y %H:%M:%S GMT") + response.headers["Last-Modified"] = modified + + return response + + +@router.get("/rss/modified") +async def rss_modified(request: Request): + packages = db.query(Package).join(PackageBase).order_by( + PackageBase.ModifiedTS.desc()).limit(100) + feed = make_rss_feed(request, packages, "ModifiedTS") + + response = Response(feed, media_type="application/rss+xml") + package = packages.first() + if package: + dt = datetime.utcfromtimestamp(package.PackageBase.ModifiedTS) + modified = dt.strftime("%a, %d %m %Y %H:%M:%S GMT") + response.headers["Last-Modified"] = modified + + return response diff --git a/test/test_rss.py b/test/test_rss.py new file mode 100644 index 00000000..7dd5bb47 --- /dev/null +++ b/test/test_rss.py @@ -0,0 +1,110 @@ +import logging + +from datetime import datetime +from http import HTTPStatus + +import lxml.etree +import pytest + +from fastapi.testclient import TestClient + +from aurweb import db +from aurweb.asgi import app +from aurweb.models.account_type import AccountType +from aurweb.models.package import Package +from aurweb.models.package_base import PackageBase +from aurweb.models.user import User +from aurweb.testing import setup_test_db + +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True) +def setup(): + setup_test_db( + Package.__tablename__, + PackageBase.__tablename__, + User.__tablename__) + + +@pytest.fixture +def client(): + yield TestClient(app=app) + + +@pytest.fixture +def user(): + account_type = db.query(AccountType, + AccountType.AccountType == "User").first() + yield db.create(User, Username="test", + Email="test@example.org", + RealName="Test User", + Passwd="testPassword", + AccountType=account_type) + + +@pytest.fixture +def packages(user): + pkgs = [] + now = int(datetime.utcnow().timestamp()) + + # Create 101 packages; we limit 100 on RSS feeds. + for i in range(101): + pkgbase = db.create( + PackageBase, Maintainer=user, Name=f"test-package-{i}", + SubmittedTS=(now + i), ModifiedTS=(now + i), autocommit=False) + pkg = db.create(Package, Name=pkgbase.Name, PackageBase=pkgbase, + autocommit=False) + pkgs.append(pkg) + db.commit() + yield pkgs + + +def parse_root(xml): + return lxml.etree.fromstring(xml) + + +def test_rss(client, user, packages): + with client as request: + response = request.get("/rss/") + assert response.status_code == int(HTTPStatus.OK) + + # Test that the RSS we got is sorted by descending SubmittedTS. + def key_(pkg): + return pkg.PackageBase.SubmittedTS + packages = list(reversed(sorted(packages, key=key_))) + + # Just take the first 100. + packages = packages[:100] + + root = parse_root(response.content) + items = root.xpath("//channel/item") + assert len(items) == 100 + + for i, item in enumerate(items): + title = next(iter(item.xpath('./title'))) + logger.debug(f"title: '{title.text}' vs name: '{packages[i].Name}'") + assert title.text == packages[i].Name + + +def test_rss_modified(client, user, packages): + with client as request: + response = request.get("/rss/modified") + assert response.status_code == int(HTTPStatus.OK) + + # Test that the RSS we got is sorted by descending SubmittedTS. + def key_(pkg): + return pkg.PackageBase.ModifiedTS + packages = list(reversed(sorted(packages, key=key_))) + + # Just take the first 100. + packages = packages[:100] + + root = parse_root(response.content) + items = root.xpath("//channel/item") + assert len(items) == 100 + + for i, item in enumerate(items): + title = next(iter(item.xpath('./title'))) + logger.debug(f"title: '{title.text}' vs name: '{packages[i].Name}'") + assert title.text == packages[i].Name