Merge branch 'pu_packages' into pu

This commit is contained in:
Kevin Morris 2021-09-21 13:42:52 -07:00
commit 836af2d588
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
19 changed files with 1230 additions and 32 deletions

View file

@ -53,10 +53,30 @@ class AnonymousUser:
def is_authenticated():
return False
@staticmethod
def is_trusted_user():
return False
@staticmethod
def is_developer():
return False
@staticmethod
def is_elevated():
return False
@staticmethod
def has_credential(credential):
return False
@staticmethod
def voted_for(package):
return False
@staticmethod
def notified(package):
return False
class BasicAuthBackend(AuthenticationBackend):
async def authenticate(self, conn: HTTPConnection):

50
aurweb/filters.py Normal file
View file

@ -0,0 +1,50 @@
from typing import Any, Dict
import paginate
from jinja2 import pass_context
from aurweb import util
from aurweb.templates import register_filter
@register_filter("pager_nav")
@pass_context
def pager_nav(context: Dict[str, Any],
page: int, total: int, prefix: str) -> str:
page = int(page) # Make sure this is an int.
pp = context.get("PP", 50)
# Setup a local query string dict, optionally passed by caller.
q = context.get("q", dict())
search_by = context.get("SeB", None)
if search_by:
q["SeB"] = search_by
sort_by = context.get("SB", None)
if sort_by:
q["SB"] = sort_by
def create_url(page: int):
nonlocal q
offset = max(page * pp - pp, 0)
qs = util.to_qs(util.extend_query(q, ["O", offset]))
return f"{prefix}?{qs}"
# Use the paginate module to produce our linkage.
pager = paginate.Page([], page=page + 1,
items_per_page=pp,
item_count=total,
url_maker=create_url)
return pager.pager(
link_attr={"class": "page"},
curpage_attr={"class": "page"},
separator="&nbsp",
format="$link_first $link_previous ~5~ $link_next $link_last",
symbol_first="« First",
symbol_previous=" Previous",
symbol_next="Next ",
symbol_last="Last »")

View file

@ -165,6 +165,15 @@ class User(Base):
aurweb.models.account_type.TRUSTED_USER_AND_DEV_ID
}
def is_elevated(self):
""" A User is 'elevated' when they have either a
Trusted User or Developer AccountType. """
return self.AccountType.ID in {
aurweb.models.account_type.TRUSTED_USER_ID,
aurweb.models.account_type.DEVELOPER_ID,
aurweb.models.account_type.TRUSTED_USER_AND_DEV_ID,
}
def can_edit_user(self, user):
""" Can this account record edit the target user? It must either
be the target user or a user with enough permissions to do so.

195
aurweb/packages/search.py Normal file
View file

@ -0,0 +1,195 @@
from sqlalchemy import and_, case, or_, orm
from aurweb import config, db
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_notification import PackageNotification
from aurweb.models.package_vote import PackageVote
from aurweb.models.user import User
DEFAULT_MAX_RESULTS = 2500
class PackageSearch:
""" A Package search query builder. """
# A constant mapping of short to full name sort orderings.
FULL_SORT_ORDER = {"d": "desc", "a": "asc"}
def __init__(self, user: User):
""" Construct an instance of PackageSearch.
This constructors performs several steps during initialization:
1. Setup self.query: an ORM query of Package joined by PackageBase.
"""
self.user = user
self.query = db.query(Package).join(PackageBase).join(
PackageVote,
and_(PackageVote.PackageBaseID == PackageBase.ID,
PackageVote.UsersID == self.user.ID),
isouter=True
).join(
PackageNotification,
and_(PackageNotification.PackageBaseID == PackageBase.ID,
PackageNotification.UserID == self.user.ID),
isouter=True
)
self.ordering = "d"
# Setup SeB (Search By) callbacks.
self.search_by_cb = {
"nd": self._search_by_namedesc,
"n": self._search_by_name,
"b": self._search_by_pkgbase,
"N": self._search_by_exact_name,
"B": self._search_by_exact_pkgbase,
"k": self._search_by_keywords,
"m": self._search_by_maintainer,
"c": self._search_by_comaintainer,
"M": self._search_by_co_or_maintainer,
"s": self._search_by_submitter
}
# Setup SB (Sort By) callbacks.
self.sort_by_cb = {
"n": self._sort_by_name,
"v": self._sort_by_votes,
"p": self._sort_by_popularity,
"w": self._sort_by_voted,
"o": self._sort_by_notify,
"m": self._sort_by_maintainer,
"l": self._sort_by_last_modified
}
def _search_by_namedesc(self, keywords: str) -> orm.Query:
self.query = self.query.filter(
or_(Package.Name.like(f"%{keywords}%"),
Package.Description.like(f"%{keywords}%"))
)
return self
def _search_by_name(self, keywords: str) -> orm.Query:
self.query = self.query.filter(Package.Name.like(f"%{keywords}%"))
return self
def _search_by_exact_name(self, keywords: str) -> orm.Query:
self.query = self.query.filter(Package.Name == keywords)
return self
def _search_by_pkgbase(self, keywords: str) -> orm.Query:
self.query = self.query.filter(PackageBase.Name.like(f"%{keywords}%"))
return self
def _search_by_exact_pkgbase(self, keywords: str) -> orm.Query:
self.query = self.query.filter(PackageBase.Name == keywords)
return self
def _search_by_keywords(self, keywords: str) -> orm.Query:
self.query = self.query.join(PackageKeyword).filter(
PackageKeyword.Keyword == keywords
)
return self
def _search_by_maintainer(self, keywords: str) -> orm.Query:
self.query = self.query.join(
User, User.ID == PackageBase.MaintainerUID
).filter(User.Username == keywords)
return self
def _search_by_comaintainer(self, keywords: str) -> orm.Query:
self.query = self.query.join(PackageComaintainer).join(
User, User.ID == PackageComaintainer.UsersID
).filter(User.Username == keywords)
return self
def _search_by_co_or_maintainer(self, keywords: str) -> orm.Query:
self.query = self.query.join(
PackageComaintainer,
isouter=True
).join(
User, or_(User.ID == PackageBase.MaintainerUID,
User.ID == PackageComaintainer.UsersID)
).filter(User.Username == keywords)
return self
def _search_by_submitter(self, keywords: str) -> orm.Query:
self.query = self.query.join(
User, User.ID == PackageBase.SubmitterUID
).filter(User.Username == keywords)
return self
def search_by(self, search_by: str, keywords: str) -> orm.Query:
if search_by not in self.search_by_cb:
search_by = "nd" # Default: Name, Description
callback = self.search_by_cb.get(search_by)
result = callback(keywords)
return result
def _sort_by_name(self, order: str):
column = getattr(Package.Name, order)
self.query = self.query.order_by(column())
return self
def _sort_by_votes(self, order: str):
column = getattr(PackageBase.NumVotes, order)
self.query = self.query.order_by(column())
return self
def _sort_by_popularity(self, order: str):
column = getattr(PackageBase.Popularity, order)
self.query = self.query.order_by(column())
return self
def _sort_by_voted(self, order: str):
# FIXME: Currently, PHP is destroying this implementation
# in terms of performance. We should improve this; there's no
# reason it should take _longer_.
column = getattr(
case([(PackageVote.UsersID == self.user.ID, 1)], else_=0),
order
)
self.query = self.query.order_by(column(), Package.Name.desc())
return self
def _sort_by_notify(self, order: str):
# FIXME: Currently, PHP is destroying this implementation
# in terms of performance. We should improve this; there's no
# reason it should take _longer_.
column = getattr(
case([(PackageNotification.UserID == self.user.ID, 1)], else_=0),
order
)
self.query = self.query.order_by(column(), Package.Name.desc())
return self
def _sort_by_maintainer(self, order: str):
column = getattr(User.Username, order)
self.query = self.query.join(
User, User.ID == PackageBase.MaintainerUID, isouter=True
).order_by(column())
return self
def _sort_by_last_modified(self, order: str):
column = getattr(PackageBase.ModifiedTS, order)
self.query = self.query.order_by(column())
return self
def sort_by(self, sort_by: str, ordering: str = "d") -> orm.Query:
if sort_by not in self.sort_by_cb:
sort_by = "n" # Default: Name.
callback = self.sort_by_cb.get(sort_by)
if ordering not in self.FULL_SORT_ORDER:
ordering = "d" # Default: Descending.
ordering = self.FULL_SORT_ORDER.get(ordering)
return callback(ordering)
def results(self) -> orm.Query:
# Store the total count of all records found up to limit.
limit = (config.getint("options", "max_search_results")
or DEFAULT_MAX_RESULTS)
self.total_count = self.query.limit(limit).count()
# Return the query to the user.
return self.query

View file

@ -5,6 +5,7 @@ from fastapi import APIRouter, Request, Response
from fastapi.responses import RedirectResponse
from sqlalchemy import and_
import aurweb.filters
import aurweb.models.package_comment
import aurweb.models.package_keyword
import aurweb.packages.util
@ -21,12 +22,92 @@ from aurweb.models.package_request import PackageRequest
from aurweb.models.package_source import PackageSource
from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import CONFLICTS_ID
from aurweb.packages.util import get_pkgbase
from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkgbase, query_notified, query_voted
from aurweb.templates import make_context, render_template
router = APIRouter()
async def packages_get(request: Request, context: Dict[str, Any]):
# Query parameters used in this request.
context["q"] = dict(request.query_params)
# Per page and offset.
per_page = context["PP"] = int(request.query_params.get("PP", 50))
offset = context["O"] = int(request.query_params.get("O", 0))
# Query search by.
search_by = context["SeB"] = request.query_params.get("SeB", "nd")
# Query sort by.
sort_by = context["SB"] = request.query_params.get("SB", "n")
# Query sort order.
sort_order = request.query_params.get("SO", None)
# Apply ordering, limit and offset.
search = PackageSearch(request.user)
# For each keyword found in K, apply a search_by filter.
# This means that for any sentences separated by spaces,
# they are used as if they were ANDed.
keywords = context["K"] = request.query_params.get("K", str())
keywords = keywords.split(" ")
for keyword in keywords:
search.search_by(search_by, keyword)
flagged = request.query_params.get("outdated", None)
if flagged:
# If outdated was given, set it up in the context.
context["outdated"] = flagged
# When outdated is set to "on," we filter records which do have
# an OutOfDateTS. When it's set to "off," we filter out any which
# do **not** have OutOfDateTS.
criteria = None
if flagged == "on":
criteria = PackageBase.OutOfDateTS.isnot
else:
criteria = PackageBase.OutOfDateTS.is_
# Apply the flag criteria to our PackageSearch.query.
search.query = search.query.filter(criteria(None))
submit = request.query_params.get("submit", "Go")
if submit == "Orphans":
# If the user clicked the "Orphans" button, we only want
# orphaned packages.
search.query = search.query.filter(PackageBase.MaintainerUID.is_(None))
# Apply user-specified specified sort column and ordering.
search.sort_by(sort_by, sort_order)
# If no SO was given, default the context SO to 'a' (Ascending).
# By default, if no SO is given, the search should sort by 'd'
# (Descending), but display "Ascending" for the Sort order select.
if sort_order is None:
sort_order = "a"
context["SO"] = sort_order
# Insert search results into the context.
results = search.results()
context["packages"] = results.limit(per_page).offset(offset)
context["packages_voted"] = query_voted(
context.get("packages"), request.user)
context["packages_notified"] = query_notified(
context.get("packages"), request.user)
context["packages_count"] = search.total_count
return render_template(request, "packages.html", context)
@router.get("/packages")
async def packages(request: Request) -> Response:
context = make_context(request, "Packages")
return await packages_get(request, context)
async def make_single_context(request: Request,
pkgbase: PackageBase) -> Dict[str, Any]:
""" Make a basic context for package or pkgbase.

View file

@ -1,5 +1,6 @@
import copy
import functools
import math
import os
import zoneinfo
@ -35,6 +36,7 @@ _env.filters["urlencode"] = util.to_qs
_env.filters["quote_plus"] = quote_plus
_env.filters["get_vote"] = util.get_vote
_env.filters["number_format"] = util.number_format
_env.filters["ceil"] = math.ceil
# Add captcha filters.
_env.filters["captcha_salt"] = captcha.captcha_salt_filter

View file

@ -22,6 +22,7 @@ aur_location = https://aur.archlinux.org
git_clone_uri_anon = https://aur.archlinux.org/%s.git
git_clone_uri_priv = ssh://aur@aur.archlinux.org/%s.git
max_rpc_results = 5000
max_search_results = 2500
max_depends = 1000
aur_request_ml = aur-requests@lists.archlinux.org
request_idle_time = 1209600

13
poetry.lock generated
View file

@ -513,6 +513,14 @@ python-versions = ">=3.6"
[package.dependencies]
pyparsing = ">=2.0.2"
[[package]]
name = "paginate"
version = "0.5.6"
description = "Divides large result sets into pages for easier browsing"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pluggy"
version = "0.13.1"
@ -921,7 +929,7 @@ h11 = ">=0.9.0,<1"
[metadata]
lock-version = "1.1"
python-versions = "*"
content-hash = "96112731ca21a6ff5d0657c6c40979642bb992ae660ba8d6135421718737c6b0"
content-hash = "c262ac1160b83593377fb7520d35c4b8ad81e5acff9d0a2060b2b048e3865b78"
[metadata.files]
aiofiles = [
@ -1328,6 +1336,9 @@ packaging = [
{file = "packaging-21.0-py3-none-any.whl", hash = "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14"},
{file = "packaging-21.0.tar.gz", hash = "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7"},
]
paginate = [
{file = "paginate-0.5.6.tar.gz", hash = "sha256:5e6007b6a9398177a7e1648d04fdd9f8c9766a1a945bceac82f1929e8c78af2d"},
]
pluggy = [
{file = "pluggy-0.13.1-py2.py3-none-any.whl", hash = "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d"},
{file = "pluggy-0.13.1.tar.gz", hash = "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0"},

View file

@ -70,6 +70,7 @@ python-multipart = { version = "0.0.5", python = "^3.9" }
redis = { version = "3.5.3", python = "^3.9" }
requests = { version = "2.26.0", python = "^3.9" }
werkzeug = { version = "2.0.1", python = "^3.9" }
paginate = { version = "0.5.6", python = "^3.9" }
# SQL
alembic = { version = "1.6.5", python = "^3.9" }

View file

@ -1,5 +1,6 @@
[pycodestyle]
max-line-length = 127
ignore = E741, W503
[flake8]
max-line-length = 127
@ -25,6 +26,7 @@ max-complexity = 10
per-file-ignores =
aurweb/routers/accounts.py:E741,C901
test/test_ssh_pub_key.py:E501
aurweb/routers/packages.py:E741
[isort]
line_length = 127

84
templates/packages.html Normal file
View file

@ -0,0 +1,84 @@
{% extends "partials/layout.html" %}
{% block pageContent %}
{% if errors %}
<ul class="errorlist">
{% for error in errors %}
<li>{{ error | tr }}</li>
{% endfor %}
</ul>
{% include "partials/packages/search.html" %}
{% else %}
{% set pages = (packages_count / PP) | ceil %}
{% set page = O / PP %}
{% if success %}
<ul class="success">
{% for message in success %}
<li>{{ message | tr }}</li>
{% endfor %}
</ul>
{% endif %}
{# Search form #}
{% include "partials/packages/search.html" %}
<div id="pkglist-results" class="box">
{# /packages does things a bit roundabout-wise:
If SeB is not given, "nd" is the default.
If SB is not given, "n" is the default.
If SO is not given, "d" is the default.
However, we depend on flipping SO for column sorting.
This section sets those defaults for the context if
they are not already setup. #}
{% if not SeB %}
{% set SeB = "nd" %}
{% endif %}
{% if not SB %}
{% set SB = "n" %}
{% endif %}
{% if not SO %}
{% set SO = "d" %}
{% endif %}
{# Pagination widget #}
{% with total = packages_count,
singular = "%d package found.",
plural = "%d packages found.",
prefix = "/packages" %}
{% include "partials/widgets/pager.html" %}
{% endwith %}
{# Package action form #}
<form id="pkglist-results-form"
action="/packages/?{{ q | urlencode }}"
method="post">
{# Search results #}
{% with voted = packages_voted, notified = packages_notified %}
{% include "partials/packages/search_results.html" %}
{% endwith %}
{# Pagination widget #}
{% with total = packages_count,
singular = "%d package found.",
plural = "%d packages found.",
prefix = "/packages" %}
{% include "partials/widgets/pager.html" %}
{% endwith %}
{% if request.user.is_authenticated() %}
{# Package actions #}
{% include "partials/packages/search_actions.html" %}
{% endif %}
</form>
</div>
{% endif %}
{% endblock %}

View file

@ -8,61 +8,65 @@
<div>
<label for="id_method">{{ "Search by" | tr }}</label>
<select name='SeB'>
<option value="nd">{{ "Name, Description" | tr }}</option>
<option value="n">{{ "Name Only" | tr }}</option>
<option value="b">{{ "Package Base" | tr }}</option>
<option value="N">{{ "Exact Name" | tr }}</option>
<option value="B">{{ "Exact Package Base" | tr }}</option>
<option value="k">{{ "Keywords" | tr }}</option>
<option value="m">{{ "Maintainer" | tr }}</option>
<option value="c">{{ "Co-maintainer" | tr }}</option>
<option value="M">{{ "Maintainer, Co-maintainer" | tr }}</option>
<option value="s">{{ "Submitter" | tr }}</option>
<option value="nd" {% if SeB == "nd" %}selected{% endif %}>{{ "Name, Description" | tr }}</option>
<option value="n" {% if SeB == "n" %}selected{% endif %}>{{ "Name Only" | tr }}</option>
<option value="b" {% if SeB == "b" %}selected{% endif%}>{{ "Package Base" | tr }}</option>
<option value="N" {% if SeB == "N" %}selected{% endif %}>{{ "Exact Name" | tr }}</option>
<option value="B" {% if SeB == "B" %}selected{% endif %}>{{ "Exact Package Base" | tr }}</option>
<option value="k" {% if SeB == "k" %}selected{% endif %}>{{ "Keywords" | tr }}</option>
<option value="m" {% if SeB == "m" %}selected{% endif %}>{{ "Maintainer" | tr }}</option>
<option value="c" {% if SeB == "c" %}selected{% endif %}>{{ "Co-maintainer" | tr }}</option>
<option value="M" {% if SeB == "M" %}selected{% endif %}>{{ "Maintainer, Co-maintainer" | tr }}</option>
<option value="s" {% if SeB == "s" %}selected{% endif %}>{{ "Submitter" | tr }}</option>
</select>
</div>
<div>
<label for="id_q">{{ "Keywords" | tr }}</label>
<input type='text' name='K' size='30' value="" maxlength='35'/>
<input type='text' name='K' size='30' value="{{ K or '' }}" maxlength='35'/>
</div>
<div>
<label for="id_out_of_date">{{ "Out of Date" | tr }}</label>
<select name='outdated'>
<option value=''>{{ "All" | tr }}</option>
<option value='on'>{{ "Flagged" | tr }}</option>
<option value='off'>{{ "Not Flagged" | tr }}</option>
<option value='on' {% if outdated == "on" %}selected{% endif %}>{{ "Flagged" | tr }}</option>
<option value='off' {% if outdated == "off" %}selected{% endif %}>{{ "Not Flagged" | tr }}</option>
</select>
</div>
<div>
<label for="id_sort_by">{{ "Sort by" | tr }}</label>
<select name='SB'>
<option value='n'>{{ "Name" | tr }}</option>
<option value='v'>{{ "Votes" | tr }}</option>
<option value='p'>{{ "Popularity" | tr }}</option>
<option value='w'>{{ "Voted" | tr }}</option>
<option value='o'>{{ "Notify" | tr }}</option>
<option value='m'>{{ "Maintainer" | tr }}</option>
<option value='l'>{{ "Last modified" | tr }}</option>
<option value='n' {% if SB == "n" %}selected{% endif %}>{{ "Name" | tr }}</option>
<option value='v' {% if SB == "v" %}selected{% endif %}>{{ "Votes" | tr }}</option>
<option value='p' {% if SB == "p" %}selected{% endif %}>{{ "Popularity" | tr }}</option>
<option value='w' {% if SB == "w" %}selected{% endif %}>{{ "Voted" | tr }}</option>
<option value='o' {% if SB == "o" %}selected{% endif %}>{{ "Notify" | tr }}</option>
<option value='m' {% if SB == "m" %}selected{% endif %}>{{ "Maintainer" | tr }}</option>
<option value='l' {% if SB == "l" %}selected{% endif %}>{{ "Last modified" | tr }}</option>
</select>
</div>
<div>
<label for="id_order_by">{{ "Sort order" | tr }}</label>
<select name='SO'>
<option value='a'>{{ "Ascending" | tr }}</option>
<option value='d'>{{ "Descending" | tr }}</option>
<option value='a' {% if SO == "a" %}selected{% endif %}>{{ "Ascending" | tr }}</option>
<option value='d' {% if SO == "d" %}selected{% endif %}>{{ "Descending" | tr }}</option>
</select>
</div>
<div>
<label for="id_per_page">{{ "Per page" | tr }}</label>
<select name='PP'>
<option value="50">50</option>
<option value="100">100</option>
<option value="250">250</option>
<option value="50" {% if PP == 50 %}selected{% endif %}>50</option>
<option value="100" {% if PP == 100 %}selected{% endif %}>100</option>
<option value="250" {% if PP == 250 %}selected{% endif %}>250</option>
</select>
</div>
<div>
<label>&nbsp;</label>
<input type='submit' class='button' name='do_Search' value='{{ "Go" | tr }}'/>
<input type='submit' class='button' name='do_Orphans' value='{{ "Orphans" | tr }}'/>
<button type='submit' class='button' name='submit' value='Go'>
{{ "Go" | tr }}
</button>
<button type='submit' class='button' name='submit' value='Orphans'>
{{ "Orphans" | tr }}
</button>
</div>
</fieldset>
</form>

View file

@ -0,0 +1,25 @@
<p>
<select name="action">
<option value="">{{ "Actions" | tr }}</option>
<option value="unflag">{{ "Unflag Out-of-date" | tr }}</option>
<option value="adopt">{{ "Adopt Packages" | tr }}</option>
<option value="disown">{{ "Disown Packages" | tr }}</option>
{% if request.user.is_trusted_user() or request.user.is_developer() %}
<option value="delete">{{ "Delete Packages" | tr }}</option>
{% endif %}
<option value="notify">{{ "Notify" | tr }}</option>
<option value="unnotify">{{ "UnNotify" | tr }}</option>
</select>
{% if request.user.is_trusted_user() or request.user.is_developer() %}
<label for="merge_into">{{ "Merge into" | tr }}</label>
<input id="merge_into" type="text" name="merge_into" />
{% endif %}
<label class="confirmation">
<input type="checkbox" name="confirm" />
Confirm
</label>
<input id="search-action-submit" class="button" type="submit" value="Go" />
</p>

View file

@ -0,0 +1,114 @@
<table {% if table_id %}id="{{ table_id }}"{% endif %} class="results">
<thead>
<tr>
{% if request.user.is_authenticated() %}
<th></th>
{% endif %}
<th>
{% set order = SO %}
{% if SB == "n" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=n&SO={{ order }}">
{{ "Name" | tr }}
</a>
</th>
<th>{{ "Version" | tr }}</th>
<th>
{% set order = SO %}
{% if SB == "v" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=v&SO={{ order }}">
{{ "Votes" | tr }}
</a>
</th>
<th>
{% set order = SO %}
{% if SB == "p" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=p&SO={{ order }}">{{ "Popularity" | tr }}</a><span title="{{ 'Popularity is calculated as the sum of all votes with each vote being weighted with a factor of %.2f per day since its creation.' | format(0.98) }}" class="hover-help"><sup>?</sup></span>
</th>
{% if request.user.is_authenticated() %}
<th>
{% set order = SO %}
{% if SB == "w" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=w&SO={{ order }}">
{{ "Voted" | tr }}
</a>
</th>
<th>
{% set order = SO %}
{% if SB == "o" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=o&SO={{ order }}">
{{ "Notify" | tr }}
</a>
</th>
{% endif %}
<th>{{ "Description" | tr }}</th>
<th>
{% set order = SO %}
{% if SB == "m" %}
{% set order = "d" if order == "a" else "a" %}
{% endif %}
<a href="/packages/?SB=m&SO={{ order }}">
{{ "Maintainer" | tr }}
</a>
</th>
</tr>
</thead>
<tbody>
{% for pkg in packages %}
{% set flagged = pkg.PackageBase.OutOfDateTS %}
<tr>
{% if request.user.is_authenticated() %}
<td>
<input type="checkbox" name="IDs" value="{{ pkg.ID }}" />
</td>
{% endif %}
<td>
<a href="/packages/{{ pkg.Name }}">
{{ pkg.Name }}
</a>
</td>
{% if flagged %}
<td class="flagged">{{ pkg.Version }}</td>
{% else %}
<td>{{ pkg.Version }}</td>
{% endif %}
<td>{{ pkg.PackageBase.NumVotes }}</td>
<td>
{{ pkg.PackageBase.Popularity | number_format(2) }}
</td>
{% if request.user.is_authenticated() %}
<td>
{% if pkg.PackageBase.ID in voted %}
{{ "Yes" | tr }}
{% endif %}
</td>
<td>
{% if pkg.PackageBase.ID in notified %}
{{ "Yes" | tr }}
{% endif %}
</td>
{% endif %}
<td class="wrap">{{ pkg.Description or '' }}</td>
<td>
{% set maintainer = pkg.PackageBase.Maintainer %}
{% if maintainer %}
<a href="/account/{{ maintainer.Username }}">
{{ maintainer.Username }}
</a>
{% else %}
<span class="error">{{ "orphan" | tr }}</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>

View file

@ -0,0 +1,26 @@
{# A pager widget that can be used for navigation of a number of results.
Inputs required:
prefix: Request URI prefix used to produce navigation offsets
singular: Singular sentence to be translated via tn
plural: Plural sentence to be translated via tn
PP: The number of results per page
O: The current offset value
total: The total number of results
#}
{% set page = ((O / PP) | int) %}
{% set pages = ((total / PP) | ceil) %}
<div class="pkglist-stats">
<p>
{{ total | tn(singular, plural) | format(total) }}
{{ "Page %d of %d." | tr | format(page + 1, pages) }}
</p>
{% if pages > 1 %}
<p class="pkglist-nav">
{{ page | pager_nav(total, prefix) | safe }}
<p>
{% endif %}
</div>

View file

@ -5,7 +5,7 @@ import pytest
from sqlalchemy.exc import IntegrityError
from aurweb import db
from aurweb.auth import BasicAuthBackend, account_type_required, has_credential
from aurweb.auth import AnonymousUser, BasicAuthBackend, account_type_required, has_credential
from aurweb.db import create, query
from aurweb.models.account_type import USER, USER_ID, AccountType
from aurweb.models.session import Session
@ -92,3 +92,28 @@ def test_account_type_required():
# But this one should! We have no "FAKE" key.
with pytest.raises(KeyError):
account_type_required({'FAKE'})
def test_is_trusted_user():
user_ = AnonymousUser()
assert not user_.is_trusted_user()
def test_is_developer():
user_ = AnonymousUser()
assert not user_.is_developer()
def test_is_elevated():
user_ = AnonymousUser()
assert not user_.is_elevated()
def test_voted_for():
user_ = AnonymousUser()
assert not user_.voted_for(None)
def test_notified():
user_ = AnonymousUser()
assert not user_.notified(None)

View file

@ -1,5 +1,8 @@
import re
from datetime import datetime
from http import HTTPStatus
from typing import List
import pytest
@ -11,11 +14,14 @@ from aurweb.models.dependency_type import DependencyType
from aurweb.models.official_provider import OfficialProvider
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_comment import PackageComment
from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_notification import PackageNotification
from aurweb.models.package_relation import PackageRelation
from aurweb.models.package_request import PackageRequest
from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import PROVIDES_ID, RelationType
from aurweb.models.request_type import DELETION_ID, RequestType
from aurweb.models.user import User
@ -64,6 +70,9 @@ def setup():
PackageDependency.__tablename__,
PackageRelation.__tablename__,
PackageKeyword.__tablename__,
PackageVote.__tablename__,
PackageNotification.__tablename__,
PackageComaintainer.__tablename__,
OfficialProvider.__tablename__
)
@ -101,16 +110,41 @@ def maintainer() -> User:
@pytest.fixture
def package(maintainer: User) -> Package:
""" Yield a Package created by user. """
now = int(datetime.utcnow().timestamp())
with db.begin():
pkgbase = db.create(PackageBase,
Name="test-package",
Maintainer=maintainer)
Maintainer=maintainer,
Packager=maintainer,
Submitter=maintainer,
ModifiedTS=now)
package = db.create(Package,
PackageBase=pkgbase,
Name=pkgbase.Name)
yield package
@pytest.fixture
def packages(maintainer: User) -> List[Package]:
""" Yield 55 packages named pkg_0 .. pkg_54. """
packages_ = []
now = int(datetime.utcnow().timestamp())
with db.begin():
for i in range(55):
pkgbase = db.create(PackageBase,
Name=f"pkg_{i}",
Maintainer=maintainer,
Packager=maintainer,
Submitter=maintainer,
ModifiedTS=now)
package = db.create(Package,
PackageBase=pkgbase,
Name=f"pkg_{i}")
packages_.append(package)
yield packages_
def test_package_not_found(client: TestClient):
with client as request:
resp = request.get("/packages/not_found")
@ -133,7 +167,7 @@ def test_package_official_not_found(client: TestClient, package: Package):
def test_package(client: TestClient, package: Package):
""" Test a single /packages/{name} route. """
""" Test a single / packages / {name} route. """
with client as request:
resp = request.get(package_endpoint(package))
@ -376,3 +410,505 @@ def test_pkgbase(client: TestClient, package: Package):
pkgs = root.findall('.//div[@id="pkgs"]/ul/li/a')
for i, name in enumerate(expected):
assert pkgs[i].text.strip() == name
def test_packages(client: TestClient, packages: List[Package]):
""" Test the / packages route with defaults.
Defaults:
50 results per page
offset of 0
"""
with client as request:
response = request.get("/packages", params={
"SeB": "X" # "X" isn't valid, defaults to "nd"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
stats = root.xpath('//div[@class="pkglist-stats"]/p')[0]
pager_text = re.sub(r'\s+', " ", stats.text.replace("\n", "").strip())
assert pager_text == "55 packages found. Page 1 of 2."
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50 # Default per-page
def test_packages_search_by_name(client: TestClient, packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "n",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50 # Default per-page
def test_packages_search_by_exact_name(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "N",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
# There is no package named exactly 'pkg_', we get 0 results.
assert len(rows) == 0
with client as request:
response = request.get("/packages", params={
"SeB": "N",
"K": "pkg_1"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
# There's just one package named 'pkg_1', we get 1 result.
assert len(rows) == 1
def test_packages_search_by_pkgbase(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "b",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
def test_packages_search_by_exact_pkgbase(client: TestClient,
packages: List[Package]):
with client as request:
response = request.get("/packages", params={
"SeB": "B",
"K": "pkg_"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
with client as request:
response = request.get("/packages", params={
"SeB": "B",
"K": "pkg_1"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_keywords(client: TestClient,
packages: List[Package]):
# None of our packages have keywords, so this query should return nothing.
with client as request:
response = request.get("/packages", params={
"SeB": "k",
"K": "testKeyword"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
# But now, let's create the keyword for the first package.
package = packages[0]
with db.begin():
db.create(PackageKeyword,
PackageBase=package.PackageBase,
Keyword="testKeyword")
# And request packages with that keyword, we should get 1 result.
with client as request:
response = request.get("/packages", params={
"SeB": "k",
"K": "testKeyword"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_maintainer(client: TestClient,
maintainer: User,
package: Package):
with client as request:
response = request.get("/packages", params={
"SeB": "m",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_comaintainer(client: TestClient,
maintainer: User,
package: Package):
# Nobody's a comaintainer yet.
with client as request:
response = request.get("/packages", params={
"SeB": "c",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 0
# Now, we create a comaintainer.
with db.begin():
db.create(PackageComaintainer,
PackageBase=package.PackageBase,
User=maintainer,
Priority=1)
# Then test that it's returned by our search.
with client as request:
response = request.get("/packages", params={
"SeB": "c",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_co_or_maintainer(client: TestClient,
maintainer: User,
package: Package):
with client as request:
response = request.get("/packages", params={
"SeB": "M",
"SB": "BLAH", # Invalid SB; gets reset to default "n".
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
with db.begin():
user = db.create(User, Username="comaintainer",
Email="comaintainer@example.org",
Passwd="testPassword")
db.create(PackageComaintainer,
PackageBase=package.PackageBase,
User=user,
Priority=1)
with client as request:
response = request.get("/packages", params={
"SeB": "M",
"K": user.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_search_by_submitter(client: TestClient,
maintainer: User,
package: Package):
with client as request:
response = request.get("/packages", params={
"SeB": "s",
"K": maintainer.Username
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_sort_by_votes(client: TestClient,
maintainer: User,
packages: List[Package]):
# Set the first package's NumVotes to 1.
with db.begin():
packages[0].PackageBase.NumVotes = 1
# Test that, by default, the first result is what we just set above.
with client as request:
response = request.get("/packages", params={
"SB": "v" # Votes.
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
votes = rows[0].xpath('./td')[2] # The third column of the first row.
assert votes.text.strip() == "1"
# Now, test that with an ascending order, the last result is
# the one we set, since the default (above) is descending.
with client as request:
response = request.get("/packages", params={
"SB": "v", # Votes.
"SO": "a", # Ascending.
"O": "50" # Second page.
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
votes = rows[-1].xpath('./td')[2] # The third column of the last row.
assert votes.text.strip() == "1"
def test_packages_sort_by_popularity(client: TestClient,
maintainer: User,
packages: List[Package]):
# Set the first package's Popularity to 0.50.
with db.begin():
packages[0].PackageBase.Popularity = "0.50"
# Test that, by default, the first result is what we just set above.
with client as request:
response = request.get("/packages", params={
"SB": "p" # Popularity
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
pop = rows[0].xpath('./td')[3] # The fourth column of the first row.
assert pop.text.strip() == "0.50"
def test_packages_sort_by_voted(client: TestClient,
maintainer: User,
packages: List[Package]):
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(PackageVote, PackageBase=packages[0].PackageBase,
User=maintainer, VoteTS=now)
# Test that, by default, the first result is what we just set above.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
response = request.get("/packages", params={
"SB": "w", # Voted
"SO": "d" # Descending, Voted first.
}, cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
voted = rows[0].xpath('./td')[5] # The sixth column of the first row.
assert voted.text.strip() == "Yes"
# Conversely, everything else was not voted on.
voted = rows[1].xpath('./td')[5] # The sixth column of the second row.
assert voted.text.strip() == str() # Empty.
def test_packages_sort_by_notify(client: TestClient,
maintainer: User,
packages: List[Package]):
db.create(PackageNotification,
PackageBase=packages[0].PackageBase,
User=maintainer)
# Test that, by default, the first result is what we just set above.
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
with client as request:
response = request.get("/packages", params={
"SB": "o", # Voted
"SO": "d" # Descending, Voted first.
}, cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
notify = rows[0].xpath('./td')[6] # The sixth column of the first row.
assert notify.text.strip() == "Yes"
# Conversely, everything else was not voted on.
notify = rows[1].xpath('./td')[6] # The sixth column of the second row.
assert notify.text.strip() == str() # Empty.
def test_packages_sort_by_maintainer(client: TestClient,
maintainer: User,
package: Package):
""" Sort a package search by the maintainer column. """
# Create a second package, so the two can be ordered and checked.
with db.begin():
maintainer2 = db.create(User, Username="maintainer2",
Email="maintainer2@example.org",
Passwd="testPassword")
base2 = db.create(PackageBase, Name="pkg_2", Maintainer=maintainer2,
Submitter=maintainer2, Packager=maintainer2)
db.create(Package, Name="pkg_2", PackageBase=base2)
# Check the descending order route.
with client as request:
response = request.get("/packages", params={
"SB": "m",
"SO": "d"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column.
assert col.text.strip() == maintainer.Username
# On the other hand, with ascending, we should get reverse ordering.
with client as request:
response = request.get("/packages", params={
"SB": "m",
"SO": "a"
})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
col = rows[0].xpath('./td')[5].xpath('./a')[0] # Last column.
assert col.text.strip() == maintainer2.Username
def test_packages_sort_by_last_modified(client: TestClient,
packages: List[Package]):
now = int(datetime.utcnow().timestamp())
# Set the first package's ModifiedTS to be 1000 seconds before now.
package = packages[0]
with db.begin():
package.PackageBase.ModifiedTS = now - 1000
with client as request:
response = request.get("/packages", params={
"SB": "l",
"SO": "a" # Ascending; oldest modification first.
})
assert response.status_code == int(HTTPStatus.OK)
# We should have 50 (default per page) results.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
# Let's assert that the first item returned was the one we modified above.
row = rows[0]
col = row.xpath('./td')[0].xpath('./a')[0]
assert col.text.strip() == package.Name
def test_packages_flagged(client: TestClient, maintainer: User,
packages: List[Package]):
package = packages[0]
now = int(datetime.utcnow().timestamp())
with db.begin():
package.PackageBase.OutOfDateTS = now
package.PackageBase.Flagger = maintainer
with client as request:
response = request.get("/packages", params={
"outdated": "on"
})
assert response.status_code == int(HTTPStatus.OK)
# We should only get one result from this query; the package we flagged.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
with client as request:
response = request.get("/packages", params={
"outdated": "off"
})
assert response.status_code == int(HTTPStatus.OK)
# In this case, we should get 54 results, which means that the first
# page will have 50 results (55 packages - 1 outdated = 54 not outdated).
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
def test_packages_orphans(client: TestClient, packages: List[Package]):
package = packages[0]
with db.begin():
package.PackageBase.Maintainer = None
with client as request:
response = request.get("/packages", params={"submit": "Orphans"})
assert response.status_code == int(HTTPStatus.OK)
# We only have one orphan. Let's make sure that's what is returned.
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 1
def test_packages_per_page(client: TestClient, maintainer: User):
""" Test the ability for /packages to deal with the PP query
argument specifications (50, 100, 250; default: 50). """
with db.begin():
for i in range(255):
base = db.create(PackageBase, Name=f"pkg_{i}",
Maintainer=maintainer,
Submitter=maintainer,
Packager=maintainer)
db.create(Package, PackageBase=base, Name=base.Name)
# Test default case, PP of 50.
with client as request:
response = request.get("/packages", params={"PP": 50})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 50
# Alright, test the next case, PP of 100.
with client as request:
response = request.get("/packages", params={"PP": 100})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 100
# And finally, the last case, a PP of 250.
with client as request:
response = request.get("/packages", params={"PP": 250})
assert response.status_code == int(HTTPStatus.OK)
root = parse_root(response.text)
rows = root.xpath('//table[@class="results"]/tbody/tr')
assert len(rows) == 250

View file

@ -214,6 +214,11 @@ def test_user_credential_types():
assert aurweb.auth.developer(user)
assert aurweb.auth.trusted_user_or_dev(user)
# Some model authorization checks.
assert user.is_elevated()
assert user.is_trusted_user()
assert user.is_developer()
def test_user_json():
data = json.loads(user.json())

View file

@ -222,3 +222,10 @@ button[type="reset"] {
text-align: right;
}
input#search-action-submit {
width: 80px;
}
.success {
color: green;
}