"""
repository that combines multiple repos together
"""
__all__ = ("tree", "operations")
import os
from functools import partial
from itertools import chain
from operator import itemgetter
from snakeoil import klass
from snakeoil.compatibility import sorted_cmp
from snakeoil.currying import post_curry
from snakeoil.iterables import iter_sort
from ..config.hint import ConfigHint
from ..operations import repo as repo_interface
from . import errors, prototype
# Repo operations proxy that dispatches to each combined tree.
class operations(repo_interface.operations_proxy):
    """Operations proxy that forwards calls to the operations of each subtree.

    Attributes:
        ops_stop_after_first_supported (frozenset): names of operations that
            are run on the first supporting subtree only.
    """

    ops_stop_after_first_supported = frozenset(["install", "uninstall", "replace"])

    @klass.cached_property
    def raw_operations(self):
        """frozenset: union of ``raw_operations`` over every subtree."""
        per_tree = (subtree.operations.raw_operations for subtree in self.repo.trees)
        return frozenset(chain.from_iterable(per_tree))

    @klass.cached_property
    def enabled_operations(self):
        """frozenset: union of the subtrees' enabled operations, after overrides."""
        combined = set()
        for subtree in self.repo.trees:
            combined.update(subtree.operations.enabled_operations)
        return frozenset(self._apply_overrides(combined))

    def _setup_api(self):
        """Bind every enabled operation name to a proxying callable on self."""
        for name in self.enabled_operations:
            setattr(self, name, partial(self._proxy_op, name))

    def _proxy_op(self, op_name, *args, **kwds):
        """Run ``op_name`` on the subtrees that support it.

        Args:
            op_name (str): name of the operation to invoke
            *args: positional arguments forwarded to the operation
            **kwds: keyword arguments forwarded to the operation

        Returns:
            The first supporting subtree's result, and-ed together with the
            results of any further supporting subtrees.

        Raises:
            NotImplementedError: no subtree supports ``op_name``
        """
        # Unique marker so that a falsy operation result is not mistaken for
        # "nothing ran yet".
        unset = object()
        outcome = unset
        for subtree in self.repo.trees:
            tree_ops = subtree.operations
            if tree_ops.supports(op_name):
                result = getattr(tree_ops, op_name)(*args, **kwds)
                # Track the overall success for the return value.
                outcome = result if outcome is unset else (outcome and result)
                if op_name in self.ops_stop_after_first_supported:
                    return outcome
        if outcome is unset:
            raise NotImplementedError(self, op_name)
        return outcome
# Repository implementation that multiplexes several trees.
class tree(prototype.tree):
    """Repository combining multiple repos together.

    Args:
        trees (list): :obj:`pkgcore.repository.prototype.tree` instances
        repos (iterable): additional repository instances, appended after
            ``trees``

    Attributes:
        frozen_settable (bool): controls whether frozen is able to be set
            on initialization
        operations_kls: callable to generate a repo operations instance
        trees (tuple): the combined repository instances, in lookup order
    """

    frozen_settable = False
    operations_kls = operations
    pkgcore_config_type = ConfigHint(types={"repos": "refs:repo"}, typename="repo")

    def __init__(self, *trees, repos=()):
        super().__init__()
        trees = trees + tuple(repos)
        for x in trees:
            # Duck-typed check: anything exposing itermatch is accepted.
            if not hasattr(x, "itermatch"):
                raise errors.InitializationError(
                    f"{x} is not a repository tree derivative"
                )
        self.trees = trees

    def _get_categories(self):
        """Return the union of all subtrees' categories as a tuple.

        Subtrees that raise RepoError or KeyError are skipped.

        Raises:
            KeyError: no subtree yielded any category
        """
        d = set()
        for x in self.trees:
            try:
                d.update(x.categories)
            except (errors.RepoError, KeyError):
                pass
        if not d:
            raise KeyError("failed getting categories")
        return tuple(d)

    def _get_packages(self, category):
        """Return the union of all subtrees' packages for ``category``.

        Subtrees that raise RepoError or KeyError are skipped.

        Raises:
            KeyError: no subtree yielded any package for ``category``
        """
        d = set()
        for x in self.trees:
            try:
                d.update(x.packages[category])
            except (errors.RepoError, KeyError):
                pass
        if not d:
            raise KeyError(f"category {category!r} not found")
        return tuple(d)

    def _get_versions(self, package):
        """Return the union of all subtrees' versions for ``package``.

        Subtrees that raise RepoError or KeyError are skipped.

        Raises:
            KeyError: no subtree yielded any version for ``package``
        """
        d = set()
        for x in self.trees:
            try:
                d.update(x.versions[package])
            except (errors.RepoError, KeyError):
                pass
        if not d:
            raise KeyError(f"package {package!r} not found")
        return tuple(d)

    def path_restrict(self, path):
        """Create a package restriction from a given path within a repo.

        Args:
            path (str): file path, usually to an ebuild or binpkg

        Returns:
            package restriction

        Raises:
            ValueError: path doesn't conform to correct repo layout
                format or isn't within the repo
        """
        for repo in self.trees:
            if path in repo:
                # The first subtree containing the path decides; a ValueError
                # raised by it propagates to the caller.
                return repo.path_restrict(path)
        raise ValueError(f"no repo contains: {path!r}")

    def itermatch(self, restrict, **kwds):
        sorter = kwds.get("sorter", iter)
        if sorter is iter:
            # No ordering requested: chain the subtrees' matches in tree order.
            return (
                match
                for repo in self.trees
                for match in repo.itermatch(restrict, **kwds)
            )

        # ugly, and a bit slow, but works.
        def f(x, y):
            # Let the caller's sorter order the pair; report "x after y" when
            # y comes out first (this includes the case where x == y).
            ordered = sorter([x, y])
            if ordered[0] == y:
                return 1
            return -1

        # NOTE(review): presumably iter_sort hands this callable sequences
        # whose first element is the package, hence key=itemgetter(0) --
        # confirm against snakeoil.iterables.iter_sort.
        f = post_curry(sorted_cmp, f, key=itemgetter(0))
        return iter_sort(f, *[repo.itermatch(restrict, **kwds) for repo in self.trees])

    # Derive the documentation from the parent's; it is None when docstrings
    # are stripped (python -OO), in which case there is nothing to adapt.
    if prototype.tree.itermatch.__doc__ is not None:
        itermatch.__doc__ = prototype.tree.itermatch.__doc__.replace(
            "@param", "@keyword"
        ).replace(":keyword restrict:", ":param restrict:")

    def __iter__(self):
        return (pkg for repo in self.trees for pkg in repo)

    def __len__(self):
        return sum(len(repo) for repo in self.trees)

    def __contains__(self, obj):
        """Check membership by repo id or path, subtree object, or restriction.

        * ``str``: true if it equals ``str()`` of a subtree, or if its real
          path is a subtree's location or lies beneath it.
        * :obj:`prototype.tree`: true if it is one of the combined subtrees.
        * anything else: passed to :meth:`itermatch`; true if anything matches.
        """
        if isinstance(obj, str):
            # check by repo id
            if obj in map(str, self.trees):
                return True
            # check by path
            path = os.path.realpath(obj)
            for repo in self.trees:
                try:
                    repo_path = os.path.realpath(repo.location)
                except AttributeError:
                    # subtree has no location; it cannot contain a path
                    continue
                # Compare against the location with a trailing separator so a
                # sibling directory sharing the prefix (e.g. "/repos/foo-bar"
                # vs "/repos/foo") is not treated as being inside the repo.
                if path == repo_path or path.startswith(os.path.join(repo_path, "")):
                    return True
            return False
        elif isinstance(obj, prototype.tree):
            return obj in self.trees
        else:
            return any(True for _ in self.itermatch(obj))

    def __getitem__(self, key):
        """Return ``key``'s package from the first subtree that has it.

        Raises:
            KeyError: no subtree contains ``key``
        """
        for t in self.trees:
            try:
                return t[key]
            except KeyError:
                pass
        # made it here, no match.
        raise KeyError(f"package {key} not found")

    def __add__(self, other):
        """Combine this repository with ``other``.

        * another multiplex ``tree``: returns a new ``tree`` holding this
          repository's subtrees followed by ``other``'s.
        * any other :obj:`prototype.tree`: appended to ``self.trees`` in place
          (unless already present) and ``self`` is returned.

        Raises:
            TypeError: ``other`` is not a repository tree
        """
        # tree subclasses prototype.tree, so the more specific check has to
        # come first or it could never be reached.
        if isinstance(other, tree):
            return tree(*(self.trees + other.trees))
        elif isinstance(other, prototype.tree):
            if other not in self.trees:
                self.trees += (other,)
            return self
        raise TypeError(
            f"cannot add {other.__class__.__name__!r} and {self.__class__.__name__!r} objects"
        )

    def __radd__(self, other):
        """Reflected variant of :meth:`__add__`; ``other`` is placed first.

        Raises:
            TypeError: ``other`` is not a repository tree
        """
        # Same ordering concern as in __add__: test the subclass first.
        if isinstance(other, tree):
            return tree(*(other.trees + self.trees))
        elif isinstance(other, prototype.tree):
            if other not in self.trees:
                self.trees = (other,) + self.trees
            return self
        raise TypeError(
            f"cannot add {other.__class__.__name__!r} and {self.__class__.__name__!r} objects"
        )

    def __repr__(self):
        # getattr guards against repr() being called before __init__ set trees.
        return f"<{self.__class__.__module__}.{self.__class__.__name__} trees={getattr(self, 'trees', 'unset')!r} @{id(self):#8x}>"

    @property
    def pkg_masks(self):
        """frozenset: union of every subtree's ``pkg_masks``."""
        return frozenset(chain.from_iterable(repo.pkg_masks for repo in self.trees))

    @property
    def location(self):
        """tuple: the ``location`` of each subtree, in tree order."""
        return tuple(x.location for x in self.trees)

    @property
    def frozen(self):
        """bool: Repository mutability status; true only if all subtrees are frozen."""
        return all(x.frozen for x in self.trees)