mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
fixup: feat(archives): add .sha256 and construct archives in tmpdir
This commit is contained in:
parent
40a0e866e7
commit
b119db251b
2 changed files with 140 additions and 119 deletions
|
@ -19,14 +19,14 @@ on the following, right-hand side fields are added to each item.
|
|||
"""
|
||||
|
||||
import gzip
|
||||
import hashlib
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from collections import defaultdict
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import Any, Dict
|
||||
|
||||
import orjson
|
||||
|
@ -169,6 +169,14 @@ def as_dict(package: Package) -> Dict[str, Any]:
|
|||
}
|
||||
|
||||
|
||||
def sha256sum(file_path: str) -> str:
|
||||
hash = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
while chunk := f.read(io.DEFAULT_BUFFER_SIZE):
|
||||
hash.update(chunk)
|
||||
return hash.hexdigest()
|
||||
|
||||
|
||||
def _main():
|
||||
archivedir = aurweb.config.get("mkpkglists", "archivedir")
|
||||
os.makedirs(archivedir, exist_ok=True)
|
||||
|
@ -287,16 +295,13 @@ def _main():
|
|||
files.append((tmp_metaext, META_EXT))
|
||||
|
||||
for src, dst in files:
|
||||
proc = Popen(["cksum", "-a", "sha256", src], stdout=PIPE)
|
||||
out, _ = proc.communicate()
|
||||
assert proc.returncode == 0
|
||||
|
||||
checksum = sha256sum(src)
|
||||
base = os.path.basename(src)
|
||||
checksum = re.sub(r"SHA256 \(.+\)", f"SHA256 ({base})", out.decode())
|
||||
checksum_formatted = f"SHA256 ({base}) = {checksum}"
|
||||
|
||||
checksum_file = f"{dst}.sha256"
|
||||
with open(checksum_file, "w") as f:
|
||||
f.write(checksum)
|
||||
f.write(checksum_formatted)
|
||||
|
||||
# Move the new archive into its rightful place.
|
||||
shutil.move(src, dst)
|
||||
|
|
|
@ -1,65 +1,34 @@
|
|||
import gzip
|
||||
import json
|
||||
import os
|
||||
|
||||
from typing import List, Union
|
||||
from typing import List
|
||||
from unittest import mock
|
||||
|
||||
import py
|
||||
import pytest
|
||||
|
||||
from aurweb import config, db, util
|
||||
from aurweb import config, db
|
||||
from aurweb.models import License, Package, PackageBase, PackageDependency, PackageLicense, User
|
||||
from aurweb.models.account_type import USER_ID
|
||||
from aurweb.models.dependency_type import DEPENDS_ID
|
||||
from aurweb.testing import noop
|
||||
|
||||
|
||||
class FakeFile:
|
||||
data = str()
|
||||
|
||||
def __init__(self, archive: str, modes: str) -> "FakeFile":
|
||||
self.archive = archive
|
||||
self.modes = modes
|
||||
|
||||
def __enter__(self, *args, **kwargs) -> "FakeFile":
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
print(f"Writing {self.archive}....")
|
||||
self.close()
|
||||
|
||||
def write(self, data: Union[str, bytes]) -> None:
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode()
|
||||
self.data += data
|
||||
|
||||
def writelines(self, dataset: List[Union[str, bytes]]) -> None:
|
||||
util.apply_all(dataset, self.write)
|
||||
|
||||
def close(self) -> None:
|
||||
with open(self.archive, "w") as f:
|
||||
f.write(self.data)
|
||||
|
||||
|
||||
class MockGzipOpen:
|
||||
def __init__(self):
|
||||
self.gzips = dict()
|
||||
|
||||
def open(self, archive: str, modes: str):
|
||||
self.gzips[archive] = FakeFile(archive, modes)
|
||||
return self.gzips.get(archive)
|
||||
|
||||
def get(self, key: str) -> FakeFile:
|
||||
return self.gzips.get(key)
|
||||
|
||||
def __getitem__(self, key: str) -> FakeFile:
|
||||
return self.get(key)
|
||||
|
||||
def __contains__(self, key: str) -> bool:
|
||||
print(self.gzips.keys())
|
||||
return key in self.gzips
|
||||
|
||||
def data(self, archive: str):
|
||||
return self.get(archive).data
|
||||
META_KEYS = [
|
||||
"ID",
|
||||
"Name",
|
||||
"PackageBaseID",
|
||||
"PackageBase",
|
||||
"Version",
|
||||
"Description",
|
||||
"URL",
|
||||
"NumVotes",
|
||||
"Popularity",
|
||||
"OutOfDate",
|
||||
"Maintainer",
|
||||
"FirstSubmitted",
|
||||
"LastModified",
|
||||
"URLPath",
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
|
@ -120,48 +89,81 @@ def config_mock(tmpdir: py.path.local) -> None:
|
|||
config.rehash()
|
||||
|
||||
|
||||
def test_mkpkglists(tmpdir: py.path.local, config_mock: None):
|
||||
def test_mkpkglists(tmpdir: py.path.local, config_mock: None, user: User, packages: List[Package]):
|
||||
from aurweb.scripts import mkpkglists
|
||||
mkpkglists.main()
|
||||
|
||||
PACKAGES = config.get("mkpkglists", "packagesfile")
|
||||
META = config.get("mkpkglists", "packagesmetafile")
|
||||
PKGBASE = config.get("mkpkglists", "pkgbasefile")
|
||||
USERS = config.get("mkpkglists", "userfile")
|
||||
|
||||
expectations = [
|
||||
(
|
||||
PACKAGES,
|
||||
"pkg_0\npkg_1\npkg_2\npkg_3\npkg_4\n",
|
||||
),
|
||||
(
|
||||
PKGBASE,
|
||||
"pkgbase_0\npkgbase_1\npkgbase_2\npkgbase_3\npkgbase_4\n",
|
||||
),
|
||||
(
|
||||
USERS,
|
||||
"test\n"
|
||||
),
|
||||
]
|
||||
|
||||
for (file, expected_content) in expectations:
|
||||
with gzip.open(file, "r") as f:
|
||||
file_content = f.read().decode()
|
||||
assert file_content == expected_content
|
||||
|
||||
with gzip.open(META) as f:
|
||||
metadata = json.load(f)
|
||||
|
||||
assert len(metadata) == len(packages)
|
||||
for pkg in metadata:
|
||||
for key in META_KEYS:
|
||||
assert key in pkg, f"{pkg=} record does not have {key=}"
|
||||
|
||||
for file in (PACKAGES, PKGBASE, USERS, META):
|
||||
with open(f"{file}.sha256") as f:
|
||||
file_sig_content = f.read()
|
||||
expected_prefix = f"SHA256 ({os.path.basename(file)}) = "
|
||||
assert file_sig_content.startswith(expected_prefix)
|
||||
assert len(file_sig_content) == len(expected_prefix) + 64
|
||||
|
||||
|
||||
@mock.patch("sys.argv", ["mkpkglists", "--extended"])
|
||||
def test_mkpkglists_extended_empty(config_mock: None):
|
||||
from aurweb.scripts import mkpkglists
|
||||
mkpkglists.main()
|
||||
|
||||
'''
|
||||
archives = config.get_section("mkpkglists")
|
||||
archives.pop("archivedir")
|
||||
PACKAGES = config.get("mkpkglists", "packagesfile")
|
||||
META = config.get("mkpkglists", "packagesmetafile")
|
||||
META_EXT = config.get("mkpkglists", "packagesmetaextfile")
|
||||
PKGBASE = config.get("mkpkglists", "pkgbasefile")
|
||||
USERS = config.get("mkpkglists", "userfile")
|
||||
|
||||
for archive in archives.values():
|
||||
assert archive in gzips
|
||||
expectations = [
|
||||
(PACKAGES, ""),
|
||||
(PKGBASE, ""),
|
||||
(USERS, ""),
|
||||
(META, "[\n]"),
|
||||
(META_EXT, "[\n]"),
|
||||
]
|
||||
|
||||
# Expect that packagesfile got created, but is empty because
|
||||
# we have no DB records.
|
||||
packages_file = archives.get("packagesfile")
|
||||
assert gzips.data(packages_file) == str()
|
||||
for (file, expected_content) in expectations:
|
||||
with gzip.open(file, "r") as f:
|
||||
file_content = f.read().decode()
|
||||
assert file_content == expected_content, f"{file=} contents malformed"
|
||||
|
||||
# Expect that pkgbasefile got created, but is empty because
|
||||
# we have no DB records.
|
||||
users_file = archives.get("pkgbasefile")
|
||||
assert gzips.data(users_file) == str()
|
||||
|
||||
# Expect that userfile got created, but is empty because
|
||||
# we have no DB records.
|
||||
users_file = archives.get("userfile")
|
||||
assert gzips.data(users_file) == str()
|
||||
|
||||
# Expect that packagesmetafile got created, but is empty because
|
||||
# we have no DB records; it's still a valid empty JSON list.
|
||||
meta_file = archives.get("packagesmetafile")
|
||||
assert gzips.data(meta_file) == "[\n]"
|
||||
|
||||
# Expect that packagesmetafile got created, but is empty because
|
||||
# we have no DB records; it's still a valid empty JSON list.
|
||||
meta_file = archives.get("packagesmetaextfile")
|
||||
assert gzips.data(meta_file) == "[\n]"
|
||||
'''
|
||||
for file in (PACKAGES, PKGBASE, USERS, META, META_EXT):
|
||||
with open(f"{file}.sha256") as f:
|
||||
file_sig_content = f.read()
|
||||
expected_prefix = f"SHA256 ({os.path.basename(file)}) = "
|
||||
assert file_sig_content.startswith(expected_prefix)
|
||||
assert len(file_sig_content) == len(expected_prefix) + 64
|
||||
|
||||
|
||||
@mock.patch("sys.argv", ["mkpkglists", "--extended"])
|
||||
|
@ -170,39 +172,53 @@ def test_mkpkglists_extended(config_mock: None, user: User,
|
|||
from aurweb.scripts import mkpkglists
|
||||
mkpkglists.main()
|
||||
|
||||
'''
|
||||
archives = config.get_section("mkpkglists")
|
||||
archives.pop("archivedir")
|
||||
PACKAGES = config.get("mkpkglists", "packagesfile")
|
||||
META = config.get("mkpkglists", "packagesmetafile")
|
||||
META_EXT = config.get("mkpkglists", "packagesmetaextfile")
|
||||
PKGBASE = config.get("mkpkglists", "pkgbasefile")
|
||||
USERS = config.get("mkpkglists", "userfile")
|
||||
|
||||
for archive in archives.values():
|
||||
assert archive in gzips
|
||||
expectations = [
|
||||
(
|
||||
PACKAGES,
|
||||
"pkg_0\npkg_1\npkg_2\npkg_3\npkg_4\n",
|
||||
),
|
||||
(
|
||||
PKGBASE,
|
||||
"pkgbase_0\npkgbase_1\npkgbase_2\npkgbase_3\npkgbase_4\n",
|
||||
),
|
||||
(
|
||||
USERS,
|
||||
"test\n"
|
||||
),
|
||||
]
|
||||
|
||||
# Expect that packagesfile got created, but is empty because
|
||||
# we have no DB records.
|
||||
packages_file = archives.get("packagesfile")
|
||||
expected = "\n".join([p.Name for p in packages]) + "\n"
|
||||
assert gzips.data(packages_file) == expected
|
||||
for (file, expected_content) in expectations:
|
||||
with gzip.open(file, "r") as f:
|
||||
file_content = f.read().decode()
|
||||
assert file_content == expected_content
|
||||
|
||||
# Expect that pkgbasefile got created, but is empty because
|
||||
# we have no DB records.
|
||||
users_file = archives.get("pkgbasefile")
|
||||
expected = "\n".join([p.PackageBase.Name for p in packages]) + "\n"
|
||||
assert gzips.data(users_file) == expected
|
||||
with gzip.open(META) as f:
|
||||
metadata = json.load(f)
|
||||
|
||||
# Expect that userfile got created, but is empty because
|
||||
# we have no DB records.
|
||||
users_file = archives.get("userfile")
|
||||
assert gzips.data(users_file) == "test\n"
|
||||
assert len(metadata) == len(packages)
|
||||
for pkg in metadata:
|
||||
for key in META_KEYS:
|
||||
assert key in pkg, f"{pkg=} record does not have {key=}"
|
||||
|
||||
# Expect that packagesmetafile got created, but is empty because
|
||||
# we have no DB records; it's still a valid empty JSON list.
|
||||
meta_file = archives.get("packagesmetafile")
|
||||
data = json.loads(gzips.data(meta_file))
|
||||
assert len(data) == 5
|
||||
with gzip.open(META_EXT) as f:
|
||||
extended_metadata = json.load(f)
|
||||
|
||||
# Expect that packagesmetafile got created, but is empty because
|
||||
# we have no DB records; it's still a valid empty JSON list.
|
||||
meta_file = archives.get("packagesmetaextfile")
|
||||
data = json.loads(gzips.data(meta_file))
|
||||
assert len(data) == 5
|
||||
'''
|
||||
assert len(extended_metadata) == len(packages)
|
||||
for pkg in extended_metadata:
|
||||
for key in META_KEYS:
|
||||
assert key in pkg, f"{pkg=} record does not have {key=}"
|
||||
assert isinstance(pkg["Depends"], list)
|
||||
assert isinstance(pkg["License"], list)
|
||||
|
||||
for file in (PACKAGES, PKGBASE, USERS, META, META_EXT):
|
||||
with open(f"{file}.sha256") as f:
|
||||
file_sig_content = f.read()
|
||||
expected_prefix = f"SHA256 ({os.path.basename(file)}) = "
|
||||
assert file_sig_content.startswith(expected_prefix)
|
||||
assert len(file_sig_content) == len(expected_prefix) + 64
|
||||
|
|
Loading…
Add table
Reference in a new issue