Source code for pkgcore.repository.prototype

"""
base repository template

Defines the lazy category/package/version lookup helpers and the ``tree``
base class that serves as the template for repository variants.
"""

# Public names exported by this module.
__all__ = ("CategoryLazyFrozenSet", "PackageMapping", "VersionMapping", "tree")

from pathlib import Path
import typing

from snakeoil.klass import jit_attr
from snakeoil.mappings import DictMixin, LazyValDict
from snakeoil.sequences import iflatten_instance

from ..ebuild.atom import atom
from ..operations import repo
from ..restrictions import boolean, packages, restriction, values
from ..restrictions.util import collect_package_restrictions


class CategoryLazyFrozenSet:
    """Frozenset of categories that is only computed on first use."""

    __slots__ = ("_get_values", "_values")

    def __init__(self, get_values: typing.Callable[[], typing.Iterable[str]]):
        # Callable producing the categories; invoked lazily, never here.
        self._get_values = get_values
        self._values = None  # type: typing.Union[None, frozenset]

    def _materialized(self):
        """Return the cached frozenset, building it on first access."""
        cached = self._values
        if cached is None:
            cached = self._values = frozenset(self._get_values())
        return cached

    def __iter__(self):
        return iter(self._materialized())

    def __contains__(self, cat: str):
        return cat in self._materialized()

    def force_regen(self):
        """Drop the cached values so the next access recomputes them."""
        self._values = None
class PackageMapping(DictMixin):
    """Lazy mapping of category -> tuple of values pulled on demand.

    Keys are whatever ``parent_mapping`` contains; the value for a key is
    obtained by calling ``pull_vals(key)`` once and caching the result as
    a tuple.
    """

    def __init__(self, parent_mapping, pull_vals):
        self._cache = {}
        self._parent = parent_mapping
        self._pull_vals = pull_vals

    def __getitem__(self, key):
        cached = self._cache.get(key)
        if cached is None:
            # Not pulled yet: only keys known to the parent are valid.
            if key not in self._parent:
                raise KeyError(key)
            cached = self._cache[key] = tuple(self._pull_vals(key))
        return cached

    def keys(self):
        """Iterate over the keys of the parent container."""
        return iter(self._parent)

    def __contains__(self, key):
        if key in self._cache:
            return True
        return key in self._parent

    def force_regen(self, cat):
        """Forget the cached value for ``cat`` so it is pulled again."""
        self._cache.pop(cat, None)
class VersionMapping(DictMixin):
    """Lazy mapping of (category, package) -> tuple of pulled values.

    A key is valid when its second element is listed in the parent
    mapping under its first element; values come from ``pull_vals(key)``
    and are cached as tuples.
    """

    def __init__(self, parent_mapping, pull_vals):
        self._cache = {}
        self._parent = parent_mapping
        self._pull_vals = pull_vals

    def __getitem__(self, key):
        cached = self._cache.get(key)
        if cached is None:
            known_pkgs = self._parent.get(key[0], ())
            if key[1] not in known_pkgs:
                raise KeyError(key)
            cached = tuple(self._pull_vals(key))
            self._cache[key] = cached
        return cached

    def keys(self):
        """Yield every (category, package) pair known to the parent."""
        for category, names in self._parent.items():
            yield from ((category, name) for name in names)

    def force_regen(self, key, val):
        """Replace the cached value for ``key``, or drop it if ``val`` is empty."""
        if not val:
            self._cache.pop(key, None)
            return
        self._cache[key] = val
class tree:
    """Template for all repository variants.

    Args:
        frozen (bool): controls whether the repository is mutable or immutable

    Attributes:
        raw_repo: if wrapping a repo, set raw_repo per instance to it
        is_supported (bool): class-level flag, True by default (not
            consulted by this class itself)
        livefs (bool): set it to True if it's a repository representing a livefs
        package_class: callable to generate a package instance, must override
        configured (bool): if a repo is unusable for merging/unmerging
            without being configured, set it to False
        frozen_settable (bool): controls whether frozen is able to be set
            on initialization
        operations_kls: callable to generate a repo operations instance
        pkg_masks (frozenset): restrictions OR'ed together by :attr:`masked`

        categories (CategoryLazyFrozenSet): available categories in the repo
        packages (PackageMapping): mapping of category to its package names
        versions (VersionMapping): mapping of (category, package) to versions
        frozen (bool): repository mutability status; only assigned when
            ``frozen_settable`` is true
        lock: initialised to None; not otherwise used by this class
    """

    raw_repo = None
    is_supported = True
    livefs = False
    package_class = None
    configured = True
    frozen_settable = True
    operations_kls = repo.operations
    pkg_masks = frozenset()

    def __init__(self, frozen=False):
        # Each level is lazy and backed by the level above it:
        # categories -> packages per category -> versions per (cat, pkg).
        self.categories = CategoryLazyFrozenSet(self._get_categories)
        self.packages = PackageMapping(self.categories, self._get_packages)
        self.versions = VersionMapping(self.packages, self._get_versions)

        if self.frozen_settable:
            self.frozen = frozen

        self.lock = None

    def configure(self, *args):
        """Return a configured form of the repository."""
        raise NotImplementedError(self, "configure")

    def _get_categories(self) -> frozenset[str] | typing.Iterable[str]:
        """this must return an iterable or tuple of this repo's categories"""
        raise NotImplementedError(self, "_get_categories")

    def _get_packages(self, category: str) -> tuple[str] | typing.Iterable[str]:
        """Receives category and must return the packages of that cat. Converted to tuple"""
        raise NotImplementedError(self, "_get_packages")

    def _get_versions(
        self, package: tuple[str, str]
    ) -> tuple[str] | typing.Iterable[str]:
        """Receives (cat/pkg) and must return the cp versions. Converted to tuple"""
        raise NotImplementedError(self, "_get_versions")

    def __getitem__(self, cpv):
        """Build a package instance from ``cpv`` and return it if it exists.

        ``cpv`` is unpacked into ``package_class``; KeyError is raised when
        the resulting version is not listed for its (category, package).
        """
        cpv_inst = self.package_class(*cpv)
        if cpv_inst.fullver not in self.versions[(cpv_inst.category, cpv_inst.package)]:
            raise KeyError(cpv)
        return cpv_inst

    def __setitem__(self, *vals):
        """Item assignment is not supported; always raises AttributeError."""
        raise AttributeError

    def __delitem__(self, cpv):
        """Item deletion is not supported; always raises AttributeError."""
        raise AttributeError

    def __iter__(self):
        """Filtered iterator over all the repo's packages.

        All packages with metadata issues are skipped.
        """
        return self.itermatch(packages.AlwaysTrue)

    def __len__(self):
        """Return the total number of versions across all packages."""
        return sum(len(v) for v in self.versions.values())

    def __contains__(self, obj):
        """Determine if a path or a package is in a repo.

        A ``str`` is treated as a filesystem path and checked against
        ``self.location``; a repo without a ``location`` attribute never
        contains a path. Any other object is handed to :meth:`itermatch`
        and is contained if at least one package is yielded.
        """
        if not isinstance(obj, str):
            return any(True for _ in self.itermatch(obj))

        path = Path(obj)
        try:
            repo_path = Path(self.location).resolve()
        except AttributeError:
            return False

        # existing relative path
        if not path.is_absolute() and (repo_path / path).exists():
            return True

        # existing full path; relative_to() raises ValueError when the
        # resolved path lies outside of the repo
        try:
            fullpath = path.resolve()
            if fullpath.relative_to(repo_path) and fullpath.exists():
                return True
        except ValueError:
            pass

        return False

    def has_match(self, atom, **kwds):
        """Return True if at least one package matches the restriction.

        Accepts the same keywords as :meth:`itermatch`; ``sorter`` and
        ``yield_none`` are discarded.
        """
        # Ordering is irrelevant for an existence check, and None
        # placeholders from yield_none would be counted as hits below.
        kwds.pop("sorter", None)
        kwds.pop("yield_none", None)
        return any(True for _ in self.itermatch(atom, **kwds))

    def match(self, atom, **kwds):
        """Return a list of everything :meth:`itermatch` yields."""
        return list(self.itermatch(atom, **kwds))

    def itermatch(
        self,
        restrict,
        sorter=None,
        pkg_filter=None,
        versioned=True,
        raw_pkg_cls=None,
        pkg_cls=None,
        force=None,
        yield_none=False,
    ):
        """Generator that yields packages matching a restriction.

        :type restrict: :obj:`pkgcore.restrictions.packages.PackageRestriction`
            instance.
        :param restrict: restriction to search via
        :param sorter: callable to do sorting during searching-
            if sorting the results, use this instead of sorting externally.
        :param pkg_filter: callable to do package filtering
        :param versioned: boolean controlling returning versioned or
            unversioned pkgs
        :param raw_pkg_cls: custom package class to use for generating raw
            pkg instances
        :param pkg_cls: custom package class to override raw pkg instances with
        :param force: if None, ``restrict.match`` decides; if true,
            ``restrict.force_True`` is used; otherwise ``restrict.force_False``
        :param yield_none: if True then itermatch will yield None for every
            non-matching package. This is meant for use in combination with
            C{twisted.task.cooperate} or other async uses where itermatch
            should not wait many (wallclock) seconds between yielding
            packages. If you override this method you should yield
            None in long-running loops, strictly calling it for every package
            is not necessary.
        :raises TypeError: if ``restrict`` is not a restriction instance
        """
        if not isinstance(restrict, restriction.base):
            raise TypeError(
                "restrict must be a pkgcore.restrictions.restriction.base instance: "
                f"got {restrict!r}"
            )

        # `iter` doubles as the "no sorting / no filtering" marker; later
        # code tests `sorter is iter` to pick cheaper paths.
        if sorter is None:
            sorter = iter
        if pkg_filter is None:
            pkg_filter = iter

        if raw_pkg_cls is None:
            if versioned:
                raw_pkg_cls = self.package_class
            else:
                # unversioned results are plain (category, package) tuples
                raw_pkg_cls = lambda *args: args

        if isinstance(restrict, atom):
            # an atom pins exactly one category/package pair
            candidates = [(restrict.category, restrict.package)]
        else:
            candidates = self._identify_candidates(restrict, sorter)

        if force is None:
            match = restrict.match
        elif force:
            match = restrict.force_True
        else:
            match = restrict.force_False
        return self._internal_match(
            candidates,
            match,
            raw_pkg_cls=raw_pkg_cls,
            pkg_cls=pkg_cls,
            yield_none=yield_none,
            sorter=sorter,
            pkg_filter=pkg_filter,
            versioned=versioned,
        )

    def _internal_gen_candidates(
        self, candidates, sorter, raw_pkg_cls, pkg_filter, versioned
    ):
        """Yield raw package objects for each (category, package) candidate.

        With ``versioned`` one object is built per known version; otherwise
        a single object is built for any candidate that has versions.
        """
        for cp in sorter(candidates):
            if versioned:
                pkgs = (
                    raw_pkg_cls(cp[0], cp[1], ver) for ver in self.versions.get(cp, ())
                )
            elif self.versions.get(cp, ()):
                pkgs = (raw_pkg_cls(cp[0], cp[1]),)
            else:
                pkgs = ()
            pkgs = iter(pkgs)
            yield from sorter(pkg_filter(pkgs))

    def _internal_match(
        self, candidates, match_func, pkg_cls, yield_none=False, **kwargs
    ):
        """Yield candidates accepted by ``match_func``.

        ``pkg_cls``, when given, wraps each raw package before matching.
        Non-matching packages produce ``None`` when ``yield_none`` is set.
        Remaining keywords are forwarded to :meth:`_internal_gen_candidates`.
        """
        for pkg in self._internal_gen_candidates(candidates, **kwargs):
            if pkg_cls is not None:
                pkg = pkg_cls(pkg)

            if match_func(pkg):
                yield pkg
            elif yield_none:
                yield None

    def _identify_candidates(self, restrict, sorter):
        """Narrow the (category, package) search space for ``restrict``.

        Returns an iterable of (category, package) pairs; this may be
        ``self.versions`` itself when no narrowing is possible.
        """
        # full expansion
        if not isinstance(restrict, boolean.base) or isinstance(restrict, atom):
            return self._fast_identify_candidates(restrict, sorter)

        # per DNF solution: (category restrictions, package restrictions)
        dsolutions = [
            (
                [c.restriction for c in collect_package_restrictions(x, ("category",))],
                [p.restriction for p in collect_package_restrictions(x, ("package",))],
            )
            for x in restrict.iter_dnf_solutions(True)
        ]

        # see if any solution state isn't dependent on cat/pkg in any way.
        # if so, search the whole search space.
        for x in dsolutions:
            if not x[0] and not x[1]:
                if sorter is iter:
                    return self.versions
                return (
                    (c, p)
                    for c in sorter(self.categories)
                    for p in sorter(self.packages.get(c, ()))
                )

        # simple cases first.
        # if one specifies categories, and one doesn't
        cat_specified = bool(dsolutions[0][0])
        pkg_specified = bool(dsolutions[0][1])
        pgetter = self.packages.get
        if any(True for x in dsolutions[1:] if bool(x[0]) != cat_specified):
            if any(True for x in dsolutions[1:] if bool(x[1]) != pkg_specified):
                # we've got a mix- some specify cats, some don't,
                # some specify pkgs, some don't.
                # this may be optimizable
                return self.versions
            # one doesn't specify a category, but they all
            # specify packages (or don't)
            pr = values.OrRestriction(
                *tuple(
                    iflatten_instance((x[1] for x in dsolutions if x[1]), values.base)
                )
            )
            return (
                (c, p)
                for c in sorter(self.categories)
                for p in sorter(pgetter(c, []))
                if pr.match(p)
            )

        elif any(True for x in dsolutions[1:] if bool(x[1]) != pkg_specified):
            # one (or more) don't specify pkgs, but they all specify cats.
            cr = values.OrRestriction(
                *tuple(iflatten_instance((x[0] for x in dsolutions), values.base))
            )
            cats_iter = (c for c in sorter(self.categories) if cr.match(c))
            return ((c, p) for c in cats_iter for p in sorter(pgetter(c, [])))

        return self._fast_identify_candidates(restrict, sorter)

    def _fast_identify_candidates(self, restrict, sorter):
        """Derive candidates from the category/package parts of ``restrict``.

        Non-negated exact string matches are split out so they can be used
        for direct lookups; everything else is applied as a filter over the
        known categories and packages.
        """
        pkg_restrict = set()
        cat_restrict = set()
        cat_exact = set()
        pkg_exact = set()

        for x in collect_package_restrictions(
            restrict,
            (
                "category",
                "package",
            ),
        ):
            if x.attr == "category":
                cat_restrict.add(x.restriction)
            elif x.attr == "package":
                pkg_restrict.add(x.restriction)

        # move non-negated exact matches out of the generic restriction
        # sets and record the literal strings they match
        for exact, pending in ((pkg_exact, pkg_restrict), (cat_exact, cat_restrict)):
            exact_matches = [
                x
                for x in pending
                if isinstance(x, values.StrExactMatch) and not x.negate
            ]
            pending.difference_update(exact_matches)
            exact.update(x.exact for x in exact_matches)

        if restrict.negate:
            cat_exact = pkg_exact = ()

        if cat_exact:
            if not cat_restrict and len(cat_exact) == 1:
                # Cannot use pop here, cat_exact is reused below.
                c = next(iter(cat_exact))
                if not pkg_restrict and len(pkg_exact) == 1:
                    # fully pinned: a single direct lookup suffices
                    cp = (c, pkg_exact.pop())
                    if cp in self.versions:
                        return [cp]
                    return []
                cats_iter = [c]
            else:
                cat_restrict.add(values.ContainmentMatch(frozenset(cat_exact)))
                cats_iter = sorter(self._cat_filter(cat_restrict))
        elif cat_restrict:
            cats_iter = self._cat_filter(cat_restrict, negate=restrict.negate)
        else:
            cats_iter = sorter(self.categories)

        if pkg_exact:
            if not pkg_restrict:
                # Materialise: the names are re-iterated once per category,
                # so a sorter returning a one-shot iterator must not be
                # exhausted after the first category.
                pkg_exact = tuple(sorter(pkg_exact))
                return ((c, p) for c in cats_iter for p in pkg_exact)
            pkg_restrict.add(values.ContainmentMatch(frozenset(pkg_exact)))

        if pkg_restrict:
            return self._package_filter(cats_iter, pkg_restrict, negate=restrict.negate)
        if not cat_restrict and sorter is iter and not cat_exact:
            # nothing narrows the search and no ordering was requested
            return self.versions
        return ((c, p) for c in cats_iter for p in sorter(self.packages.get(c, ())))

    def _cat_filter(self, cat_restricts, negate=False):
        """Yield categories for which any restriction's match equals ``not negate``."""
        sentinel = not negate
        cats = [x.match for x in cat_restricts]
        for x in self.categories:
            if any(match(x) == sentinel for match in cats):
                yield x

    def _package_filter(self, cats_iter, pkg_restricts, negate=False):
        """Yield (category, package) pairs whose package name passes the filter.

        A package passes when any restriction's match equals ``not negate``.
        """
        sentinel = not negate
        restricts = [x.match for x in pkg_restricts]
        pkgs_dict = self.packages
        for cat in cats_iter:
            for pkg in pkgs_dict.get(cat, ()):
                if any(match(pkg) == sentinel for match in restricts):
                    yield (cat, pkg)

    def notify_remove_package(self, pkg):
        """internal function

        notify the repository that a pkg it provides is being removed
        """
        ver_key = (pkg.category, pkg.package)
        remaining = [x for x in self.versions[ver_key] if x != pkg.fullver]
        if not remaining:
            # dead package; if it was the only one in its category the
            # category listing has to be refreshed as well
            wipe = list(self.packages[pkg.category]) == [pkg.package]
            self.packages.force_regen(pkg.category)
            if wipe:
                self.categories.force_regen()
        self.versions.force_regen(ver_key, tuple(remaining))

    def notify_add_package(self, pkg):
        """internal function

        notify the repository that a pkg is being added to it
        """
        ver_key = (pkg.category, pkg.package)
        known = set(self.versions.get(ver_key, ()))
        known.add(pkg.fullver)
        self.categories.force_regen()
        self.packages.force_regen(pkg.category)
        self.versions.force_regen(ver_key, tuple(known))

    @property
    def operations(self):
        """Operations instance for this repo, see :meth:`get_operations`."""
        return self.get_operations()

    def get_operations(self, observer=None):
        """Instantiate ``operations_kls`` for this repo.

        ``observer`` is accepted but not used by this implementation.
        """
        return self.operations_kls(self)

    def __bool__(self):
        """A repo is truthy when it has at least one (category, package)."""
        return any(True for _ in self.versions)

    def __str__(self):
        """Return the first alias if there is one, else the repr."""
        aliases = self.aliases
        if aliases:
            return str(aliases[0])
        return repr(self)

    @property
    def aliases(self):
        """Tuple of the ``repo_id`` and ``location`` attributes that are set."""
        potentials = (getattr(self, key, None) for key in ("repo_id", "location"))
        return tuple(x for x in potentials if x is not None)

    @jit_attr
    def masked(self):
        """Base package mask restriction."""
        return packages.OrRestriction(*self.pkg_masks)