Source code for pkgcore.ebuild.repository
"""
Ebuild repository, specific to gentoo ebuild trees.
"""
__all__ = ("UnconfiguredTree", "ConfiguredTree", "ProvidesRepo", "tree")
import locale
import os
from functools import partial, wraps
from itertools import chain, filterfalse
from random import shuffle
from sys import intern
from weakref import WeakValueDictionary
from snakeoil import chksum, klass
from snakeoil.bash import read_dict
from snakeoil.containers import InvertedContains
from snakeoil.data_source import local_source
from snakeoil.fileutils import readlines_utf8
from snakeoil.mappings import ImmutableDict
from snakeoil.obj import make_kls
from snakeoil.osutils import listdir_dirs, listdir_files, pjoin
from snakeoil.sequences import iflatten_instance, stable_unique
from snakeoil.strings import pluralism
from .. import fetch
from ..config.hint import ConfigHint, configurable
from ..log import logger
from ..operations import OperationError
from ..operations import repo as _repo_ops
from ..package import errors as pkg_errors
from ..repository import configured, errors, prototype, util
from ..repository.virtual import RestrictionRepo
from ..restrictions import packages
from ..util import packages as pkgutils
from . import cpv, digest, ebd, ebuild_src
from . import eclass_cache as eclass_cache_mod
from . import errors as ebuild_errors
from . import processor, repo_objs, restricts
from .atom import atom
from .eapi import get_eapi
class repo_operations(_repo_ops.operations):
def _cmd_implementation_manifest(
self, domain, restriction, observer, mirrors=False, force=False, distdir=None
):
manifest_config = self.repo.config.manifests
if manifest_config.disabled:
observer.info(f"{self.repo.repo_id} repo has manifests disabled")
return
required_chksums = set(manifest_config.required_hashes)
write_chksums = manifest_config.hashes
if distdir is None:
distdir = domain.distdir
ret = set()
matches = self.repo.itermatch(restriction, sorter=sorted)
for pkgs in map(list, pkgutils.groupby_pkg(matches)):
key = pkgs[0].key
manifest = pkgs[0].manifest
# check for pkgs masked by bad metadata
if bad_metadata := self.repo._bad_masked.match(pkgs[0].unversioned_atom):
for pkg in bad_metadata:
exc = pkg.data
error_str = f"{pkg.cpvstr}: {exc.msg(verbosity=observer.verbosity)}"
observer.error(error_str)
ret.add(pkg.key)
continue
all_pkgdir_fetchables = {
pkg: {
fetchable.filename: fetchable
for fetchable in iflatten_instance(
pkg.generate_fetchables(
allow_missing_checksums=True,
skip_default_mirrors=(not mirrors),
),
fetch.fetchable,
)
}
for pkg in self.repo.itermatch(pkgs[0].unversioned_atom)
}
# all pkgdir fetchables
pkgdir_fetchables = dict(
chain.from_iterable(all_pkgdir_fetchables[pkg].items() for pkg in pkgs)
)
# fetchables targeted for (re-)manifest generation
fetchables = {}
chksum_set = set(write_chksums)
for filename, fetchable in pkgdir_fetchables.items():
if force or not required_chksums.issubset(fetchable.chksums):
fetchable.chksums = {
k: v for k, v in fetchable.chksums.items() if k in chksum_set
}
fetchables[filename] = fetchable
# Manifest files aren't necessary with thin manifests and no distfiles
if manifest_config.thin and not pkgdir_fetchables:
if os.path.exists(manifest.path):
try:
os.remove(manifest.path)
except EnvironmentError as exc:
observer.error(
"failed removing old manifest: "
f"{key}::{self.repo.repo_id}: {exc}"
)
ret.add(key)
continue
# Manifest file is current and not forcing a refresh
if not force and manifest.distfiles.keys() == pkgdir_fetchables.keys():
continue
# fetch distfiles
pkg_ops = domain.get_pkg_operations(pkgs[0], observer=observer)
try:
if not pkg_ops.fetch(
list(fetchables.values()), observer, distdir=distdir
):
ret.add(key)
continue
except OperationError:
# check for special cases of fetch failures
try:
os.makedirs(distdir, exist_ok=True)
except OSError as exc:
observer.error(
f"failed to create distdir {distdir!r}: {exc.strerror}"
)
return ("failed to create distdir",)
if not os.access(distdir, os.W_OK):
observer.error(f"no write access to distdir: {distdir!r}")
return ("no write access to distdir",)
raise
# calculate checksums for fetched distfiles
try:
for fetchable in fetchables.values():
chksums = chksum.get_chksums(
pjoin(distdir, fetchable.filename), *write_chksums
)
fetchable.chksums = dict(zip(write_chksums, chksums))
except chksum.MissingChksumHandler as exc:
observer.error(f"failed generating chksum: {exc}")
ret.add(key)
break
if key not in ret:
all_fetchables = {
filename: fetchable
for fetchables in all_pkgdir_fetchables.values()
for filename, fetchable in fetchables.items()
if required_chksums.issubset(fetchable.chksums)
}
all_fetchables.update(fetchables)
observer.info(f"generating manifest: {key}::{self.repo.repo_id}")
manifest.update(sorted(all_fetchables.values()), chfs=write_chksums)
# edge case: If all ebuilds for a package were masked bad,
# then it was filtered out of the iterator for the above loop,
# so we handle unreported bad packages here.
missed_bad_set = set()
for pkg in self.repo._bad_masked:
if pkg.key not in ret:
observer.error(
f"{pkg.cpvstr}: {pkg.data.msg(verbosity=observer.verbosity)}"
)
missed_bad_set.add(pkg.key)
ret.update(missed_bad_set)
return ret
def _sort_eclasses(config, repo_config):
repo_path = repo_config.location
masters = repo_config.masters
eclasses = []
default = config.get_default("repo_config")
if repo_config._missing_masters and default is not None:
# use default repo's eclasses for overlays with missing masters
location = default.location
else:
location = repo_path
if not masters:
eclasses = [location]
else:
repo_map = {
alias: r.location
for r in config.objects["repo_config"].values()
for alias in r.aliases
}
eclasses = [repo_map[x] for x in masters]
# add the repo's eclass directories if it's not specified.
# do it in this fashion so that the repo's masters can actually interpose
# this repo's eclasses in between others.
# admittedly an odd thing to do, but it has some benefits
if repo_path not in eclasses:
eclasses.append(repo_path)
eclasses = [
eclass_cache_mod.cache(pjoin(x, "eclass"), location=location) for x in eclasses
]
if len(eclasses) == 1:
eclasses = eclasses[0]
else:
eclasses = list(reversed(eclasses))
eclasses = eclass_cache_mod.StackedCaches(
eclasses, location=location, eclassdir=location
)
return eclasses
[docs]
class ProvidesRepo(util.SimpleTree):
"""Fake, installed repo populated with entries from package.provided."""
[docs]
class PkgProvided(ebuild_src.base):
__slots__ = ("arches", "use")
package_is_real = False
__inst_caching__ = True
@property
def keywords(self):
return self.arches
def __init__(self, *args, arches=(), **kwargs):
super().__init__(*args, **kwargs)
object.__setattr__(self, "use", [])
object.__setattr__(self, "arches", arches)
object.__setattr__(self, "data", {"SLOT": "0"})
object.__setattr__(self, "eapi", get_eapi("0"))
def __init__(self, pkgs, arches, repo_id="package.provided"):
d = {}
for pkg in pkgs:
d.setdefault(pkg.category, {}).setdefault(pkg.package, []).append(
pkg.fullver
)
intermediate_parent = self.PkgProvidedParent()
super().__init__(
d,
pkg_klass=partial(self.PkgProvided, intermediate_parent, arches=arches),
livefs=True,
frozen=True,
repo_id=repo_id,
)
intermediate_parent._parent_repo = self
if not d:
self.match = self.itermatch = self._empty_provides_iterable
self.has_match = self._empty_provides_has_match
@staticmethod
def _empty_provides_iterable(*args, **kwds):
return iter(())
@staticmethod
def _empty_provides_has_match(*args, **kwds):
return False
[docs]
class UnconfiguredTree(prototype.tree):
"""Raw implementation supporting standard ebuild tree.
Return packages don't have USE configuration bound to them.
"""
false_categories = frozenset(["eclass", "profiles", "metadata", "licenses"])
configured = False
configurables = ("domain", "settings")
package_factory = staticmethod(ebuild_src.generate_new_factory)
enable_gpg = False
extension = ".ebuild"
operations_kls = repo_operations
pkgcore_config_type = ConfigHint(
types={
"location": "str",
"eclass_cache": "ref:eclass_cache",
"masters": "refs:repo",
"cache": "refs:cache",
"default_mirrors": "list",
"allow_missing_manifests": "bool",
"repo_config": "ref:repo_config",
},
typename="repo",
)
def __init__(
self,
location,
eclass_cache=None,
masters=(),
cache=(),
default_mirrors=None,
allow_missing_manifests=False,
package_cache=True,
repo_config=None,
):
"""
:param location: on disk location of the tree
:param cache: sequence of :obj:`pkgcore.cache.template.database` instances
to use for storing metadata
:param masters: repo masters this repo inherits from
:param eclass_cache: If not None, :obj:`pkgcore.ebuild.eclass_cache`
instance representing the eclasses available,
if None, generates the eclass_cache itself
:param default_mirrors: Either None, or sequence of mirrors to try
fetching from first, then falling back to other uri
:param package_cache: boolean controlling package instance caching
:param repo_config: :obj:`pkgcore.repo_objs.RepoConfig` instance for the related repo
"""
super().__init__()
self.base = self.location = location
self.package_cache = package_cache
if repo_config is None:
repo_config = repo_objs.RepoConfig(location)
self.config = repo_config
# profiles dir is required by PMS
if not os.path.isdir(self.config.profiles_base):
raise errors.InvalidRepo(
f"missing required profiles dir: {self.location!r}"
)
# verify we support the repo's EAPI
if not self.is_supported:
raise errors.UnsupportedRepo(self)
if eclass_cache is None:
eclass_cache = eclass_cache_mod.cache(
pjoin(self.location, "eclass"), location=self.location
)
self.eclass_cache = eclass_cache
self.masters = tuple(masters)
self.trees = self.masters + (self,)
self.licenses = repo_objs.Licenses(self, *self.masters)
self.profiles = self.config.profiles
if masters:
self.profiles = repo_objs.OverlayedProfiles(*self.trees)
# use mirrors from masters if not defined in the repo
mirrors = dict(self.thirdpartymirrors)
for master in masters:
for k, v in master.mirrors.items():
if k not in mirrors:
mirrors[k] = v
if isinstance(cache, (tuple, list)):
cache = tuple(cache)
else:
cache = (cache,)
self.mirrors = mirrors
self.default_mirrors = default_mirrors
self.cache = cache
self._allow_missing_chksums = allow_missing_manifests
self.package_class = self.package_factory(
self, cache, self.eclass_cache, self.mirrors, self.default_mirrors
)
self._shared_pkg_cache = WeakValueDictionary()
self._bad_masked = RestrictionRepo(repo_id="bad_masked")
self.projects_xml = repo_objs.LocalProjectsXml(
pjoin(self.location, "metadata", "projects.xml")
)
repo_id = klass.alias_attr("config.repo_id")
repo_name = klass.alias_attr("config.repo_name")
aliases = klass.alias_attr("config.aliases")
eapi = klass.alias_attr("config.eapi")
is_supported = klass.alias_attr("config.eapi.is_supported")
external = klass.alias_attr("config.external")
pkg_masks = klass.alias_attr("config.pkg_masks")
@klass.jit_attr
def known_arches(self):
"""Return all known arches for a repo (including masters)."""
return frozenset(chain.from_iterable(r.config.known_arches for r in self.trees))
[docs]
def path_restrict(self, path):
"""Return a restriction from a given path in a repo.
:param path: full or partial path to an ebuild
:return: a package restriction matching the given path if possible
:raises ValueError: if the repo doesn't contain the given path, the
path relates to a file that isn't an ebuild, or the ebuild isn't in the
proper directory layout
"""
if path not in self:
raise ValueError(f"{self.repo_id!r} repo doesn't contain: {path!r}")
if not path.startswith(os.sep) and os.path.exists(pjoin(self.location, path)):
path_chunks = path.split(os.path.sep)
else:
path = os.path.realpath(os.path.abspath(path))
if relpath := path[len(os.path.realpath(self.location)) :].strip("/"):
path_chunks = relpath.split(os.path.sep)
else:
path_chunks = ()
if os.path.isfile(path):
if not path.endswith(".ebuild"):
raise ValueError(f"file is not an ebuild: {path!r}")
elif len(path_chunks) != 3:
# ebuild isn't in a category/PN directory
raise ValueError(
f"ebuild not in the correct directory layout: {path!r}"
)
restrictions = []
# add restrictions until path components run out
try:
restrictions.append(restricts.RepositoryDep(self.repo_id))
if path_chunks[0] in self.categories:
restrictions.append(restricts.CategoryDep(path_chunks[0]))
restrictions.append(restricts.PackageDep(path_chunks[1]))
base = cpv.VersionedCPV(
f"{path_chunks[0]}/{os.path.splitext(path_chunks[2])[0]}"
)
restrictions.append(
restricts.VersionMatch("=", base.version, rev=base.revision)
)
except IndexError:
pass
return packages.AndRestriction(*restrictions)
def __getitem__(self, cpv):
cpv_inst = self.package_class(*cpv)
if cpv_inst.fullver not in self.versions[(cpv_inst.category, cpv_inst.package)]:
raise KeyError(cpv)
return cpv_inst
[docs]
def rebind(self, **kwds):
"""Generate a new tree instance with the same location using new keywords.
:param kwds: see __init__ for valid values
"""
o = self.__class__(self.location, **kwds)
o.categories = self.categories
o.packages = self.packages
o.versions = self.versions
return o
@klass.jit_attr
def thirdpartymirrors(self):
mirrors = {}
fp = pjoin(self.location, "profiles", "thirdpartymirrors")
try:
for k, v in read_dict(fp, splitter=None).items():
v = v.split()
# shuffle mirrors so the same ones aren't used every time
shuffle(v)
mirrors[k] = v
except FileNotFoundError:
pass
return ImmutableDict(mirrors)
@klass.jit_attr
def use_expand_desc(self):
"""Inherited USE_EXPAND settings for the repo."""
d = {}
for repo in self.trees:
d.update(repo.config.use_expand_desc)
return ImmutableDict(d)
@klass.jit_attr
def use_expand_sort(self):
"""Inherited mapping of USE_EXPAND sorting keys for the repo."""
d = {}
for repo in self.trees:
d.update(repo.config.use_expand_sort)
return ImmutableDict(d)
[docs]
def use_expand_sorter(self, group):
"""Sorting function for a given USE_EXPAND group."""
try:
ordering = self.use_expand_sort[group.lower()]
return lambda k: ordering.get(k, -1)
except KeyError:
# nonexistent USE_EXPAND group
return lambda k: k
@klass.jit_attr
def category_dirs(self):
try:
return frozenset(
map(
intern,
filterfalse(
self.false_categories.__contains__,
(x for x in listdir_dirs(self.base) if not x.startswith(".")),
),
)
)
except EnvironmentError as e:
logger.error(f"failed listing categories: {e}")
return ()
def _get_categories(self):
categories = frozenset(
chain.from_iterable(repo.config.categories for repo in self.trees)
)
if categories:
return categories
return self.category_dirs
def _get_packages(self, category):
cpath = pjoin(self.base, category.lstrip(os.path.sep))
try:
return tuple(listdir_dirs(cpath))
except FileNotFoundError:
if category in self.categories:
# ignore it, since it's PMS mandated that it be allowed.
return ()
except EnvironmentError as e:
category = pjoin(self.base, category.lstrip(os.path.sep))
raise KeyError(
f"failed fetching packages for category {category}: {e}"
) from e
def _get_versions(self, catpkg):
"""Determine available versions for a given package.
Ebuilds with mismatched or invalid package names are ignored.
"""
cppath = pjoin(self.base, catpkg[0], catpkg[1])
pkg = f"{catpkg[-1]}-"
lp = len(pkg)
extension = self.extension
ext_len = -len(extension)
try:
return tuple(
x[lp:ext_len]
for x in listdir_files(cppath)
if x[ext_len:] == extension and x[:lp] == pkg
)
except EnvironmentError as e:
raise KeyError(
"failed fetching versions for package %s: %s"
% (pjoin(self.base, "/".join(catpkg)), str(e))
) from e
def _pkg_filter(self, raw, error_callback, pkgs):
"""Filter packages with bad metadata."""
while True:
try:
pkg = next(pkgs)
except pkg_errors.PackageError:
# ignore pkgs with invalid CPVs
continue
except StopIteration:
return
if raw:
yield pkg
elif (
self._bad_masked.has_match(pkg.versioned_atom)
and error_callback is not None
):
error_callback(self._bad_masked[pkg.versioned_atom])
else:
# check pkgs for unsupported/invalid EAPIs and bad metadata
try:
if not pkg.is_supported:
exc = pkg_errors.MetadataException(
pkg, "eapi", f"EAPI '{pkg.eapi}' is not supported"
)
self._bad_masked[pkg.versioned_atom] = exc
if error_callback is not None:
error_callback(exc)
continue
# TODO: add a generic metadata validation method to avoid slow metadata checks?
pkg.data
pkg.slot
pkg.required_use
except pkg_errors.MetadataException as e:
self._bad_masked[e.pkg.versioned_atom] = e
if error_callback is not None:
error_callback(e)
continue
yield pkg
[docs]
def itermatch(self, *args, **kwargs):
raw = "raw_pkg_cls" in kwargs or not kwargs.get("versioned", True)
error_callback = kwargs.pop("error_callback", None)
kwargs.setdefault("pkg_filter", partial(self._pkg_filter, raw, error_callback))
return super().itermatch(*args, **kwargs)
def _get_ebuild_path(self, pkg):
return pjoin(
self.base,
pkg.category,
pkg.package,
f"{pkg.package}-{pkg.fullver}{self.extension}",
)
def _get_ebuild_src(self, pkg):
return local_source(self._get_ebuild_path(pkg), encoding="utf8")
def _get_shared_pkg_data(self, category, package):
key = (category, package)
o = self._shared_pkg_cache.get(key)
if o is None:
mxml = self._get_metadata_xml(category, package)
manifest = self._get_manifest(category, package)
o = repo_objs.SharedPkgData(mxml, manifest)
self._shared_pkg_cache[key] = o
return o
def _get_metadata_xml(self, category, package):
return repo_objs.LocalMetadataXml(
pjoin(self.base, category, package, "metadata.xml")
)
def _get_manifest(self, category, package):
return digest.Manifest(
pjoin(self.base, category, package, "Manifest"),
thin=self.config.manifests.thin,
enforce_gpg=self.enable_gpg,
)
def _get_digests(self, pkg, allow_missing=False):
if self.config.manifests.disabled:
return True, {}
try:
manifest = pkg._shared_pkg_data.manifest
manifest.allow_missing = allow_missing
return allow_missing, manifest.distfiles
except pkg_errors.ParseChksumError as e:
if e.missing and allow_missing:
return allow_missing, {}
raise pkg_errors.MetadataException(pkg, "manifest", str(e))
def __repr__(self):
return "<ebuild %s location=%r @%#8x>" % (
self.__class__.__name__,
self.base,
id(self),
)
@klass.jit_attr
def deprecated(self):
"""Base deprecated packages restriction from profiles/package.deprecated."""
pkg_deprecated = set()
for repo in self.trees:
pkg_deprecated.update(repo.config.pkg_deprecated)
return packages.OrRestriction(*pkg_deprecated)
@klass.jit_attr
def stabilization_groups(self):
"""Return a mapping of stabilization groups to packages."""
base_dir = pjoin(self.location, "metadata", "stabilization-groups")
group_files = {
pjoin(dirname, file)
.removeprefix(base_dir + "/")
.removesuffix(".group"): pjoin(dirname, file)
for dirname, _dirs, files in os.walk(base_dir)
for file in files
if file.endswith(".group")
}
stabilization_groups = {}
for group_name, group_file in group_files.items():
pkgs = set()
for lineno, line in enumerate(readlines_utf8(group_file), 1):
try:
if line := line.split("#", maxsplit=1)[0].strip():
pkgs.add(atom(line))
except ebuild_errors.MalformedAtom as exc:
logger.error(
f"{group_file.removeprefix(self.location)}, line {lineno}: parsing error: {exc}"
)
stabilization_groups[group_name] = frozenset(pkgs)
return ImmutableDict(stabilization_groups)
def _regen_operation_helper(self, **kwds):
return _RegenOpHelper(
self,
force=bool(kwds.get("force", False)),
eclass_caching=bool(kwds.get("eclass_caching", True)),
)
def __getstate__(self):
d = self.__dict__.copy()
del d["_shared_pkg_cache"]
return d
def __setstate__(self, state):
self.__dict__ = state.copy()
self.__dict__["_shared_pkg_cache"] = WeakValueDictionary()
[docs]
@configurable(
typename="repo",
types={
"repo_config": "ref:repo_config",
"cache": "refs:cache",
"eclass_cache": "ref:eclass_cache",
"default_mirrors": "list",
"allow_missing_manifests": "bool",
},
requires_config="config",
)
def tree(
config,
repo_config,
cache=(),
eclass_cache=None,
default_mirrors=None,
allow_missing_manifests=False,
tree_cls=UnconfiguredTree,
):
"""Initialize an unconfigured ebuild repository."""
repo_id = repo_config.repo_id
repo_path = repo_config.location
if repo_config.masters is None:
# if it's None, that means it's not a standalone, and is PMS, or misconfigured.
# empty tuple means it's a standalone repository
default = config.get_default("repo_config")
if default is None:
raise errors.InitializationError(
f"repo {repo_id!r} at {repo_path!r} requires missing default repo"
)
# map external repo ids to their config names
config_map = {
r.repo_id: r.location
for r in config.objects["repo_config"].values()
if r.external
}
try:
masters = []
missing = []
for r in repo_config.masters:
if repo := config.objects["repo"].get(config_map.get(r, r)):
masters.append(repo)
else:
missing.append(r)
except RecursionError:
repo_id = repo_config.repo_id
masters = ", ".join(repo_config.masters)
raise errors.InitializationError(
f"{repo_id!r} repo has cyclic masters: {masters}"
)
if missing:
missing = ", ".join(map(repr, sorted(missing)))
raise errors.InitializationError(
f"repo {repo_id!r} at path {repo_path!r} has missing masters: {missing}"
)
if eclass_cache is None:
eclass_cache = _sort_eclasses(config, repo_config)
return tree_cls(
repo_config.location,
eclass_cache=eclass_cache,
masters=masters,
cache=cache,
default_mirrors=default_mirrors,
allow_missing_manifests=allow_missing_manifests,
repo_config=repo_config,
)
class _RegenOpHelper:
def __init__(self, repo, force=False, eclass_caching=True):
self.force = force
self.eclass_caching = eclass_caching
self.ebp = self.request_ebp()
def request_ebp(self):
ebp = processor.request_ebuild_processor()
if self.eclass_caching:
ebp.allow_eclass_caching()
return ebp
def __call__(self, pkg):
try:
return pkg._fetch_metadata(ebp=self.ebp, force_regen=self.force)
except pkg_errors.MetadataException as e:
# ebuild processor is dead, so force a replacement request
self.ebp = self.request_ebp()
raise
def __del__(self):
if self.eclass_caching:
self.ebp.disable_eclass_caching()
processor.release_ebuild_processor(self.ebp)
[docs]
class ConfiguredTree(configured.tree):
"""Wrapper around a :obj:`UnconfiguredTree` binding build/configuration data (USE)."""
configurable = "use"
config_wrappables = {
x: klass.alias_method("evaluate_depset")
for x in (
"bdepend",
"depend",
"rdepend",
"pdepend",
"idepend",
"fetchables",
"license",
"src_uri",
"restrict",
"required_use",
)
}
def __init__(self, raw_repo, domain, domain_settings):
"""
:param raw_repo: :obj:`UnconfiguredTree` instance
:param domain_settings: environment settings to bind
"""
required_settings = {"USE", "CHOST"}
if missing_settings := required_settings.difference(domain_settings):
s = pluralism(missing_settings)
raise errors.InitializationError(
f"{self.__class__} missing required setting{s}: "
f"{', '.join(map(repr, missing_settings))}"
)
chost = domain_settings["CHOST"]
scope_update = {"chost": chost}
scope_update.update(
(x, domain_settings.get(x.upper(), chost)) for x in ("cbuild", "ctarget")
)
scope_update.update(
(x, domain_settings.get(x.upper(), ""))
for x in ("cflags", "cxxflags", "ldflags")
)
scope_update["operations_callback"] = self._generate_pkg_operations
# update wrapped attr funcs requiring access to the class instance
for k, v in self.config_wrappables.items():
if isinstance(v, str):
self.config_wrappables[k] = getattr(self, v)
super().__init__(
raw_repo, self.config_wrappables, pkg_kls_injections=scope_update
)
self.domain = domain
self.domain_settings = domain_settings
self._delayed_iuse = partial(make_kls(InvertedContains), InvertedContains)
def _wrap_attr(config_wrappables):
"""Register wrapped attrs that require class instance access."""
def _wrap_func(func):
@wraps(func)
def wrapped(*args, **kwargs):
return func(*args, **kwargs)
attr = func.__name__.lstrip("_")
config_wrappables[attr] = func.__name__
return wrapped
return _wrap_func
@_wrap_attr(config_wrappables)
def _iuse_effective(self, raw_pkg_iuse_effective, _enabled_use, pkg):
"""IUSE_EFFECTIVE for a package."""
profile_iuse_effective = self.domain.profile.iuse_effective
return frozenset(profile_iuse_effective.union(raw_pkg_iuse_effective))
@_wrap_attr(config_wrappables)
def _distfiles(self, raw_pkg_distfiles, enabled_use, pkg):
"""Distfiles used by a package."""
return tuple(stable_unique(raw_pkg_distfiles.evaluate_depset(enabled_use)))
@_wrap_attr(config_wrappables)
def _user_patches(self, _raw_pkg_patches, _enabled_use, pkg):
"""User patches that will be applied when building a package."""
# determine available user patches for >= EAPI 6
if pkg.eapi.options.user_patches:
patches = []
patchroot = pjoin(self.domain.config_dir, "patches")
patch_dirs = [
pkg.PF,
f"{pkg.PF}:{pkg.slot}",
pkg.P,
f"{pkg.P}:{pkg.slot}",
pkg.PN,
f"{pkg.PN}:{pkg.slot}",
]
for d in patch_dirs:
for root, _dirs, files in os.walk(pjoin(patchroot, pkg.category, d)):
files = (
pjoin(root, f)
for f in sorted(files, key=locale.strxfrm)
if f.endswith((".diff", ".patch"))
)
patches.append((root, tuple(files)))
return tuple(patches)
return None
def _get_delayed_immutable(self, pkg, immutable):
return InvertedContains(set(pkg.iuse).difference(immutable))
def _get_pkg_kwds(self, pkg):
immutable, enabled, _disabled = self.domain.get_package_use_unconfigured(pkg)
return {
"initial_settings": enabled,
"unchangable_settings": self._delayed_iuse(
self._get_delayed_immutable, pkg, immutable
),
}
def _generate_pkg_operations(self, domain, pkg, **kwds):
return ebd.src_operations(domain, pkg, pkg.repo.eclass_cache, **kwds)