import re from http import HTTPStatus from unittest import mock import pytest from fastapi.testclient import TestClient from aurweb import asgi, db, time from aurweb.models import License, PackageLicense from aurweb.models.account_type import USER_ID, AccountType from aurweb.models.dependency_type import DependencyType from aurweb.models.official_provider import OfficialProvider from aurweb.models.package import Package from aurweb.models.package_base import PackageBase from aurweb.models.package_comaintainer import PackageComaintainer from aurweb.models.package_comment import PackageComment from aurweb.models.package_dependency import PackageDependency from aurweb.models.package_keyword import PackageKeyword from aurweb.models.package_notification import PackageNotification from aurweb.models.package_relation import PackageRelation from aurweb.models.package_request import PackageRequest from aurweb.models.package_vote import PackageVote from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID, RelationType from aurweb.models.request_type import DELETION_ID, RequestType from aurweb.models.user import User from aurweb.testing.html import get_errors, get_successes, parse_root from aurweb.testing.requests import Request def package_endpoint(package: Package) -> str: return f"/packages/{package.Name}" def create_package(pkgname: str, maintainer: User) -> Package: pkgbase = db.create(PackageBase, Name=pkgname, Maintainer=maintainer) return db.create(Package, Name=pkgbase.Name, PackageBase=pkgbase) def create_package_dep(package: Package, depname: str, dep_type_name: str = "depends") -> PackageDependency: dep_type = db.query(DependencyType, DependencyType.Name == dep_type_name).first() return db.create(PackageDependency, DependencyType=dep_type, Package=package, DepName=depname) def create_package_rel(package: Package, relname: str) -> PackageRelation: rel_type = db.query(RelationType, RelationType.ID == PROVIDES_ID).first() return db.create(PackageRelation, RelationType=rel_type, Package=package, RelName=relname) @pytest.fixture(autouse=True) def setup(db_test): return @pytest.fixture def client() -> TestClient: """ Yield a FastAPI TestClient. """ yield TestClient(app=asgi.app) def create_user(username: str) -> User: with db.begin(): user = db.create(User, Username=username, Email=f"{username}@example.org", Passwd="testPassword", AccountTypeID=USER_ID) return user @pytest.fixture def user() -> User: """ Yield a user. """ user = create_user("test") yield user @pytest.fixture def maintainer() -> User: """ Yield a specific User used to maintain packages. """ account_type = db.query(AccountType, AccountType.ID == USER_ID).first() with db.begin(): maintainer = db.create(User, Username="test_maintainer", Email="test_maintainer@example.org", Passwd="testPassword", AccountType=account_type) yield maintainer @pytest.fixture def tu_user(): tu_type = db.query(AccountType, AccountType.AccountType == "Trusted User").first() with db.begin(): tu_user = db.create(User, Username="test_tu", Email="test_tu@example.org", RealName="Test TU", Passwd="testPassword", AccountType=tu_type) yield tu_user @pytest.fixture def package(maintainer: User) -> Package: """ Yield a Package created by user. """ now = time.utcnow() with db.begin(): pkgbase = db.create(PackageBase, Name="test-package", Maintainer=maintainer, Packager=maintainer, Submitter=maintainer, ModifiedTS=now) package = db.create(Package, PackageBase=pkgbase, Name=pkgbase.Name) yield package @pytest.fixture def pkgbase(package: Package) -> PackageBase: yield package.PackageBase @pytest.fixture def target(maintainer: User) -> PackageBase: """ Merge target. """ now = time.utcnow() with db.begin(): pkgbase = db.create(PackageBase, Name="target-package", Maintainer=maintainer, Packager=maintainer, Submitter=maintainer, ModifiedTS=now) db.create(Package, PackageBase=pkgbase, Name=pkgbase.Name) yield pkgbase @pytest.fixture def pkgreq(user: User, pkgbase: PackageBase) -> PackageRequest: """ Yield a PackageRequest related to `pkgbase`. """ with db.begin(): pkgreq = db.create(PackageRequest, ReqTypeID=DELETION_ID, User=user, PackageBase=pkgbase, PackageBaseName=pkgbase.Name, Comments=f"Deletion request for {pkgbase.Name}", ClosureComment=str()) yield pkgreq @pytest.fixture def comment(user: User, package: Package) -> PackageComment: pkgbase = package.PackageBase now = time.utcnow() with db.begin(): comment = db.create(PackageComment, User=user, PackageBase=pkgbase, Comments="Test comment.", RenderedComment=str(), CommentTS=now) yield comment @pytest.fixture def packages(maintainer: User) -> list[Package]: """ Yield 55 packages named pkg_0 .. pkg_54. """ packages_ = [] now = time.utcnow() with db.begin(): for i in range(55): pkgbase = db.create(PackageBase, Name=f"pkg_{i}", Maintainer=maintainer, Packager=maintainer, Submitter=maintainer, ModifiedTS=now) package = db.create(Package, PackageBase=pkgbase, Name=f"pkg_{i}") packages_.append(package) yield packages_ def test_package_not_found(client: TestClient): with client as request: resp = request.get("/packages/not_found") assert resp.status_code == int(HTTPStatus.NOT_FOUND) def test_package(client: TestClient, package: Package): """ Test a single / packages / {name} route. """ with db.begin(): db.create(PackageRelation, PackageID=package.ID, RelTypeID=PROVIDES_ID, RelName="test_provider1") db.create(PackageRelation, PackageID=package.ID, RelTypeID=PROVIDES_ID, RelName="test_provider2") db.create(PackageRelation, PackageID=package.ID, RelTypeID=REPLACES_ID, RelName="test_replacer1") db.create(PackageRelation, PackageID=package.ID, RelTypeID=REPLACES_ID, RelName="test_replacer2") db.create(PackageRelation, PackageID=package.ID, RelTypeID=CONFLICTS_ID, RelName="test_conflict1") db.create(PackageRelation, PackageID=package.ID, RelTypeID=CONFLICTS_ID, RelName="test_conflict2") # Create some licenses. licenses = [ db.create(License, Name="test_license1"), db.create(License, Name="test_license2") ] db.create(PackageLicense, PackageID=package.ID, License=licenses[0]) db.create(PackageLicense, PackageID=package.ID, License=licenses[1]) with client as request: resp = request.get(package_endpoint(package)) assert resp.status_code == int(HTTPStatus.OK) root = parse_root(resp.text) h2 = root.find('.//div[@id="pkgdetails"]/h2') sections = h2.text.split(":") assert sections[0] == "Package Details" name, version = sections[1].lstrip().split(" ") assert name == package.Name version == package.Version rows = root.findall('.//table[@id="pkginfo"]//tr') row = rows[1] # Second row is our target. pkgbase = row.find("./td/a") assert pkgbase.text.strip() == package.PackageBase.Name licenses = root.xpath('//tr[@id="licenses"]/td') expected = ["test_license1", "test_license2"] assert licenses[0].text.strip() == ", ".join(expected) provides = root.xpath('//tr[@id="provides"]/td') expected = ["test_provider1", "test_provider2"] assert provides[0].text.strip() == ", ".join(expected) replaces = root.xpath('//tr[@id="replaces"]/td') expected = ["test_replacer1", "test_replacer2"] assert replaces[0].text.strip() == ", ".join(expected) conflicts = root.xpath('//tr[@id="conflicts"]/td') expected = ["test_conflict1", "test_conflict2"] assert conflicts[0].text.strip() == ", ".join(expected) def paged_depends_required(client: TestClient, package: Package): maint = package.PackageBase.Maintainer new_pkgs = [] with db.begin(): # Create 25 new packages that'll be used to depend on our package. for i in range(26): base = db.create(PackageBase, Name=f"new_pkg{i}", Maintainer=maint) new_pkgs.append(db.create(Package, Name=base.Name)) # Create 25 deps. for i in range(25): create_package_dep(package, f"dep_{i}") with db.begin(): # Create depends on this package so we get some required by listings. for new_pkg in new_pkgs: create_package_dep(new_pkg, package.Name) with client as request: resp = request.get(package_endpoint(package)) assert resp.status_code == int(HTTPStatus.OK) # Test depends show link. assert "Show 5 more" in resp.text # Test required by show more link, we added 26 packages. assert "Show 6 more" in resp.text # Follow both links at the same time. with client as request: resp = request.get( package_endpoint(package), params={ "all_deps": True, "all_reqs": True, } ) assert resp.status_code == int(HTTPStatus.OK) # We're should see everything and have no link. assert "Show 5 more" not in resp.text assert "Show 6 more" not in resp.text def test_package_comments(client: TestClient, user: User, package: Package): now = (time.utcnow()) with db.begin(): comment = db.create(PackageComment, PackageBase=package.PackageBase, User=user, Comments="Test comment", CommentTS=now) cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.get(package_endpoint(package), cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) root = parse_root(resp.text) expected = [ comment.Comments ] comments = root.xpath('.//div[contains(@class, "package-comments")]' '/div[@class="article-content"]/div/text()') for i, row in enumerate(expected): assert comments[i].strip() == row def test_package_requests_display(client: TestClient, user: User, package: Package, pkgreq: PackageRequest): # Test that a single request displays "1 pending request". with client as request: resp = request.get(package_endpoint(package)) assert resp.status_code == int(HTTPStatus.OK) root = parse_root(resp.text) selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]' target = root.xpath(selector)[0] assert target.text.strip() == "1 pending request" type_ = db.query(RequestType, RequestType.ID == DELETION_ID).first() with db.begin(): db.create(PackageRequest, PackageBase=package.PackageBase, PackageBaseName=package.PackageBase.Name, User=user, RequestType=type_, Comments="Test comment2.", ClosureComment=str()) # Test that a two requests display "2 pending requests". with client as request: resp = request.get(package_endpoint(package)) assert resp.status_code == int(HTTPStatus.OK) root = parse_root(resp.text) selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]' target = root.xpath(selector)[0] assert target.text.strip() == "2 pending requests" def test_package_authenticated(client: TestClient, user: User, package: Package): """ We get the same here for either authenticated or not authenticated. Form inputs are presented to maintainers. This process also occurs when pkgbase.html is rendered. """ cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.get(package_endpoint(package), cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) expected = [ "View PKGBUILD", "View Changes", "Download snapshot", "Search wiki", "Flag package out-of-date", "Vote for this package", "Enable notifications", "Submit Request" ] for expected_text in expected: assert expected_text in resp.text # When no requests are up, make sure we don't see the display for them. root = parse_root(resp.text) selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]' target = root.xpath(selector) assert len(target) == 0 def test_package_authenticated_maintainer(client: TestClient, maintainer: User, package: Package): cookies = {"AURSID": maintainer.login(Request(), "testPassword")} with client as request: resp = request.get(package_endpoint(package), cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) expected = [ "View PKGBUILD", "View Changes", "Download snapshot", "Search wiki", "Flag package out-of-date", "Vote for this package", "Enable notifications", "Manage Co-Maintainers", "Submit Request", "Disown Package" ] for expected_text in expected: assert expected_text in resp.text def test_package_authenticated_tu(client: TestClient, tu_user: User, package: Package): cookies = {"AURSID": tu_user.login(Request(), "testPassword")} with client as request: resp = request.get(package_endpoint(package), cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) expected = [ "View PKGBUILD", "View Changes", "Download snapshot", "Search wiki", "Flag package out-of-date", "Vote for this package", "Enable notifications", "Manage Co-Maintainers", "Submit Request", "Delete Package", "Merge Package", "Disown Package" ] for expected_text in expected: assert expected_text in resp.text def test_package_dependencies(client: TestClient, maintainer: User, package: Package): # Create a normal dependency of type depends. with db.begin(): dep_pkg = create_package("test-dep-1", maintainer) dep = create_package_dep(package, dep_pkg.Name) # Also, create a makedepends. make_dep_pkg = create_package("test-dep-2", maintainer) make_dep = create_package_dep(package, make_dep_pkg.Name, dep_type_name="makedepends") make_dep.DepArch = "x86_64" # And... a checkdepends! check_dep_pkg = create_package("test-dep-3", maintainer) create_package_dep(package, check_dep_pkg.Name, dep_type_name="checkdepends") # Geez. Just stop. This is optdepends. opt_dep_pkg = create_package("test-dep-4", maintainer) create_package_dep(package, opt_dep_pkg.Name, dep_type_name="optdepends") # Heh. Another optdepends to test one with a description. opt_desc_dep_pkg = create_package("test-dep-5", maintainer) opt_desc_dep = create_package_dep(package, opt_desc_dep_pkg.Name, dep_type_name="optdepends") opt_desc_dep.DepDesc = "Test description." broken_dep = create_package_dep(package, "test-dep-6", dep_type_name="depends") # Create an official provider record. db.create(OfficialProvider, Name="test-dep-99", Repo="core", Provides="test-dep-99") create_package_dep(package, "test-dep-99") # Also, create a provider who provides our test-dep-99. provider = create_package("test-provider", maintainer) create_package_rel(provider, dep.DepName) with client as request: resp = request.get(package_endpoint(package)) assert resp.status_code == int(HTTPStatus.OK) # Let's make sure all the non-broken deps are ordered as we expect. expected = list(filter( lambda e: e.is_package(), package.package_dependencies.order_by( PackageDependency.DepTypeID.asc(), PackageDependency.DepName.asc() ).all() )) root = parse_root(resp.text) pkgdeps = root.findall('.//ul[@id="pkgdepslist"]/li/a') for i, expectation in enumerate(expected): assert pkgdeps[i].text.strip() == expectation.DepName # Let's make sure the DepArch was displayed for our target make dep. arch = root.findall('.//ul[@id="pkgdepslist"]/li')[3] arch = arch.xpath('./em')[0] assert arch.text.strip() == "(make, x86_64)" # And let's make sure that the broken package was displayed. broken_node = root.find('.//ul[@id="pkgdepslist"]/li/span') assert broken_node.text.strip() == broken_dep.DepName def test_packages(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SeB": "X", # "X" isn't valid, defaults to "nd" "PP": "1 or 1", "O": "0 or 0" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) stats = root.xpath('//div[@class="pkglist-stats"]/p')[0] pager_text = re.sub(r'\s+', " ", stats.text.replace("\n", "").strip()) assert pager_text == "55 packages found. Page 1 of 2." rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 # Default per-page def test_packages_empty(client: TestClient): with client as request: response = request.get("/packages") assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) results = root.xpath('//div[@id="pkglist-results"]/p') expected = "No packages matched your search criteria." assert results[0].text.strip() == expected def test_packages_search_by_name(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SeB": "n", "K": "pkg_" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 # Default per-page def test_packages_search_by_exact_name(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SeB": "N", "K": "pkg_" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') # There is no package named exactly 'pkg_', we get 0 results. assert len(rows) == 0 with client as request: response = request.get("/packages", params={ "SeB": "N", "K": "pkg_1" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') # There's just one package named 'pkg_1', we get 1 result. assert len(rows) == 1 def test_packages_search_by_pkgbase(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SeB": "b", "K": "pkg_" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 def test_packages_search_by_exact_pkgbase(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SeB": "B", "K": "pkg_" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 0 with client as request: response = request.get("/packages", params={ "SeB": "B", "K": "pkg_1" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_search_by_keywords(client: TestClient, packages: list[Package]): # None of our packages have keywords, so this query should return nothing. with client as request: response = request.get("/packages", params={ "SeB": "k", "K": "testKeyword" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 0 # But now, let's create the keyword for the first package. package = packages[0] with db.begin(): db.create(PackageKeyword, PackageBase=package.PackageBase, Keyword="testKeyword") # And request packages with that keyword, we should get 1 result. with client as request: response = request.get("/packages", params={ "SeB": "k", "K": "testKeyword" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_search_by_maintainer(client: TestClient, maintainer: User, package: Package): # We should expect that searching by `package`'s maintainer # returns `package` in the results. with client as request: response = request.get("/packages", params={ "SeB": "m", "K": maintainer.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 # Search again by maintainer with no keywords given. # This kind of search returns all orphans instead. # In this first case, there are no orphan packages; assert that. with client as request: response = request.get("/packages", params={"SeB": "m"}) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 0 # Orphan `package`. with db.begin(): package.PackageBase.Maintainer = None # This time, we should get `package` returned, since it's now an orphan. with client as request: response = request.get("/packages", params={"SeB": "m"}) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_search_by_comaintainer(client: TestClient, maintainer: User, package: Package): # Nobody's a comaintainer yet. with client as request: response = request.get("/packages", params={ "SeB": "c", "K": maintainer.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 0 # Now, we create a comaintainer. with db.begin(): db.create(PackageComaintainer, PackageBase=package.PackageBase, User=maintainer, Priority=1) # Then test that it's returned by our search. with client as request: response = request.get("/packages", params={ "SeB": "c", "K": maintainer.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_search_by_co_or_maintainer(client: TestClient, maintainer: User, package: Package): with client as request: response = request.get("/packages", params={ "SeB": "M", "SB": "BLAH", # Invalid SB; gets reset to default "n". "K": maintainer.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 with db.begin(): user = db.create(User, Username="comaintainer", Email="comaintainer@example.org", Passwd="testPassword") db.create(PackageComaintainer, PackageBase=package.PackageBase, User=user, Priority=1) with client as request: response = request.get("/packages", params={ "SeB": "M", "K": user.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_search_by_submitter(client: TestClient, maintainer: User, package: Package): with client as request: response = request.get("/packages", params={ "SeB": "s", "K": maintainer.Username }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_sort_by_name(client: TestClient, packages: list[Package]): with client as request: response = request.get("/packages", params={ "SB": "n", # Name "SO": "a", # Ascending "PP": "150" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') rows = [row.xpath('./td/a')[0].text.strip() for row in rows] with client as request: response2 = request.get("/packages", params={ "SB": "n", # Name "SO": "d", # Ascending "PP": "150" }) assert response2.status_code == int(HTTPStatus.OK) root = parse_root(response2.text) rows2 = root.xpath('//table[@class="results"]/tbody/tr') rows2 = [row.xpath('./td/a')[0].text.strip() for row in rows2] assert rows == list(reversed(rows2)) def test_packages_sort_by_votes(client: TestClient, maintainer: User, packages: list[Package]): # Set the first package's NumVotes to 1. with db.begin(): packages[0].PackageBase.NumVotes = 1 # Test that, by default, the first result is what we just set above. with client as request: response = request.get("/packages", params={ "SB": "v" # Votes. }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') votes = rows[0].xpath('./td')[2] # The third column of the first row. assert votes.text.strip() == "1" # Now, test that with an ascending order, the last result is # the one we set, since the default (above) is descending. with client as request: response = request.get("/packages", params={ "SB": "v", # Votes. "SO": "a", # Ascending. "O": "50" # Second page. }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') votes = rows[-1].xpath('./td')[2] # The third column of the last row. assert votes.text.strip() == "1" def test_packages_sort_by_popularity(client: TestClient, maintainer: User, packages: list[Package]): # Set the first package's Popularity to 0.50. with db.begin(): packages[0].PackageBase.Popularity = "0.50" # Test that, by default, the first result is what we just set above. with client as request: response = request.get("/packages", params={ "SB": "p" # Popularity }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') pop = rows[0].xpath('./td')[3] # The fourth column of the first row. assert pop.text.strip() == "0.50" def test_packages_sort_by_voted(client: TestClient, maintainer: User, packages: list[Package]): now = time.utcnow() with db.begin(): db.create(PackageVote, PackageBase=packages[0].PackageBase, User=maintainer, VoteTS=now) # Test that, by default, the first result is what we just set above. cookies = {"AURSID": maintainer.login(Request(), "testPassword")} with client as request: response = request.get("/packages", params={ "SB": "w", # Voted "SO": "d" # Descending, Voted first. }, cookies=cookies) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') voted = rows[0].xpath('./td')[5] # The sixth column of the first row. assert voted.text.strip() == "Yes" # Conversely, everything else was not voted on. voted = rows[1].xpath('./td')[5] # The sixth column of the second row. assert voted.text.strip() == str() # Empty. def test_packages_sort_by_notify(client: TestClient, maintainer: User, packages: list[Package]): db.create(PackageNotification, PackageBase=packages[0].PackageBase, User=maintainer) # Test that, by default, the first result is what we just set above. cookies = {"AURSID": maintainer.login(Request(), "testPassword")} with client as request: response = request.get("/packages", params={ "SB": "o", # Voted "SO": "d" # Descending, Voted first. }, cookies=cookies) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') notify = rows[0].xpath('./td')[6] # The sixth column of the first row. assert notify.text.strip() == "Yes" # Conversely, everything else was not voted on. notify = rows[1].xpath('./td')[6] # The sixth column of the second row. assert notify.text.strip() == str() # Empty. def test_packages_sort_by_maintainer(client: TestClient, maintainer: User, package: Package): """ Sort a package search by the maintainer column. """ # Create a second package, so the two can be ordered and checked. with db.begin(): maintainer2 = db.create(User, Username="maintainer2", Email="maintainer2@example.org", Passwd="testPassword") base2 = db.create(PackageBase, Name="pkg_2", Maintainer=maintainer2, Submitter=maintainer2, Packager=maintainer2) db.create(Package, Name="pkg_2", PackageBase=base2) # Check the descending order route. with client as request: response = request.get("/packages", params={ "SB": "m", "SO": "d" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column. assert col.text.strip() == maintainer.Username # On the other hand, with ascending, we should get reverse ordering. with client as request: response = request.get("/packages", params={ "SB": "m", "SO": "a" }) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column. assert col.text.strip() == maintainer2.Username def test_packages_sort_by_last_modified(client: TestClient, packages: list[Package]): now = time.utcnow() # Set the first package's ModifiedTS to be 1000 seconds before now. package = packages[0] with db.begin(): package.PackageBase.ModifiedTS = now - 1000 with client as request: response = request.get("/packages", params={ "SB": "l", "SO": "a" # Ascending; oldest modification first. }) assert response.status_code == int(HTTPStatus.OK) # We should have 50 (default per page) results. root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 # Let's assert that the first item returned was the one we modified above. row = rows[0] col = row.xpath('./td')[0].xpath('./a')[0] assert col.text.strip() == package.Name def test_packages_flagged(client: TestClient, maintainer: User, packages: list[Package]): package = packages[0] now = time.utcnow() with db.begin(): package.PackageBase.OutOfDateTS = now package.PackageBase.Flagger = maintainer with client as request: response = request.get("/packages", params={ "outdated": "on" }) assert response.status_code == int(HTTPStatus.OK) # We should only get one result from this query; the package we flagged. root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 with client as request: response = request.get("/packages", params={ "outdated": "off" }) assert response.status_code == int(HTTPStatus.OK) # In this case, we should get 54 results, which means that the first # page will have 50 results (55 packages - 1 outdated = 54 not outdated). root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 def test_packages_orphans(client: TestClient, packages: list[Package]): package = packages[0] with db.begin(): package.PackageBase.Maintainer = None with client as request: response = request.get("/packages", params={"submit": "Orphans"}) assert response.status_code == int(HTTPStatus.OK) # We only have one orphan. Let's make sure that's what is returned. root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 1 def test_packages_per_page(client: TestClient, maintainer: User): """ Test the ability for /packages to deal with the PP query argument specifications (50, 100, 250; default: 50). """ with db.begin(): for i in range(255): base = db.create(PackageBase, Name=f"pkg_{i}", Maintainer=maintainer, Submitter=maintainer, Packager=maintainer) db.create(Package, PackageBase=base, Name=base.Name) # Test default case, PP of 50. with client as request: response = request.get("/packages", params={"PP": 50}) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 50 # Alright, test the next case, PP of 100. with client as request: response = request.get("/packages", params={"PP": 100}) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 100 # And finally, the last case, a PP of 250. with client as request: response = request.get("/packages", params={"PP": 250}) assert response.status_code == int(HTTPStatus.OK) root = parse_root(response.text) rows = root.xpath('//table[@class="results"]/tbody/tr') assert len(rows) == 250 def test_packages_post_unknown_action(client: TestClient, user: User, package: Package): cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={"action": "unknown"}, cookies=cookies, allow_redirects=False) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) def test_packages_post_error(client: TestClient, user: User, package: Package): async def stub_action(request: Request, **kwargs): return (False, ["Some error."]) actions = {"stub": stub_action} with mock.patch.dict("aurweb.routers.packages.PACKAGE_ACTIONS", actions): cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={"action": "stub"}, cookies=cookies, allow_redirects=False) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "Some error." assert errors[0].text.strip() == expected def test_packages_post(client: TestClient, user: User, package: Package): async def stub_action(request: Request, **kwargs): return (True, ["Some success."]) actions = {"stub": stub_action} with mock.patch.dict("aurweb.routers.packages.PACKAGE_ACTIONS", actions): cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={"action": "stub"}, cookies=cookies, allow_redirects=False) assert resp.status_code == int(HTTPStatus.OK) errors = get_successes(resp.text) expected = "Some success." assert errors[0].text.strip() == expected def test_packages_post_unflag(client: TestClient, user: User, maintainer: User, package: Package): # Flag `package` as `user`. now = time.utcnow() with db.begin(): package.PackageBase.Flagger = user package.PackageBase.OutOfDateTS = now cookies = {"AURSID": user.login(Request(), "testPassword")} # Don't supply any packages. post_data = {"action": "unflag", "IDs": []} with client as request: resp = request.post("/packages", data=post_data, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to unflag." assert errors[0].text.strip() == expected # Unflag the package as `user`. post_data = {"action": "unflag", "IDs": [package.ID]} with client as request: resp = request.post("/packages", data=post_data, cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) assert package.PackageBase.Flagger is None successes = get_successes(resp.text) expected = "The selected packages have been unflagged." assert successes[0].text.strip() == expected # Re-flag `package` as `user`. now = time.utcnow() with db.begin(): package.PackageBase.Flagger = user package.PackageBase.OutOfDateTS = now # Try to unflag the package as `maintainer`, which is not allowed. maint_cookies = {"AURSID": maintainer.login(Request(), "testPassword")} post_data = {"action": "unflag", "IDs": [package.ID]} with client as request: resp = request.post("/packages", data=post_data, cookies=maint_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to unflag." assert errors[0].text.strip() == expected def test_packages_post_notify(client: TestClient, user: User, package: Package): notif = package.PackageBase.notifications.filter( PackageNotification.UserID == user.ID ).first() assert notif is None # Try to enable notifications but supply no packages, causing # an error to be rendered. cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={"action": "notify"}, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to be notified about." assert errors[0].text.strip() == expected # Now let's actually enable notifications on `package`. with client as request: resp = request.post("/packages", data={ "action": "notify", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) expected = "The selected packages' notifications have been enabled." successes = get_successes(resp.text) assert successes[0].text.strip() == expected # Try to enable notifications when they're already enabled, # causing an error to be rendered. with client as request: resp = request.post("/packages", data={ "action": "notify", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to be notified about." assert errors[0].text.strip() == expected def test_packages_post_unnotify(client: TestClient, user: User, package: Package): # Create a notification record. with db.begin(): notif = db.create(PackageNotification, PackageBase=package.PackageBase, User=user) assert notif is not None # Request removal of the notification without any IDs. cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "unnotify" }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages for notification removal." assert errors[0].text.strip() == expected # Request removal of the notification; really. with client as request: resp = request.post("/packages", data={ "action": "unnotify", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) successes = get_successes(resp.text) expected = "The selected packages' notifications have been removed." assert successes[0].text.strip() == expected # Let's ensure the record got removed. notif = package.PackageBase.notifications.filter( PackageNotification.UserID == user.ID ).first() assert notif is None # Try it again. The notif no longer exists. with client as request: resp = request.post("/packages", data={ "action": "unnotify", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "A package you selected does not have notifications enabled." assert errors[0].text.strip() == expected def test_packages_post_adopt(client: TestClient, user: User, package: Package): # Try to adopt an empty list of packages. cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "adopt" }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to adopt." assert errors[0].text.strip() == expected # Now, let's try to adopt a package that's already maintained. with client as request: resp = request.post("/packages", data={ "action": "adopt", "IDs": [package.ID], "confirm": True }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You are not allowed to adopt one of the packages you selected." assert errors[0].text.strip() == expected # Remove the maintainer from the DB. with db.begin(): package.PackageBase.Maintainer = None assert package.PackageBase.Maintainer is None # Now, let's try to adopt without confirming. with client as request: resp = request.post("/packages", data={ "action": "adopt", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = ("The selected packages have not been adopted, " "check the confirmation checkbox.") assert errors[0].text.strip() == expected # Let's do it again now that there is no maintainer. with client as request: resp = request.post("/packages", data={ "action": "adopt", "IDs": [package.ID], "confirm": True }, cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) successes = get_successes(resp.text) expected = "The selected packages have been adopted." assert successes[0].text.strip() == expected def test_packages_post_disown_as_maintainer(client: TestClient, user: User, maintainer: User, package: Package): """ Disown packages as a maintainer. """ # Initially prove that we have a maintainer. assert package.PackageBase.Maintainer is not None assert package.PackageBase.Maintainer == maintainer # Try to run the disown action with no IDs; get an error. cookies = {"AURSID": maintainer.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "disown" }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to disown." assert errors[0].text.strip() == expected assert package.PackageBase.Maintainer is not None # Try to disown `package` without giving the confirm argument. with client as request: resp = request.post("/packages", data={ "action": "disown", "IDs": [package.ID] }, cookies=cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) assert package.PackageBase.Maintainer is not None errors = get_errors(resp.text) expected = ("The selected packages have not been disowned, " "check the confirmation checkbox.") assert errors[0].text.strip() == expected # Now, try to disown `package` without credentials (as `user`). user_cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "disown", "IDs": [package.ID], "confirm": True }, cookies=user_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) assert package.PackageBase.Maintainer is not None errors = get_errors(resp.text) expected = "You are not allowed to disown one of the packages you selected." assert errors[0].text.strip() == expected # Now, let's really disown `package` as `maintainer`. with client as request: resp = request.post("/packages", data={ "action": "disown", "IDs": [package.ID], "confirm": True }, cookies=cookies) assert package.PackageBase.Maintainer is None successes = get_successes(resp.text) expected = "The selected packages have been disowned." assert successes[0].text.strip() == expected def test_packages_post_disown(client: TestClient, tu_user: User, maintainer: User, package: Package): """ Disown packages as a Trusted User, which cannot bypass idle time. """ cookies = {"AURSID": tu_user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "disown", "IDs": [package.ID], "confirm": True }, cookies=cookies) errors = get_errors(resp.text) expected = r"^No due existing orphan requests to accept for .+\.$" assert re.match(expected, errors[0].text.strip()) def test_packages_post_delete(caplog: pytest.fixture, client: TestClient, user: User, tu_user: User, package: Package): # First, let's try to use the delete action with no packages IDs. user_cookies = {"AURSID": user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "delete" }, cookies=user_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You did not select any packages to delete." assert errors[0].text.strip() == expected # Now, let's try to delete real packages without supplying "confirm". with client as request: resp = request.post("/packages", data={ "action": "delete", "IDs": [package.ID] }, cookies=user_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = ("The selected packages have not been deleted, " "check the confirmation checkbox.") assert errors[0].text.strip() == expected # And again, with everything, but `user` doesn't have permissions. with client as request: resp = request.post("/packages", data={ "action": "delete", "IDs": [package.ID], "confirm": True }, cookies=user_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "You do not have permission to delete packages." assert errors[0].text.strip() == expected # Now, let's switch over to making the requests as a TU. # However, this next request will be rejected due to supplying # an invalid package ID. tu_cookies = {"AURSID": tu_user.login(Request(), "testPassword")} with client as request: resp = request.post("/packages", data={ "action": "delete", "IDs": [0], "confirm": True }, cookies=tu_cookies) assert resp.status_code == int(HTTPStatus.BAD_REQUEST) errors = get_errors(resp.text) expected = "One of the packages you selected does not exist." assert errors[0].text.strip() == expected # Whoo. Now, let's finally make a valid request as `tu_user` # to delete `package`. with client as request: resp = request.post("/packages", data={ "action": "delete", "IDs": [package.ID], "confirm": True }, cookies=tu_cookies) assert resp.status_code == int(HTTPStatus.OK) successes = get_successes(resp.text) expected = "The selected packages have been deleted." assert successes[0].text.strip() == expected # Expect that the package deletion was logged. pkgbases = [package.PackageBase.Name] expected = (f"Privileged user '{tu_user.Username}' deleted the " f"following package bases: {str(pkgbases)}.") assert expected in caplog.text def test_account_comments_unauthorized(client: TestClient, user: User): """ This test may seem out of place, but it requires packages, so its being included in the packages routes test suite to leverage existing fixtures. """ endpoint = f"/account/{user.Username}/comments" with client as request: resp = request.get(endpoint, allow_redirects=False) assert resp.status_code == int(HTTPStatus.SEE_OTHER) assert resp.headers.get("location").startswith("/login") def test_account_comments(client: TestClient, user: User, package: Package): """ This test may seem out of place, but it requires packages, so its being included in the packages routes test suite to leverage existing fixtures. """ now = time.utcnow() with db.begin(): # This comment's CommentTS is `now + 1`, so it is found in rendered # HTML before the rendered_comment, which has a CommentTS of `now`. comment = db.create(PackageComment, PackageBase=package.PackageBase, User=user, Comments="Test comment", CommentTS=now + 1) rendered_comment = db.create(PackageComment, PackageBase=package.PackageBase, User=user, Comments="Test comment", RenderedComment="
Test comment
", CommentTS=now) cookies = {"AURSID": user.login(Request(), "testPassword")} endpoint = f"/account/{user.Username}/comments" with client as request: resp = request.get(endpoint, cookies=cookies) assert resp.status_code == int(HTTPStatus.OK) root = parse_root(resp.text) comments = root.xpath('//div[@class="article-content"]/div') # Assert that we got Comments rendered from the first comment. assert comments[0].text.strip() == comment.Comments # And from the second, we have rendered content. rendered = comments[1].xpath('./p') expected = rendered_comment.RenderedComment.replace( "", "").replace("
", "") assert rendered[0].text.strip() == expected