aurweb/test/test_packages_routes.py
Kevin Morris 26b1674c9e
fix(requests): rework handling of requests
This commit changes several things about how we were handling
package requests.

Modifications (requests):
-------------
- `/requests/{id}/close` no longer provides an Accepted selection.
  All manual request closures will cause a rejection.
- Relevent `pkgbase` actions now trigger request closures:
  `/pkgbase/{name}/delete` (deletion), `/pkgbase/{name}/merge` (merge)
  and `/pkgbase/{name}/disown` (orphan).
- Comment fields have been added to
  `/pkgbase/{name}/{delete,merge,disown}`, which is used to set the
  `PackageRequest.ClosureComment` on pending requests. If the comment
  field is left blank, a closure comment is autogenerated.
- Autogenerated request notifications are only sent out once
  as a closure notification.
- Some markup has been fixed.

Modifications (disown/orphan):
-----------------------------
- Orphan requests are now handled through the same path as
  deletion/merge.
- We now check for due date when disowning as non-maintainer;
  previously, this was only done for display and not functionally.
  This check applies to Trusted Users' disowning of a package.

This style of notification flow does reduce our visibility, but
accounting can still be done via the close request; it includes
the action, pkgbase name and the user who accepted it.

Closes #204

Signed-off-by: Kevin Morris <kevr@0cost.org>
2021-12-09 19:09:51 -08:00

2639 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from datetime import datetime
from http import HTTPStatus
from typing import List
from unittest import mock
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import and_
from aurweb import asgi, db, defaults
from aurweb.models import License, PackageLicense
from aurweb.models.account_type import USER_ID, AccountType
from aurweb.models.dependency_type import DependencyType
from aurweb.models.official_provider import OfficialProvider
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_comment import PackageComment
from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_notification import PackageNotification
from aurweb.models.package_relation import PackageRelation
from aurweb.models.package_request import ACCEPTED_ID, REJECTED_ID, PackageRequest
from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID, RelationType
from aurweb.models.request_type import DELETION_ID, MERGE_ID, RequestType
from aurweb.models.user import User
from aurweb.testing.email import Email
from aurweb.testing.html import get_errors, get_successes, parse_root
from aurweb.testing.requests import Request
def package_endpoint(package: Package) -> str:
return f"/packages/{package.Name}"
def create_package(pkgname: str, maintainer: User) -> Package:
pkgbase = db.create(PackageBase,
Name=pkgname,
Maintainer=maintainer)
return db.create(Package, Name=pkgbase.Name, PackageBase=pkgbase)
def create_package_dep(package: Package, depname: str,
dep_type_name: str = "depends") -> PackageDependency:
dep_type = db.query(DependencyType,
DependencyType.Name == dep_type_name).first()
return db.create(PackageDependency,
DependencyType=dep_type,
Package=package,
DepName=depname)
def create_package_rel(package: Package,
relname: str) -> PackageRelation:
rel_type = db.query(RelationType,
RelationType.ID == PROVIDES_ID).first()
return db.create(PackageRelation,
RelationType=rel_type,
Package=package,
RelName=relname)
@pytest.fixture(autouse=True)
def setup(db_test):
return
@pytest.fixture
def client() -> TestClient:
""" Yield a FastAPI TestClient. """
yield TestClient(app=asgi.app)
@pytest.fixture
def user() -> User:
""" Yield a user. """
with db.begin():
user = db.create(User, Username="test",
Email="test@example.org",
Passwd="testPassword",
AccountTypeID=USER_ID)
yield user
@pytest.fixture
def maintainer() -> User:
""" Yield a specific User used to maintain packages. """
account_type = db.query(AccountType, AccountType.ID == USER_ID).first()
with db.begin():
maintainer = db.create(User, Username="test_maintainer",
Email="test_maintainer@example.org",
Passwd="testPassword",
AccountType=account_type)
yield maintainer
@pytest.fixture
def tu_user():
tu_type = db.query(AccountType,
AccountType.AccountType == "Trusted User").first()
with db.begin():
tu_user = db.create(User, Username="test_tu",
Email="test_tu@example.org",
RealName="Test TU", Passwd="testPassword",
AccountType=tu_type)
yield tu_user
@pytest.fixture
def package(maintainer: User) -> Package:
""" Yield a Package created by user. """
now = int(datetime.utcnow().timestamp())
with db.begin():
pkgbase = db.create(PackageBase,
Name="test-package",
Maintainer=maintainer,
Packager=maintainer,
Submitter=maintainer,
ModifiedTS=now)
package = db.create(Package,
PackageBase=pkgbase,
Name=pkgbase.Name)
yield package
@pytest.fixture
def pkgbase(package: Package) -> PackageBase:
yield package.PackageBase
@pytest.fixture
def target(maintainer: User) -> PackageBase:
""" Merge target. """
now = int(datetime.utcnow().timestamp())
with db.begin():
pkgbase = db.create(PackageBase, Name="target-package",
Maintainer=maintainer,
Packager=maintainer,
Submitter=maintainer,
ModifiedTS=now)
db.create(Package, PackageBase=pkgbase, Name=pkgbase.Name)
yield pkgbase
@pytest.fixture
def pkgreq(user: User, pkgbase: PackageBase) -> PackageRequest:
""" Yield a PackageRequest related to `pkgbase`. """
with db.begin():
pkgreq = db.create(PackageRequest,
ReqTypeID=DELETION_ID,
User=user,
PackageBase=pkgbase,
PackageBaseName=pkgbase.Name,
Comments=f"Deletion request for {pkgbase.Name}",
ClosureComment=str())
yield pkgreq
@pytest.fixture
def comment(user: User, package: Package) -> PackageComment:
pkgbase = package.PackageBase
now = int(datetime.utcnow().timestamp())
with db.begin():
comment = db.create(PackageComment,
User=user,
PackageBase=pkgbase,
Comments="Test comment.",
RenderedComment=str(),
CommentTS=now)
yield comment
@pytest.fixture
def packages(maintainer: User) -> List[Package]:
""" Yield 55 packages named pkg_0 .. pkg_54. """
packages_ = []
now = int(datetime.utcnow().timestamp())
with db.begin():
for i in range(55):
pkgbase = db.create(PackageBase,
Name=f"pkg_{i}",
Maintainer=maintainer,
Packager=maintainer,
Submitter=maintainer,
ModifiedTS=now)
package = db.create(Package,
PackageBase=pkgbase,
Name=f"pkg_{i}")
packages_.append(package)
yield packages_
@pytest.fixture
def requests(user: User, packages: List[Package]) -> List[PackageRequest]:
pkgreqs = []
deletion_type = db.query(RequestType).filter(
RequestType.ID == DELETION_ID
).first()
with db.begin():
for i in range(55):
pkgreq = db.create(PackageRequest,
RequestType=deletion_type,
User=user,
PackageBase=packages[i].PackageBase,
PackageBaseName=packages[i].Name,
Comments=f"Deletion request for pkg_{i}",
ClosureComment=str())
pkgreqs.append(pkgreq)
yield pkgreqs
def test_package_not_found(client: TestClient):
with client as request:
resp = request.get("/packages/not_found")
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_package_official_not_found(client: TestClient, package: Package):
""" When a Package has a matching OfficialProvider record, it is not
hosted on AUR, but in the official repositories. Getting a package
with this kind of record should return a status code 404. """
with db.begin():
db.create(OfficialProvider,
Name=package.Name,
Repo="core",
Provides=package.Name)
with client as request:
resp = request.get(package_endpoint(package))
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_package(client: TestClient, package: Package):
""" Test a single / packages / {name} route. """
with db.begin():
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=PROVIDES_ID,
RelName="test_provider1")
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=PROVIDES_ID,
RelName="test_provider2")
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=REPLACES_ID,
RelName="test_replacer1")
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=REPLACES_ID,
RelName="test_replacer2")
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=CONFLICTS_ID,
RelName="test_conflict1")
db.create(PackageRelation, PackageID=package.ID,
RelTypeID=CONFLICTS_ID,
RelName="test_conflict2")
# Create some licenses.
licenses = [
db.create(License, Name="test_license1"),
db.create(License, Name="test_license2")
]
db.create(PackageLicense, PackageID=package.ID,
License=licenses[0])
db.create(PackageLicense, PackageID=package.ID,
License=licenses[1])
with client as request:
resp = request.get(package_endpoint(package))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
h2 = root.find('.//div[@id="pkgdetails"]/h2')
sections = h2.text.split(":")
assert sections[0] == "Package Details"
name, version = sections[1].lstrip().split(" ")
assert name == package.Name
version == package.Version
rows = root.findall('.//table[@id="pkginfo"]//tr')
row = rows[1] # Second row is our target.
pkgbase = row.find("./td/a")
assert pkgbase.text.strip() == package.PackageBase.Name
licenses = root.xpath('//tr[@id="licenses"]/td')
expected = ["test_license1", "test_license2"]
assert licenses[0].text.strip() == ", ".join(expected)
provides = root.xpath('//tr[@id="provides"]/td')
expected = ["test_provider1", "test_provider2"]
assert provides[0].text.strip() == ", ".join(expected)
replaces = root.xpath('//tr[@id="replaces"]/td')
expected = ["test_replacer1", "test_replacer2"]
assert replaces[0].text.strip() == ", ".join(expected)
conflicts = root.xpath('//tr[@id="conflicts"]/td')
expected = ["test_conflict1", "test_conflict2"]
assert conflicts[0].text.strip() == ", ".join(expected)
def test_package_comments(client: TestClient, user: User, package: Package):
now = (datetime.utcnow().timestamp())
with db.begin():
comment = db.create(PackageComment, PackageBase=package.PackageBase,
User=user, Comments="Test comment", CommentTS=now)
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(package_endpoint(package), cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
expected = [
comment.Comments
]
comments = root.xpath('.//div[contains(@class, "package-comments")]'
'/div[@class="article-content"]/div/text()')
for i, row in enumerate(expected):
assert comments[i].strip() == row
def test_package_requests_display(client: TestClient, user: User,
package: Package, pkgreq: PackageRequest):
# Test that a single request displays "1 pending request".
with client as request:
resp = request.get(package_endpoint(package))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]'
target = root.xpath(selector)[0]
assert target.text.strip() == "1 pending request"
type_ = db.query(RequestType, RequestType.ID == DELETION_ID).first()
with db.begin():
db.create(PackageRequest, PackageBase=package.PackageBase,
PackageBaseName=package.PackageBase.Name,
User=user, RequestType=type_,
Comments="Test comment2.",
ClosureComment=str())
# Test that a two requests display "2 pending requests".
with client as request:
resp = request.get(package_endpoint(package))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]'
target = root.xpath(selector)[0]
assert target.text.strip() == "2 pending requests"
def test_package_authenticated(client: TestClient, user: User,
package: Package):
""" We get the same here for either authenticated or not
authenticated. Form inputs are presented to maintainers.
This process also occurs when pkgbase.html is rendered. """
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(package_endpoint(package), cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
expected = [
"View PKGBUILD",
"View Changes",
"Download snapshot",
"Search wiki",
"Flag package out-of-date",
"Vote for this package",
"Enable notifications",
"Submit Request"
]
for expected_text in expected:
assert expected_text in resp.text
# When no requests are up, make sure we don't see the display for them.
root = parse_root(resp.text)
selector = '//div[@id="actionlist"]/ul/li/span[@class="flagged"]'
target = root.xpath(selector)
assert len(target) == 0
def test_package_authenticated_maintainer(client: TestClient,
maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.get(package_endpoint(package), cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
expected = [
"View PKGBUILD",
"View Changes",
"Download snapshot",
"Search wiki",
"Flag package out-of-date",
"Vote for this package",
"Enable notifications",
"Manage Co-Maintainers",
"Submit Request",
"Disown Package"
]
for expected_text in expected:
assert expected_text in resp.text
def test_package_authenticated_tu(client: TestClient,
tu_user: User,
package: Package):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
with client as request:
resp = request.get(package_endpoint(package), cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
expected = [
"View PKGBUILD",
"View Changes",
"Download snapshot",
"Search wiki",
"Flag package out-of-date",
"Vote for this package",
"Enable notifications",
"Manage Co-Maintainers",
"Submit Request",
"Delete Package",
"Merge Package",
"Disown Package"
]
for expected_text in expected:
assert expected_text in resp.text
def test_package_dependencies(client: TestClient, maintainer: User,
package: Package):
# Create a normal dependency of type depends.
with db.begin():
dep_pkg = create_package("test-dep-1", maintainer)
dep = create_package_dep(package, dep_pkg.Name)
dep.DepArch = "x86_64"
# Also, create a makedepends.
make_dep_pkg = create_package("test-dep-2", maintainer)
make_dep = create_package_dep(package, make_dep_pkg.Name,
dep_type_name="makedepends")
# And... a checkdepends!
check_dep_pkg = create_package("test-dep-3", maintainer)
check_dep = create_package_dep(package, check_dep_pkg.Name,
dep_type_name="checkdepends")
# Geez. Just stop. This is optdepends.
opt_dep_pkg = create_package("test-dep-4", maintainer)
opt_dep = create_package_dep(package, opt_dep_pkg.Name,
dep_type_name="optdepends")
# Heh. Another optdepends to test one with a description.
opt_desc_dep_pkg = create_package("test-dep-5", maintainer)
opt_desc_dep = create_package_dep(package, opt_desc_dep_pkg.Name,
dep_type_name="optdepends")
opt_desc_dep.DepDesc = "Test description."
broken_dep = create_package_dep(package, "test-dep-6",
dep_type_name="depends")
# Create an official provider record.
db.create(OfficialProvider, Name="test-dep-99",
Repo="core", Provides="test-dep-99")
official_dep = create_package_dep(package, "test-dep-99")
# Also, create a provider who provides our test-dep-99.
provider = create_package("test-provider", maintainer)
create_package_rel(provider, dep.DepName)
with client as request:
resp = request.get(package_endpoint(package))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
expected = [
dep.DepName,
make_dep.DepName,
check_dep.DepName,
opt_dep.DepName,
opt_desc_dep.DepName,
official_dep.DepName
]
pkgdeps = root.findall('.//ul[@id="pkgdepslist"]/li/a')
for i, expectation in enumerate(expected):
assert pkgdeps[i].text.strip() == expectation
# Let's make sure the DepArch was displayed for our first dep.
arch = root.findall('.//ul[@id="pkgdepslist"]/li')[0]
arch = arch.xpath('./em')[1]
assert arch.text.strip() == "(x86_64)"
broken_node = root.find('.//ul[@id="pkgdepslist"]/li/span')
assert broken_node.text.strip() == broken_dep.DepName
def test_pkgbase_not_found(client: TestClient):
with client as request:
resp = request.get("/pkgbase/not_found")
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_redirect(client: TestClient, package: Package):
with client as request:
resp = request.get(f"/pkgbase/{package.Name}",
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/packages/{package.Name}"
def test_pkgbase(client: TestClient, package: Package):
with db.begin():
second = db.create(Package, Name="second-pkg",
PackageBase=package.PackageBase)
expected = [package.Name, second.Name]
with client as request:
resp = request.get(f"/pkgbase/{package.Name}",
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
# Check the details box title.
title = root.find('.//div[@id="pkgdetails"]/h2')
title, pkgname = title.text.split(": ")
assert title == "Package Base Details"
assert pkgname == package.Name
pkgs = root.findall('.//div[@id="pkgs"]/ul/li/a')
for i, name in enumerate(expected):
assert pkgs[i].text.strip() == name
def test_packages(client: TestClient, packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "X", # "X" isn't valid, defaults to "nd"
"PP": "1 or 1",
"O": "0 or 0"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
stats = root.xpath('//div[@class="pkglist-stats"]/p')[0]
pager_text = re.sub(r'\s+', " ", stats.text.replace("\n", "").strip())
assert pager_text == "55 packages found. Page 1 of 2."
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50 # Default per-page
def test_packages_search_by_name(client: TestClient, packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "n",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50 # Default per-page
def test_packages_search_by_exact_name(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "N",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
# There is no package named exactly 'pkg_', we get 0 results.
assert len(rows) == 0
with client as request:
response = request.get("/packages", params={
"SeB": "N",
"K": "pkg_1"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
# There's just one package named 'pkg_1', we get 1 result.
assert len(rows) == 1
def test_packages_search_by_pkgbase(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "b",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
def test_packages_search_by_exact_pkgbase(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "B",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
with client as request:
response = request.get("/packages", params={
"SeB": "B",
"K": "pkg_1"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_keywords(client: TestClient,
packages: List[Package]):
# None of our packages have keywords, so this query should return nothing.
with client as request:
response = request.get("/packages", params={
"SeB": "k",
"K": "testKeyword"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
# But now, let's create the keyword for the first package.
package = packages[0]
with db.begin():
db.create(PackageKeyword,
PackageBase=package.PackageBase,
Keyword="testKeyword")
# And request packages with that keyword, we should get 1 result.
with client as request:
response = request.get("/packages", params={
"SeB": "k",
"K": "testKeyword"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_maintainer(client: TestClient,
maintainer: User,
package: Package):
# We should expect that searching by `package`'s maintainer
# returns `package` in the results.
with client as request:
response = request.get("/packages", params={
"SeB": "m",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
# Search again by maintainer with no keywords given.
# This kind of search returns all orphans instead.
# In this first case, there are no orphan packages; assert that.
with client as request:
response = request.get("/packages", params={"SeB": "m"})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
# Orphan `package`.
with db.begin():
package.PackageBase.Maintainer = None
# This time, we should get `package` returned, since it's now an orphan.
with client as request:
response = request.get("/packages", params={"SeB": "m"})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_comaintainer(client: TestClient,
maintainer: User,
package: Package):
# Nobody's a comaintainer yet.
with client as request:
response = request.get("/packages", params={
"SeB": "c",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
# Now, we create a comaintainer.
with db.begin():
db.create(PackageComaintainer,
PackageBase=package.PackageBase,
User=maintainer,
Priority=1)
# Then test that it's returned by our search.
with client as request:
response = request.get("/packages", params={
"SeB": "c",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_co_or_maintainer(client: TestClient,
maintainer: User,
package: Package):
with client as request:
response = request.get("/packages", params={
"SeB": "M",
"SB": "BLAH", # Invalid SB; gets reset to default "n".
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
with db.begin():
user = db.create(User, Username="comaintainer",
Email="comaintainer@example.org",
Passwd="testPassword")
db.create(PackageComaintainer,
PackageBase=package.PackageBase,
User=user,
Priority=1)
with client as request:
response = request.get("/packages", params={
"SeB": "M",
"K": user.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_submitter(client: TestClient,
maintainer: User,
package: Package):
with client as request:
response = request.get("/packages", params={
"SeB": "s",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_sort_by_votes(client: TestClient,
maintainer: User,
packages: List[Package]):
# Set the first package's NumVotes to 1.
with db.begin():
packages[0].PackageBase.NumVotes = 1
# Test that, by default, the first result is what we just set above.
with client as request:
response = request.get("/packages", params={
"SB": "v" # Votes.
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
votes = rows[0].xpath('./td')[2] # The third column of the first row.
assert votes.text.strip() == "1"
# Now, test that with an ascending order, the last result is
# the one we set, since the default (above) is descending.
with client as request:
response = request.get("/packages", params={
"SB": "v", # Votes.
"SO": "a", # Ascending.
"O": "50" # Second page.
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
votes = rows[-1].xpath('./td')[2] # The third column of the last row.
assert votes.text.strip() == "1"
def test_packages_sort_by_popularity(client: TestClient,
maintainer: User,
packages: List[Package]):
# Set the first package's Popularity to 0.50.
with db.begin():
packages[0].PackageBase.Popularity = "0.50"
# Test that, by default, the first result is what we just set above.
with client as request:
response = request.get("/packages", params={
"SB": "p" # Popularity
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
pop = rows[0].xpath('./td')[3] # The fourth column of the first row.
assert pop.text.strip() == "0.50"
def test_packages_sort_by_voted(client: TestClient,
maintainer: User,
packages: List[Package]):
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(PackageVote, PackageBase=packages[0].PackageBase,
User=maintainer, VoteTS=now)
# Test that, by default, the first result is what we just set above.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
response = request.get("/packages", params={
"SB": "w", # Voted
"SO": "d" # Descending, Voted first.
}, cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
voted = rows[0].xpath('./td')[5] # The sixth column of the first row.
assert voted.text.strip() == "Yes"
# Conversely, everything else was not voted on.
voted = rows[1].xpath('./td')[5] # The sixth column of the second row.
assert voted.text.strip() == str() # Empty.
def test_packages_sort_by_notify(client: TestClient,
maintainer: User,
packages: List[Package]):
db.create(PackageNotification,
PackageBase=packages[0].PackageBase,
User=maintainer)
# Test that, by default, the first result is what we just set above.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
response = request.get("/packages", params={
"SB": "o", # Voted
"SO": "d" # Descending, Voted first.
}, cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
notify = rows[0].xpath('./td')[6] # The sixth column of the first row.
assert notify.text.strip() == "Yes"
# Conversely, everything else was not voted on.
notify = rows[1].xpath('./td')[6] # The sixth column of the second row.
assert notify.text.strip() == str() # Empty.
def test_packages_sort_by_maintainer(client: TestClient,
maintainer: User,
package: Package):
""" Sort a package search by the maintainer column. """
# Create a second package, so the two can be ordered and checked.
with db.begin():
maintainer2 = db.create(User, Username="maintainer2",
Email="maintainer2@example.org",
Passwd="testPassword")
base2 = db.create(PackageBase, Name="pkg_2", Maintainer=maintainer2,
Submitter=maintainer2, Packager=maintainer2)
db.create(Package, Name="pkg_2", PackageBase=base2)
# Check the descending order route.
with client as request:
response = request.get("/packages", params={
"SB": "m",
"SO": "d"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column.
assert col.text.strip() == maintainer.Username
# On the other hand, with ascending, we should get reverse ordering.
with client as request:
response = request.get("/packages", params={
"SB": "m",
"SO": "a"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column.
assert col.text.strip() == maintainer2.Username
def test_packages_sort_by_last_modified(client: TestClient,
packages: List[Package]):
now = int(datetime.utcnow().timestamp())
# Set the first package's ModifiedTS to be 1000 seconds before now.
package = packages[0]
with db.begin():
package.PackageBase.ModifiedTS = now - 1000
with client as request:
response = request.get("/packages", params={
"SB": "l",
"SO": "a" # Ascending; oldest modification first.
})
assert response.status_code == int(HTTPStatus.OK)
# We should have 50 (default per page) results.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
# Let's assert that the first item returned was the one we modified above.
row = rows[0]
col = row.xpath('./td')[0].xpath('./a')[0]
assert col.text.strip() == package.Name
def test_packages_flagged(client: TestClient, maintainer: User,
packages: List[Package]):
package = packages[0]
now = int(datetime.utcnow().timestamp())
with db.begin():
package.PackageBase.OutOfDateTS = now
package.PackageBase.Flagger = maintainer
with client as request:
response = request.get("/packages", params={
"outdated": "on"
})
assert response.status_code == int(HTTPStatus.OK)
# We should only get one result from this query; the package we flagged.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
with client as request:
response = request.get("/packages", params={
"outdated": "off"
})
assert response.status_code == int(HTTPStatus.OK)
# In this case, we should get 54 results, which means that the first
# page will have 50 results (55 packages - 1 outdated = 54 not outdated).
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
def test_packages_orphans(client: TestClient, packages: List[Package]):
package = packages[0]
with db.begin():
package.PackageBase.Maintainer = None
with client as request:
response = request.get("/packages", params={"submit": "Orphans"})
assert response.status_code == int(HTTPStatus.OK)
# We only have one orphan. Let's make sure that's what is returned.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_per_page(client: TestClient, maintainer: User):
""" Test the ability for /packages to deal with the PP query
argument specifications (50, 100, 250; default: 50). """
with db.begin():
for i in range(255):
base = db.create(PackageBase, Name=f"pkg_{i}",
Maintainer=maintainer,
Submitter=maintainer,
Packager=maintainer)
db.create(Package, PackageBase=base, Name=base.Name)
# Test default case, PP of 50.
with client as request:
response = request.get("/packages", params={"PP": 50})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
# Alright, test the next case, PP of 100.
with client as request:
response = request.get("/packages", params={"PP": 100})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 100
# And finally, the last case, a PP of 250.
with client as request:
response = request.get("/packages", params={"PP": 250})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 250
def test_pkgbase_voters(client: TestClient, maintainer: User, package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/voters"
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(PackageVote, User=maintainer, PackageBase=pkgbase,
VoteTS=now)
with client as request:
resp = request.get(endpoint)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
rows = root.xpath('//div[@class="box"]//ul/li')
assert len(rows) == 1
def test_pkgbase_comment_not_found(client: TestClient, maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
comment_id = 12345 # A non-existing comment.
endpoint = f"/pkgbase/{package.PackageBase.Name}/comments/{comment_id}"
with client as request:
resp = request.post(endpoint, data={
"comment": "Failure"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comment_form_unauthorized(client: TestClient, user: User,
maintainer: User, package: Package):
now = int(datetime.utcnow().timestamp())
with db.begin():
comment = db.create(PackageComment, PackageBase=package.PackageBase,
User=maintainer, Comments="Test",
RenderedComment=str(), CommentTS=now)
cookies = {"AURSID": user.login(Request(), "testPassword")}
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment.ID}/form"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_comment_form_not_found(client: TestClient, maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
comment_id = 12345 # A non-existing comment.
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/form"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comments_missing_comment(client: TestClient, maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/comments"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
def test_pkgbase_comments(client: TestClient, maintainer: User, user: User,
package: Package):
""" This test includes tests against the following routes:
- POST /pkgbase/{name}/comments
- GET /pkgbase/{name} (to check comments)
- Tested against a comment created with the POST route
- GET /pkgbase/{name}/comments/{id}/form
- Tested against a comment created with the POST route
"""
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments"
with client as request:
resp = request.post(endpoint, data={
"comment": "Test comment.",
"enable_notifications": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
expected_prefix = f"/pkgbase/{pkgbasename}"
prefix_len = len(expected_prefix)
assert resp.headers.get("location")[:prefix_len] == expected_prefix
with client as request:
resp = request.get(resp.headers.get("location"))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
headers = root.xpath('//h4[@class="comment-header"]')
bodies = root.xpath('//div[@class="article-content"]/div/p')
assert len(headers) == 1
assert len(bodies) == 1
assert bodies[0].text.strip() == "Test comment."
comment_id = headers[0].attrib["id"].split("-")[-1]
# Test the non-javascript version of comment editing by
# visiting the /pkgbase/{name}/comments/{id}/edit route.
with client as request:
resp = request.get(f"{endpoint}/{comment_id}/edit", cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# Clear up the PackageNotification. This doubles as testing
# that the notification was created and clears it up so we can
# test enabling it during edit.
pkgbase = package.PackageBase
db_notif = pkgbase.notifications.filter(
PackageNotification.UserID == maintainer.ID
).first()
with db.begin():
db.delete(db_notif)
# Now, let's edit the comment we just created.
comment_id = int(headers[0].attrib["id"].split("-")[-1])
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}"
with client as request:
resp = request.post(endpoint, data={
"comment": "Edited comment.",
"enable_notifications": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
with client as request:
resp = request.get(resp.headers.get("location"))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
headers = root.xpath('//h4[@class="comment-header"]')
bodies = root.xpath('//div[@class="article-content"]/div/p')
assert len(headers) == 1
assert len(bodies) == 1
assert bodies[0].text.strip() == "Edited comment."
# Ensure that a notification was created.
db_notif = pkgbase.notifications.filter(
PackageNotification.UserID == maintainer.ID
).first()
assert db_notif is not None
# Don't supply a comment; should return BAD_REQUEST.
with client as request:
fail_resp = request.post(endpoint, cookies=cookies)
assert fail_resp.status_code == int(HTTPStatus.BAD_REQUEST)
# Now, test the form route, which should return form markup
# via JSON.
endpoint = f"{endpoint}/form"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
data = resp.json()
assert "form" in data
def test_pkgbase_comment_delete(client: TestClient,
maintainer: User,
user: User,
package: Package,
comment: PackageComment):
# Test the unauthorized case of comment deletion.
cookies = {"AURSID": user.login(Request(), "testPassword")}
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment.ID}/delete"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
expected = f"/pkgbase/{pkgbasename}"
assert resp.headers.get("location") == expected
# Test the unauthorized case of comment undeletion.
maint_cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment.ID}/undelete"
with client as request:
resp = request.post(endpoint, cookies=maint_cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
# And move on to undeleting it.
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
def test_pkgbase_comment_delete_unauthorized(client: TestClient,
maintainer: User,
package: Package,
comment: PackageComment):
# Test the unauthorized case of comment deletion.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment.ID}/delete"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_comment_delete_not_found(client: TestClient,
maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
comment_id = 12345 # Non-existing comment.
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/delete"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comment_undelete_not_found(client: TestClient,
maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
comment_id = 12345 # Non-existing comment.
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/undelete"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comment_pin(client: TestClient,
maintainer: User,
package: Package,
comment: PackageComment):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
comment_id = comment.ID
pkgbasename = package.PackageBase.Name
# Pin the comment.
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/pin"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Assert that PinnedTS got set.
assert comment.PinnedTS > 0
# Unpin the comment we just pinned.
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/unpin"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Let's assert that PinnedTS was unset.
assert comment.PinnedTS == 0
def test_pkgbase_comment_pin_unauthorized(client: TestClient,
user: User,
package: Package,
comment: PackageComment):
cookies = {"AURSID": user.login(Request(), "testPassword")}
comment_id = comment.ID
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/pin"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_comment_unpin_unauthorized(client: TestClient,
user: User,
package: Package,
comment: PackageComment):
cookies = {"AURSID": user.login(Request(), "testPassword")}
comment_id = comment.ID
pkgbasename = package.PackageBase.Name
endpoint = f"/pkgbase/{pkgbasename}/comments/{comment_id}/unpin"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_comaintainers_not_found(client: TestClient, maintainer: User):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = "/pkgbase/fake/comaintainers"
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comaintainers_post_not_found(client: TestClient,
maintainer: User):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = "/pkgbase/fake/comaintainers"
with client as request:
resp = request.post(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_comaintainers_unauthorized(client: TestClient, user: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
def test_pkgbase_comaintainers_post_unauthorized(client: TestClient,
user: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
def test_pkgbase_comaintainers_post_invalid_user(client: TestClient,
maintainer: User,
package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"users": "\nfake\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
assert error.text.strip() == "Invalid user name: fake"
def test_pkgbase_comaintainers(client: TestClient, user: User,
maintainer: User, package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/comaintainers"
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
# Start off by adding user as a comaintainer to package.
# The maintainer username given should be ignored.
with client as request:
resp = request.post(endpoint, data={
"users": f"\n{user.Username}\n{maintainer.Username}\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Do it again to exercise the last_priority bump path.
with client as request:
resp = request.post(endpoint, data={
"users": f"\n{user.Username}\n{maintainer.Username}\n"
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Now that we've added a comaintainer to the pkgbase,
# let's perform a GET request to make sure that the backend produces
# the user we added in the users textarea.
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
users = root.xpath('//textarea[@id="id_users"]')[0]
assert users.text.strip() == user.Username
# Finish off by removing all the comaintainers.
with client as request:
resp = request.post(endpoint, data={
"users": str()
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
users = root.xpath('//textarea[@id="id_users"]')[0]
assert users is not None and users.text is None
def test_requests_unauthorized(client: TestClient):
with client as request:
resp = request.get("/requests", allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
def test_requests(client: TestClient,
maintainer: User,
tu_user: User,
packages: List[Package],
requests: List[PackageRequest]):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
with client as request:
resp = request.get("/requests", params={
# Pass in url query parameters O, SeB and SB to exercise
# their paths inside of the pager_nav used in this request.
"O": 0, # Page 1
"SeB": "nd",
"SB": "n"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
assert "Next " in resp.text
assert "Last »" in resp.text
root = parse_root(resp.text)
# We have 55 requests, our defaults.PP is 50, so expect we have 50 rows.
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == defaults.PP
# Request page 2 of the requests page.
with client as request:
resp = request.get("/requests", params={
"O": 50 # Page 2
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
assert " Previous" in resp.text
assert "« First" in resp.text
root = parse_root(resp.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 5 # There are five records left on the second page.
def test_requests_selfmade(client: TestClient, user: User,
requests: List[PackageRequest]):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get("/requests", cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# As the user who creates all of the requests, we should see all of them.
# However, we are not allowed to accept any of them ourselves.
root = parse_root(resp.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == defaults.PP
# Our first and only link in the last row should be "Close".
for row in rows:
last_row = row.xpath('./td')[-1].xpath('./a')[0]
assert last_row.text.strip() == "Close"
def test_pkgbase_request_not_found(client: TestClient, user: User):
pkgbase_name = "fake"
endpoint = f"/pkgbase/{pkgbase_name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_request(client: TestClient, user: User, package: Package):
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
def test_pkgbase_request_post_not_found(client: TestClient, user: User):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/pkgbase/fake/request", data={
"type": "fake"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.NOT_FOUND)
def test_pkgbase_request_post_invalid_type(client: TestClient,
user: User,
package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={"type": "fake"}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
def test_pkgbase_request_post_no_comment_error(client: TestClient,
user: User,
package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"type": "deletion",
"comments": "" # An empty comment field causes an error.
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
expected = "The comment field must not be empty."
assert error.text.strip() == expected
def test_pkgbase_request_post_merge_not_found_error(client: TestClient,
user: User,
package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"type": "merge",
"merge_into": "fake", # There is no PackageBase.Name "fake"
"comments": "We want to merge this."
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
expected = "The package base you want to merge into does not exist."
assert error.text.strip() == expected
def test_pkgbase_request_post_merge_no_merge_into_error(client: TestClient,
user: User,
package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"type": "merge",
"merge_into": "", # There is no PackageBase.Name "fake"
"comments": "We want to merge this."
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
expected = 'The "Merge into" field must not be empty.'
assert error.text.strip() == expected
def test_pkgbase_request_post_merge_self_error(client: TestClient, user: User,
package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, data={
"type": "merge",
"merge_into": package.PackageBase.Name,
"comments": "We want to merge this."
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
error = root.xpath('//ul[@class="errorlist"]/li')[0]
expected = "You cannot merge a package base into itself."
assert error.text.strip() == expected
def test_requests_close(client: TestClient, user: User,
pkgreq: PackageRequest):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(f"/requests/{pkgreq.ID}/close", cookies=cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
def test_requests_close_unauthorized(client: TestClient, maintainer: User,
pkgreq: PackageRequest):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.get(f"/requests/{pkgreq.ID}/close", cookies=cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == "/"
def test_requests_close_post_unauthorized(client: TestClient, maintainer: User,
pkgreq: PackageRequest):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.post(f"/requests/{pkgreq.ID}/close", data={
"reason": ACCEPTED_ID
}, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == "/"
def test_requests_close_post(client: TestClient, user: User,
pkgreq: PackageRequest):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(f"/requests/{pkgreq.ID}/close",
cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgreq.Status == REJECTED_ID
assert pkgreq.Closer == user
assert pkgreq.ClosureComment == str()
def test_requests_close_post_rejected(client: TestClient, user: User,
pkgreq: PackageRequest):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(f"/requests/{pkgreq.ID}/close",
cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgreq.Status == REJECTED_ID
assert pkgreq.Closer == user
assert pkgreq.ClosureComment == str()
def test_pkgbase_flag(client: TestClient, user: User, maintainer: User,
package: Package):
pkgbase = package.PackageBase
# We shouldn't have flagged the package yet; assert so.
assert pkgbase.Flagger is None
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/flag"
# Get the flag page.
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# Now, let's check the /pkgbase/{name}/flag-comment route.
flag_comment_endpoint = f"/pkgbase/{pkgbase.Name}/flag-comment"
with client as request:
resp = request.get(flag_comment_endpoint, cookies=cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Try to flag it without a comment.
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
# Flag it with a valid comment.
with client as request:
resp = request.post(endpoint, data={
"comments": "Test"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgbase.Flagger == user
assert pkgbase.FlaggerComment == "Test"
# Now, let's check the /pkgbase/{name}/flag-comment route.
flag_comment_endpoint = f"/pkgbase/{pkgbase.Name}/flag-comment"
with client as request:
resp = request.get(flag_comment_endpoint, cookies=cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
# Now try to perform a get; we should be redirected because
# it's already flagged.
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
with db.begin():
user2 = db.create(User, Username="test2",
Email="test2@example.org",
Passwd="testPassword",
AccountType=user.AccountType)
# Now, test that the 'user2' user can't unflag it, because they
# didn't flag it to begin with.
user2_cookies = {"AURSID": user2.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/unflag"
with client as request:
resp = request.post(endpoint, cookies=user2_cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgbase.Flagger == user
# Now, test that the 'maintainer' user can.
maint_cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, cookies=maint_cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgbase.Flagger is None
# Flag it again.
with client as request:
resp = request.post(f"/pkgbase/{pkgbase.Name}/flag", data={
"comments": "Test"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Now, unflag it for real.
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert pkgbase.Flagger is None
def test_pkgbase_flag_vcs(client: TestClient, user: User, package: Package):
# Morph our package fixture into a VCS package (-git).
with db.begin():
package.PackageBase.Name += "-git"
package.Name += "-git"
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.get(f"/pkgbase/{package.PackageBase.Name}/flag",
cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
expected = ("This seems to be a VCS package. Please do "
"<strong>not</strong> flag it out-of-date if the package "
"version in the AUR does not match the most recent commit. "
"Flagging this package should only be done if the sources "
"moved or changes in the PKGBUILD are required because of "
"recent upstream changes.")
assert expected in resp.text
def test_pkgbase_notify(client: TestClient, user: User, package: Package):
pkgbase = package.PackageBase
# We have no notif record yet; assert that.
notif = pkgbase.notifications.filter(
PackageNotification.UserID == user.ID
).first()
assert notif is None
# Enable notifications.
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/notify"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
notif = pkgbase.notifications.filter(
PackageNotification.UserID == user.ID
).first()
assert notif is not None
# Disable notifications.
endpoint = f"/pkgbase/{pkgbase.Name}/unnotify"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
notif = pkgbase.notifications.filter(
PackageNotification.UserID == user.ID
).first()
assert notif is None
def test_pkgbase_vote(client: TestClient, user: User, package: Package):
pkgbase = package.PackageBase
# We haven't voted yet.
vote = pkgbase.package_votes.filter(PackageVote.UsersID == user.ID).first()
assert vote is None
# Vote for the package.
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/vote"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
vote = pkgbase.package_votes.filter(PackageVote.UsersID == user.ID).first()
assert vote is not None
assert pkgbase.NumVotes == 1
# Remove vote.
endpoint = f"/pkgbase/{pkgbase.Name}/unvote"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
vote = pkgbase.package_votes.filter(PackageVote.UsersID == user.ID).first()
assert vote is None
assert pkgbase.NumVotes == 0
def test_pkgbase_disown_as_sole_maintainer(client: TestClient,
maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
# But we do here.
with client as request:
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
def test_pkgbase_disown(client: TestClient, user: User, maintainer: User,
package: Package):
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
user_cookies = {"AURSID": user.login(Request(), "testPassword")}
pkgbase = package.PackageBase
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
with db.begin():
db.create(PackageComaintainer,
User=user,
PackageBase=pkgbase,
Priority=1)
# GET as a normal user, which is rejected for lack of credentials.
with client as request:
resp = request.get(endpoint, cookies=user_cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# GET as the maintainer.
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# POST as a normal user, which is rejected for lack of credentials.
with client as request:
resp = request.post(endpoint, cookies=user_cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# POST as the maintainer without "confirm".
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
# POST as the maintainer with "confirm".
with client as request:
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
def test_pkgbase_adopt(client: TestClient, user: User, tu_user: User,
maintainer: User, package: Package):
# Unset the maintainer as if package is orphaned.
with db.begin():
package.PackageBase.Maintainer = None
pkgbasename = package.PackageBase.Name
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbasename}/adopt"
# Adopt the package base.
with client as request:
resp = request.post(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert package.PackageBase.Maintainer == maintainer
# Try to adopt it when it already has a maintainer; nothing changes.
user_cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, cookies=user_cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert package.PackageBase.Maintainer == maintainer
# Steal the package as a TU.
tu_cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
with client as request:
resp = request.post(endpoint, cookies=tu_cookies,
allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert package.PackageBase.Maintainer == tu_user
def test_pkgbase_delete_unauthorized(client: TestClient, user: User,
package: Package):
pkgbase = package.PackageBase
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
# Test GET.
with client as request:
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
# Test POST.
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
def test_pkgbase_delete(client: TestClient, tu_user: User, package: Package):
pkgbase = package.PackageBase
# Test that the GET request works.
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
# Test that POST works and denies us because we haven't confirmed.
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
# Test that we can actually delete the pkgbase.
with client as request:
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Let's assert that the package base record got removed.
record = db.query(PackageBase).filter(
PackageBase.Name == pkgbase.Name
).first()
assert record is None
# Two emails should've been sent out; an autogenerated
# request's accepted notification and a deletion notification.
assert Email.count() == 1
req_close = Email(1).parse()
expr = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
subject = req_close.headers.get("Subject")
assert re.match(expr, subject)
def test_pkgbase_delete_with_request(client: TestClient, tu_user: User,
pkgbase: PackageBase,
pkgreq: PackageRequest):
# TODO: Test that a previously existing request gets Accepted when
# a TU deleted the package.
# Delete the package as `tu_user` via POST request.
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
with client as request:
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location") == "/packages"
# We should've just sent one closure email since `pkgreq` exists.
assert Email.count() == 1
# Make sure it was a closure for the deletion request.
email = Email(1).parse()
expr = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
assert re.match(expr, email.headers.get("Subject"))
def test_packages_post_unknown_action(client: TestClient, user: User,
package: Package):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={"action": "unknown"},
cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
def test_packages_post_error(client: TestClient, user: User, package: Package):
async def stub_action(request: Request, **kwargs):
return (False, ["Some error."])
actions = {"stub": stub_action}
with mock.patch.dict("aurweb.routers.packages.PACKAGE_ACTIONS", actions):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={"action": "stub"},
cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "Some error."
assert errors[0].text.strip() == expected
def test_packages_post(client: TestClient, user: User, package: Package):
async def stub_action(request: Request, **kwargs):
return (True, ["Some success."])
actions = {"stub": stub_action}
with mock.patch.dict("aurweb.routers.packages.PACKAGE_ACTIONS", actions):
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={"action": "stub"},
cookies=cookies, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.OK)
errors = get_successes(resp.text)
expected = "Some success."
assert errors[0].text.strip() == expected
def test_pkgbase_merge_unauthorized(client: TestClient, user: User,
package: Package):
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_merge(client: TestClient, tu_user: User, package: Package):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
assert not get_errors(resp.text)
def test_packages_post_unflag(client: TestClient, user: User,
maintainer: User, package: Package):
# Flag `package` as `user`.
now = int(datetime.utcnow().timestamp())
with db.begin():
package.PackageBase.Flagger = user
package.PackageBase.OutOfDateTS = now
cookies = {"AURSID": user.login(Request(), "testPassword")}
# Don't supply any packages.
post_data = {"action": "unflag", "IDs": []}
with client as request:
resp = request.post("/packages", data=post_data, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to unflag."
assert errors[0].text.strip() == expected
# Unflag the package as `user`.
post_data = {"action": "unflag", "IDs": [package.ID]}
with client as request:
resp = request.post("/packages", data=post_data, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
assert package.PackageBase.Flagger is None
successes = get_successes(resp.text)
expected = "The selected packages have been unflagged."
assert successes[0].text.strip() == expected
# Re-flag `package` as `user`.
now = int(datetime.utcnow().timestamp())
with db.begin():
package.PackageBase.Flagger = user
package.PackageBase.OutOfDateTS = now
# Try to unflag the package as `maintainer`, which is not allowed.
maint_cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
post_data = {"action": "unflag", "IDs": [package.ID]}
with client as request:
resp = request.post("/packages", data=post_data, cookies=maint_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to unflag."
assert errors[0].text.strip() == expected
def test_packages_post_notify(client: TestClient, user: User, package: Package):
notif = package.PackageBase.notifications.filter(
PackageNotification.UserID == user.ID
).first()
assert notif is None
# Try to enable notifications but supply no packages, causing
# an error to be rendered.
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={"action": "notify"},
cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to be notified about."
assert errors[0].text.strip() == expected
# Now let's actually enable notifications on `package`.
with client as request:
resp = request.post("/packages", data={
"action": "notify",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
expected = "The selected packages' notifications have been enabled."
successes = get_successes(resp.text)
assert successes[0].text.strip() == expected
# Try to enable notifications when they're already enabled,
# causing an error to be rendered.
with client as request:
resp = request.post("/packages", data={
"action": "notify",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to be notified about."
assert errors[0].text.strip() == expected
def test_packages_post_unnotify(client: TestClient, user: User,
package: Package):
# Create a notification record.
with db.begin():
notif = db.create(PackageNotification,
PackageBase=package.PackageBase,
User=user)
assert notif is not None
# Request removal of the notification without any IDs.
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "unnotify"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages for notification removal."
assert errors[0].text.strip() == expected
# Request removal of the notification; really.
with client as request:
resp = request.post("/packages", data={
"action": "unnotify",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
successes = get_successes(resp.text)
expected = "The selected packages' notifications have been removed."
assert successes[0].text.strip() == expected
# Let's ensure the record got removed.
notif = package.PackageBase.notifications.filter(
PackageNotification.UserID == user.ID
).first()
assert notif is None
# Try it again. The notif no longer exists.
with client as request:
resp = request.post("/packages", data={
"action": "unnotify",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "A package you selected does not have notifications enabled."
assert errors[0].text.strip() == expected
def test_packages_post_adopt(client: TestClient, user: User,
package: Package):
# Try to adopt an empty list of packages.
cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "adopt"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to adopt."
assert errors[0].text.strip() == expected
# Now, let's try to adopt a package that's already maintained.
with client as request:
resp = request.post("/packages", data={
"action": "adopt",
"IDs": [package.ID],
"confirm": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You are not allowed to adopt one of the packages you selected."
assert errors[0].text.strip() == expected
# Remove the maintainer from the DB.
with db.begin():
package.PackageBase.Maintainer = None
assert package.PackageBase.Maintainer is None
# Now, let's try to adopt without confirming.
with client as request:
resp = request.post("/packages", data={
"action": "adopt",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = ("The selected packages have not been adopted, "
"check the confirmation checkbox.")
assert errors[0].text.strip() == expected
# Let's do it again now that there is no maintainer.
with client as request:
resp = request.post("/packages", data={
"action": "adopt",
"IDs": [package.ID],
"confirm": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
successes = get_successes(resp.text)
expected = "The selected packages have been adopted."
assert successes[0].text.strip() == expected
def test_packages_post_disown_as_maintainer(client: TestClient, user: User,
maintainer: User,
package: Package):
""" Disown packages as a maintainer. """
# Initially prove that we have a maintainer.
assert package.PackageBase.Maintainer is not None
assert package.PackageBase.Maintainer == maintainer
# Try to run the disown action with no IDs; get an error.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "disown"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to disown."
assert errors[0].text.strip() == expected
assert package.PackageBase.Maintainer is not None
# Try to disown `package` without giving the confirm argument.
with client as request:
resp = request.post("/packages", data={
"action": "disown",
"IDs": [package.ID]
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
assert package.PackageBase.Maintainer is not None
errors = get_errors(resp.text)
expected = ("The selected packages have not been disowned, "
"check the confirmation checkbox.")
assert errors[0].text.strip() == expected
# Now, try to disown `package` without credentials (as `user`).
user_cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "disown",
"IDs": [package.ID],
"confirm": True
}, cookies=user_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
assert package.PackageBase.Maintainer is not None
errors = get_errors(resp.text)
expected = "You are not allowed to disown one of the packages you selected."
assert errors[0].text.strip() == expected
# Now, let's really disown `package` as `maintainer`.
with client as request:
resp = request.post("/packages", data={
"action": "disown",
"IDs": [package.ID],
"confirm": True
}, cookies=cookies)
assert package.PackageBase.Maintainer is None
successes = get_successes(resp.text)
expected = "The selected packages have been disowned."
assert successes[0].text.strip() == expected
def test_packages_post_disown(client: TestClient, tu_user: User,
maintainer: User, package: Package):
""" Disown packages as a Trusted User, which cannot bypass idle time. """
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "disown",
"IDs": [package.ID],
"confirm": True
}, cookies=cookies)
errors = get_errors(resp.text)
expected = r"^No due existing orphan requests to accept for .+\.$"
assert re.match(expected, errors[0].text.strip())
def test_packages_post_delete(caplog: pytest.fixture, client: TestClient,
user: User, tu_user: User, package: Package):
# First, let's try to use the delete action with no packages IDs.
user_cookies = {"AURSID": user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "delete"
}, cookies=user_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You did not select any packages to delete."
assert errors[0].text.strip() == expected
# Now, let's try to delete real packages without supplying "confirm".
with client as request:
resp = request.post("/packages", data={
"action": "delete",
"IDs": [package.ID]
}, cookies=user_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = ("The selected packages have not been deleted, "
"check the confirmation checkbox.")
assert errors[0].text.strip() == expected
# And again, with everything, but `user` doesn't have permissions.
with client as request:
resp = request.post("/packages", data={
"action": "delete",
"IDs": [package.ID],
"confirm": True
}, cookies=user_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "You do not have permission to delete packages."
assert errors[0].text.strip() == expected
# Now, let's switch over to making the requests as a TU.
# However, this next request will be rejected due to supplying
# an invalid package ID.
tu_cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
with client as request:
resp = request.post("/packages", data={
"action": "delete",
"IDs": [0],
"confirm": True
}, cookies=tu_cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "One of the packages you selected does not exist."
assert errors[0].text.strip() == expected
# Whoo. Now, let's finally make a valid request as `tu_user`
# to delete `package`.
with client as request:
resp = request.post("/packages", data={
"action": "delete",
"IDs": [package.ID],
"confirm": True
}, cookies=tu_cookies)
assert resp.status_code == int(HTTPStatus.OK)
successes = get_successes(resp.text)
expected = "The selected packages have been deleted."
assert successes[0].text.strip() == expected
# Expect that the package deletion was logged.
packages = [package.Name]
expected = (f"Privileged user '{tu_user.Username}' deleted the "
f"following packages: {str(packages)}.")
assert expected in caplog.text
def test_pkgbase_merge_post_unauthorized(client: TestClient, user: User,
package: Package):
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_pkgbase_merge_post_unconfirmed(client: TestClient, tu_user: User,
package: Package):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = ("The selected packages have not been deleted, "
"check the confirmation checkbox.")
assert errors[0].text.strip() == expected
def test_pkgbase_merge_post_invalid_into(client: TestClient, tu_user: User,
package: Package):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.post(endpoint, data={
"into": "not_real",
"confirm": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "Cannot find package to merge votes and comments into."
assert errors[0].text.strip() == expected
def test_pkgbase_merge_post_self_invalid(client: TestClient, tu_user: User,
package: Package):
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.post(endpoint, data={
"into": package.PackageBase.Name,
"confirm": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
errors = get_errors(resp.text)
expected = "Cannot merge a package base with itself."
assert errors[0].text.strip() == expected
def test_pkgbase_merge_post(client: TestClient, tu_user: User,
package: Package,
pkgbase: PackageBase,
target: PackageBase,
pkgreq: PackageRequest):
pkgname = package.Name
pkgbasename = pkgbase.Name
# Create a merge request destined for another target.
# This will allow our test code to exercise closing
# such a request after merging the pkgbase in question.
with db.begin():
pkgreq.ReqTypeID = MERGE_ID
pkgreq.MergeBaseName = target.Name
# Vote for the package.
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
endpoint = f"/pkgbase/{package.PackageBase.Name}/vote"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Enable notifications.
endpoint = f"/pkgbase/{package.PackageBase.Name}/notify"
with client as request:
resp = request.post(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Comment on the package.
endpoint = f"/pkgbase/{package.PackageBase.Name}/comments"
with client as request:
resp = request.post(endpoint, data={
"comment": "Test comment."
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
# Save these relationships for later comparison.
comments = package.PackageBase.comments.all()
notifs = package.PackageBase.notifications.all()
votes = package.PackageBase.package_votes.all()
# Merge the package into target.
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
with client as request:
resp = request.post(endpoint, data={
"into": target.Name,
"confirm": True
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
loc = resp.headers.get("location")
assert loc == f"/pkgbase/{target.Name}"
# Two emails should've been sent out.
assert Email.count() == 1
email_body = Email(1).parse().glue()
assert f"Merge Request for {pkgbasename} Accepted" in email_body
# Assert that the original comments, notifs and votes we setup
# got migrated to target as intended.
assert comments == target.comments.all()
assert notifs == target.notifications.all()
assert votes == target.package_votes.all()
# ...and that the package got deleted.
package = db.query(Package).filter(Package.Name == pkgname).first()
assert package is None
# Our previously-made request should have gotten accepted.
assert pkgreq.Status == ACCEPTED_ID
assert pkgreq.Closer is not None
# A PackageRequest is always created when merging this way.
pkgreq = db.query(PackageRequest).filter(
and_(PackageRequest.ReqTypeID == MERGE_ID,
PackageRequest.PackageBaseName == pkgbasename,
PackageRequest.MergeBaseName == target.Name)
).first()
assert pkgreq is not None
def test_account_comments_unauthorized(client: TestClient, user: User):
""" This test may seem out of place, but it requires packages,
so its being included in the packages routes test suite to
leverage existing fixtures. """
endpoint = f"/account/{user.Username}/comments"
with client as request:
resp = request.get(endpoint, allow_redirects=False)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
assert resp.headers.get("location").startswith("/login")
def test_account_comments(client: TestClient, user: User, package: Package):
""" This test may seem out of place, but it requires packages,
so its being included in the packages routes test suite to
leverage existing fixtures. """
now = (datetime.utcnow().timestamp())
with db.begin():
# This comment's CommentTS is `now + 1`, so it is found in rendered
# HTML before the rendered_comment, which has a CommentTS of `now`.
comment = db.create(PackageComment,
PackageBase=package.PackageBase,
User=user, Comments="Test comment",
CommentTS=now + 1)
rendered_comment = db.create(PackageComment,
PackageBase=package.PackageBase,
User=user, Comments="Test comment",
RenderedComment="<p>Test comment</p>",
CommentTS=now)
cookies = {"AURSID": user.login(Request(), "testPassword")}
endpoint = f"/account/{user.Username}/comments"
with client as request:
resp = request.get(endpoint, cookies=cookies)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
comments = root.xpath('//div[@class="article-content"]/div')
# Assert that we got Comments rendered from the first comment.
assert comments[0].text.strip() == comment.Comments
# And from the second, we have rendered content.
rendered = comments[1].xpath('./p')
expected = rendered_comment.RenderedComment.replace(
"<p>", "").replace("</p>", "")
assert rendered[0].text.strip() == expected
def test_pkgbase_keywords(client: TestClient, user: User, package: Package):
endpoint = f"/pkgbase/{package.PackageBase.Name}"
with client as request:
resp = request.get(endpoint)
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
keywords = root.xpath('//a[@class="keyword"]')
assert len(keywords) == 0
cookies = {"AURSID": user.login(Request(), "testPassword")}
post_endpoint = f"{endpoint}/keywords"
with client as request:
resp = request.post(post_endpoint, data={
"keywords": "abc test"
}, cookies=cookies)
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
with client as request:
resp = request.get(resp.headers.get("location"))
assert resp.status_code == int(HTTPStatus.OK)
root = parse_root(resp.text)
keywords = root.xpath('//a[@class="keyword"]')
assert len(keywords) == 2
expected = ["abc", "test"]
for i, keyword in enumerate(keywords):
assert keyword.text.strip() == expected[i]