"""
remote binpkg support
Currently this primarily just holds the Packages cache used for remote, and
local binpkg repositories
"""
__all__ = ("PackagesCacheV0", "PackagesCacheV1")
import os
from operator import itemgetter
from time import time
from snakeoil.chksum import get_chksums
from snakeoil.containers import RefCountingSet
from snakeoil.fileutils import AtomicWriteFile, readlines
from snakeoil.mappings import ImmutableDict, StackedDict
from .. import cache
from ..log import logger
from ..restrictions import packages
def _iter_till_empty_newline(data):
for x in data:
if not x:
return
k, v = x.split(":", 1)
yield k, v.strip()
class CacheEntry(StackedDict):
"""Customized version of StackedDict blocking pop from modifying the target.
Note that this pop doesn't through KeyError if something is missing- just
returns None instead. This is likely to be changed.
"""
def pop(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def find_best_savings(stream, line_prefix):
rcs = RefCountingSet(stream)
line_overhead = len(line_prefix)
stream = ((k, v) for k, v in rcs.items() if v != 1)
return max(stream, key=lambda x: (len(x[0]) + line_overhead) * x[1])[0]
[docs]
class PackagesCacheV0(cache.bulk):
"""Cache backend for writing binpkg Packages caches
Note this follows version 0 semantics- not the most efficient, and
doesn't bundle certain useful keys like RESTRICT
"""
_header_mangling_map = ImmutableDict(
{
"FEATURES": "UPSTREAM_FEATURES",
"ACCEPT_KEYWORDS": "KEYWORDS",
}
)
# this maps from literal keys in the cache to .data[key] expected forms
_deserialize_map = {
"DESC": "DESCRIPTION",
"MTIME": "mtime",
"repo": "REPO",
}
# this maps from .attr to data items.
_serialize_map = {
"DESCRIPTION": "DESC",
"mtime": "MTIME",
"source_repository": "REPO",
}
deserialized_inheritable = frozenset(("CBUILD", "CHOST", "source_repository"))
_pkg_attr_sequences = ("use", "keywords", "iuse")
_deserialized_defaults = dict.fromkeys(
(
"BDEPEND",
"DEPEND",
"RDEPEND",
"PDEPEND",
"IDEPEND",
"BUILD_TIME",
"IUSE",
"KEYWORDS",
"LICENSE",
"PATH",
"PROPERTIES",
"USE",
"DEFINED_PHASES",
"CHOST",
"CBUILD",
"DESC",
"REPO",
"DESCRIPTION",
),
"",
)
_deserialized_defaults.update({"EAPI": "0", "SLOT": "0"})
_deserialized_defaults = ImmutableDict(_deserialized_defaults)
_stored_chfs = ("size", "sha1", "md5", "mtime")
version = 0
def __init__(self, location, *args, **kwds):
self._location = location
vkeys = {"CPV"}
vkeys.update(self._deserialized_defaults)
vkeys.update(x.upper() for x in self._stored_chfs)
kwds["auxdbkeys"] = vkeys
super().__init__(*args, **kwds)
def _handle(self):
return readlines(self._location, True, False, False)
[docs]
def read_preamble(self, handle):
return ImmutableDict(
(self._header_mangling_map.get(k, k), v)
for k, v in _iter_till_empty_newline(handle)
)
def _read_data(self):
try:
handle = self._handle()
except FileNotFoundError:
return {}
self.preamble = self.read_preamble(handle)
defaults = dict(self._deserialized_defaults.items())
defaults.update(
(k, v)
for k, v in self.preamble.items()
if k in self.deserialized_inheritable
)
defaults = ImmutableDict(defaults)
pkgs = {}
count = 0
vkeys = self._known_keys
while True:
raw_d = dict(_iter_till_empty_newline(handle))
d = {k: v for k, v in raw_d.items() if k in vkeys}
if not d:
break
count += 1
cpv = d.pop("CPV", None)
if cpv is None:
cpv = f"{d.pop('CATEGORY')}/{d.pop('PF')}"
if "USE" in d:
d.setdefault("IUSE", d.get("USE", ""))
for src, dst in self._deserialize_map.items():
if src in d:
d.setdefault(dst, d.pop(src))
pkgs[cpv] = CacheEntry(d, defaults)
assert count == int(self.preamble.get("PACKAGES", count))
return pkgs
@classmethod
def _assemble_preamble_dict(cls, target_dicts):
preamble = {
"VERSION": cls.version,
"PACKAGES": len(target_dicts),
"TIMESTAMP": str(int(time())),
}
for key in cls.deserialized_inheritable:
try:
preamble[key] = find_best_savings(
(d[1].get(key, "") for d in target_dicts), key
)
except ValueError:
# empty iterable handed to max
pass
return preamble
@classmethod
def _assemble_pkg_dict(cls, pkg):
d = {}
sequences = cls._pkg_attr_sequences
for key in cls._stored_attrs:
value = getattr(pkg, key)
if key in sequences:
value = " ".join(sorted(value))
else:
value = str(getattr(pkg, key)).strip()
key = key.upper()
d[cls._serialize_map.get(key, key)] = value
for key, value in zip(
cls._stored_chfs, get_chksums(pkg.path, *cls._stored_chfs)
):
if key != "size":
value = "%x" % (value,)
d[key.upper()] = value
d["MTIME"] = str(os.stat(pkg.path).st_mtime)
return d
def _write_data(self):
handler = None
try:
try:
handler = AtomicWriteFile(self._location)
self._serialize_to_handle(list(self.data.items()), handler)
handler.close()
except PermissionError as e:
logger.error(f"failed writing binpkg cache to {self._location!r}: {e}")
finally:
if handler is not None:
handler.discard()
def _serialize_to_handle(self, data, handler):
preamble = self._assemble_preamble_dict(data)
convert_key = self._serialize_map.get
for key in sorted(preamble):
handler.write(f"{convert_key(key, key)}: {preamble[key]}\n")
handler.write("\n")
spacer = " "
if self.version != 0:
spacer = ""
vkeys = self._known_keys
for cpv, pkg_data in sorted(data, key=itemgetter(0)):
handler.write(f"CPV:{spacer}{cpv}\n")
data = [(convert_key(key, key), value) for key, value in pkg_data.items()]
for write_key, value in sorted(data):
if write_key not in vkeys:
continue
value = str(value).strip()
if write_key in preamble:
if value != preamble[write_key]:
if value:
handler.write(f"{write_key}:{spacer}{value}\n")
else:
handler.write(f"{write_key}:\n")
elif value:
handler.write(f"{write_key}:{spacer}{value}\n")
handler.write("\n")
[docs]
def update_from_xpak(self, pkg, xpak):
# invert the lookups here; if you do .items() on an xpak,
# it'll load up the contents in full.
new_dict = {k: xpak[k] for k in self._known_keys if k in xpak}
new_dict["_chf_"] = xpak._chf_
chfs = [x for x in self._stored_chfs if x != "mtime"]
for key, value in zip(chfs, get_chksums(pkg.path, *chfs)):
if key != "size":
value = "%x" % (value,)
new_dict[key.upper()] = value
self[pkg.cpvstr] = new_dict
return new_dict
[docs]
def update_from_repo(self, repo):
# try to collapse certain keys down to the profile preamble
targets = repo.match(packages.AlwaysTrue, sorter=sorted)
if not targets:
# just open/trunc the target instead, and bail
open(self._location, "wb").close()
return
[docs]
class PackagesCacheV1(PackagesCacheV0):
"""Cache backend for writing binpkg Packages caches in format version 1.
See :py:class:`PackagesCacheV0` for usage information; this just writes
a better ondisk format.
"""
deserialized_inheritable = PackagesCacheV0.deserialized_inheritable.union(
("SLOT", "EAPI", "LICENSE", "KEYWORDS", "USE", "RESTRICT")
)
_deserialized_defaults = ImmutableDict(
list(PackagesCacheV0._deserialized_defaults.items()) + [("RESTRICT", "")]
)
@classmethod
def _assemble_pkg_dict(cls, pkg):
# not the most efficient...
d = PackagesCacheV0._assemble_pkg_dict(pkg)
use = set(pkg.use).intersection(pkg.iuse_stripped)
d.pop("IUSE", None)
iuse_bits = [f"-{x}" for x in pkg.iuse_stripped if x not in use]
use.update(iuse_bits)
d["USE"] = " ".join(sorted(use))
return d
version = 1
def get_cache_kls(version):
version = str(version)
if version == "0":
return PackagesCacheV0
elif version in ("1", "-1"):
return PackagesCacheV1
raise KeyError(f"cache version {version} unsupported")