
"""
gentoo configuration domain
"""

__all__ = ("domain",)

# XXX doc this up better...

import copy
import os
import re
import tempfile
from collections import defaultdict
from functools import partial
from itertools import chain
from multiprocessing import cpu_count
from operator import itemgetter

from snakeoil import klass
from snakeoil.bash import iter_read_bash, read_bash_dict
from snakeoil.cli.exceptions import find_user_exception
from snakeoil.data_source import local_source
from snakeoil.log import suppress_logging
from snakeoil.mappings import ImmutableDict, ProtectedDict
from snakeoil.osutils import pjoin
from snakeoil.process.spawn import spawn_get_output
from snakeoil.sequences import predicate_split, split_negations, stable_unique

from ..binpkg import repository as binary_repo
from ..cache.flat_hash import md5_cache
from ..config import basics
from ..config import errors as config_errors
from ..config.domain import Failure
from ..config.domain import domain as config_domain
from ..config.hint import ConfigHint
from ..fs.livefs import iter_scan, sorted_scan
from ..log import logger
from ..repository import errors as repo_errors
from ..repository import filtered
from ..repository.util import RepositoryGroup
from ..restrictions import packages, values
from ..restrictions.delegated import delegate
from ..util.parserestrict import ParseError, parse_match
from . import const
from . import repository as ebuild_repo
from .atom import atom as _atom
from .eapi import get_latest_PMS_eapi
from .misc import (
    ChunkedDataDict,
    chunked_data,
    collapsed_restrict_to_data,
    incremental_expansion,
    incremental_expansion_license,
    non_incremental_collapsed_restrict_to_data,
    optimize_incrementals,
)
from .portage_conf import PortageConfig
from .repo_objs import Licenses, RepoConfig
from .triggers import GenerateTriggers


def package_masks(iterable):
    for line, lineno, path in iterable:
        try:
            yield parse_match(line), line, lineno, path
        except ParseError as e:
            logger.warning(f"{path!r}, line {lineno}: parsing error: {e}")


def restriction_payload_splitter(iterable, post_process=lambda x: x):
    for line, lineno, path in iterable:
        pkg, *flags = line.split()
        try:
            # TODO: expand this invocation to allow threading token level validation down.
            # things like "is this a valid use flag?"
            yield parse_match(pkg), tuple(post_process(flags)), line, lineno, path
        except ParseError as e:
            logger.warning(f"{path!r}, line {lineno}: parsing error: {e}")


def package_use_splitter(iterable):
    """Parse package.use user configuration files

    Basic syntax is <query> (?:-?use_flag )* (?:USE_EXPAND: (?:flags)*)
    I.e., a restriction to match and use flags to turn on.  If a 'flag' ends in ':'
    then it's considered a USE_EXPAND directive, and all that follow are values of that
    USE_EXPAND target and should be expanded into their normalized/long form.
    """

    eapi_obj = get_latest_PMS_eapi()

    def f(tokens: list[str]):
        start_idx = 0
        i = iter(tokens)
        for idx, flag in enumerate(i):
            if flag == "-*":
                start_idx = idx
            elif flag.endswith(":"):
                # we encountered `USE_EXPAND:`, thus all following tokens
                # are values of it.
                use_expand = flag.lower()[:-1]
                yield from tokens[start_idx:idx]
                buffer: list[str] = []
                for flag in i:
                    if flag.endswith(":"):
                        use_expand = flag.lower()[:-1]
                        yield from buffer
                        buffer.clear()
                        continue
                    if flag == "-*":
                        buffer.clear()
                        yield f"-{use_expand}_*"
                        continue
                    if flag.startswith("-"):
                        flag = f"-{use_expand}_{flag[1:]}"
                    else:
                        flag = f"{use_expand}_{flag}"
                    if not eapi_obj.is_valid_use_flag(flag.lstrip("-")):
                        raise ParseError(f"token {flag} is not a valid use flag")
                    buffer.append(flag)
                yield from buffer
                return
            elif not eapi_obj.is_valid_use_flag(flag.lstrip("-")):
                raise ParseError(f"token {flag} is not a valid use flag")
        # if we made it here, there's no USE_EXPAND; thus just return the original sequence
        yield from tokens[start_idx:]

    return restriction_payload_splitter(iterable, post_process=f)
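
# Illustrative sketch (hypothetical input, not part of the module): a
# package.use line such as
#
#   dev-libs/foo flag PYTHON_TARGETS: python3_11 -python3_12
#
# is yielded as (parse_match("dev-libs/foo"), flags, ...) where the
# post-processed flag stream is
#
#   ("flag", "python_targets_python3_11", "-python_targets_python3_12")
#
# i.e. each token following a `USE_EXPAND:` marker gets the lowercased
# USE_EXPAND name prefixed onto it, preserving a leading "-" for negations.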


def package_env_splitter(basedir, iterable):
    for line, lineno, path in iterable:
        val = line.split()
        if len(val) == 1:
            logger.warning(f"{path!r}, line {lineno}: missing file reference: {line!r}")
            continue
        paths = []
        for env_file in val[1:]:
            fp = pjoin(basedir, env_file)
            if os.path.exists(fp):
                paths.append(fp)
            else:
                logger.warning(f"{path!r}, line {lineno}: nonexistent file: {fp!r}")
        try:
            yield parse_match(val[0]), tuple(paths), line, lineno, path
        except ParseError as e:
            logger.warning(f"{path!r}, line {lineno}: parsing error: {e}")
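
# Illustrative example (hypothetical paths): with basedir "/etc/portage/env",
# a package.env line
#
#   dev-lang/rust debug.conf notmpfs.conf
#
# yields (parse_match("dev-lang/rust"),
# ("/etc/portage/env/debug.conf", "/etc/portage/env/notmpfs.conf"), ...),
# warning about and skipping any referenced file that doesn't exist.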


def apply_mask_filter(globs, atoms, pkg, mode):
    # mode is ignored; not applicable.
    for r in chain(globs, atoms.get(pkg.key, ())):
        if r.match(pkg):
            return True
    return False


def make_mask_filter(masks, negate=False):
    atoms = defaultdict(list)
    globs = []
    for m in masks:
        if isinstance(m, _atom):
            atoms[m.key].append(m)
        else:
            globs.append(m)
    return delegate(partial(apply_mask_filter, globs, atoms), negate=negate)


def generate_filter(masks, unmasks, *extra):
    # note that we ignore unmasking if masking isn't specified.
    # no point, mainly
    masking = make_mask_filter(masks, negate=True)
    unmasking = make_mask_filter(unmasks, negate=False)
    r = ()
    if masking:
        if unmasking:
            r = (packages.OrRestriction(masking, unmasking, disable_inst_caching=True),)
        else:
            r = (masking,)
    return packages.AndRestriction(
        *(r + extra), disable_inst_caching=True, finalize=True
    )
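
# Illustrative sketch (hypothetical data): masks and unmasks compose such that
# a package stays visible iff nothing masks it, or an unmask entry overrides
# the mask; extra restrictions (keyword/license filters) are ANDed on top:
#
#   vis = generate_filter(
#       masks=[parse_match("dev-libs/foo")],       # mask all of dev-libs/foo
#       unmasks=[parse_match("=dev-libs/foo-1")],  # but let this version through
#   )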


def _read_config_file(path):
    """Read all the data files under a given path."""
    try:
        # sort based on location by default; this ensures 00 sorts before 01,
        # and both before 'a'
        for fs_obj in sorted(iter_scan(path, follow_symlinks=True)):
            if not fs_obj.is_reg or "/." in fs_obj.location:
                continue
            for lineno, line in iter_read_bash(
                fs_obj.location, allow_line_cont=True, enum_line=True
            ):
                yield line, lineno, fs_obj.location
    except FileNotFoundError:
        pass
    except EnvironmentError as e:
        raise Failure(f"failed reading {path!r}: {e}") from e


def load_property(
    filename, *, read_func=_read_config_file, parse_func=lambda x: x, fallback=()
):
    """Decorator for parsing files using specified read/parse methods.

    :param filename: The filename to parse within the config directory.
    :keyword read_func: An invokable used to read the specified file.
    :keyword parse_func: An invokable used to parse the data.
    :keyword fallback: What to return if the file does not exist.
    :return: A :py:func:`klass.jit_attr_named` property instance.
    """

    def f(func):
        def _load_and_invoke(func, fallback, self, *args, **kwargs):
            if filename.startswith(os.path.sep):
                # translate root fs calls to prefixed root fs
                path = pjoin(self.root, filename.lstrip(os.path.sep))
            else:
                # assume relative files are inside the config dir
                path = pjoin(self.config_dir, filename)
            if os.path.exists(path):
                data = parse_func(read_func(path))
            else:
                data = fallback
            return func(self, data, *args, **kwargs)

        doc = getattr(func, "__doc__", None)
        jit_attr_named = klass.jit_attr_named(f"_jit_{func.__name__}", doc=doc)
        return jit_attr_named(partial(_load_and_invoke, func, fallback))

    return f
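
# Usage sketch, mirroring the domain class below: the decorated method
# receives the parsed file contents as its first argument and becomes a
# lazily evaluated, cached attribute, e.g.
#
#   @load_property("package.mask", parse_func=package_masks)
#   def pkg_masks(self, data):
#       return tuple(x[0] for x in data)
#
# Relative filenames resolve under the domain's config_dir (/etc/portage by
# default), while absolute ones are re-rooted under the domain's root.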


# ow ow ow ow ow ow....
# this manages a *lot* of crap.  so... this is fun.
#
# note also, that this is rather ebuild centric. it shouldn't be, and
# should be redesigned to be a separation of configuration
# instantiation manglers, and then the ebuild specific chunk (which is
# selected by config)
class domain(config_domain):
    # XXX ouch, verify this crap and add defaults and stuff
    _types = dict.fromkeys(
        (
            "root",
            "config_dir",
            "CHOST",
            "CBUILD",
            "CTARGET",
            "CFLAGS",
            "PATH",
            "PORTAGE_TMPDIR",
            "DISTCC_PATH",
            "DISTCC_DIR",
            "CCACHE_DIR",
        ),
        "str",
    )
    _types["profile"] = "ref:profile"
    _types["repos"] = "lazy_refs:repo"
    _types["vdb"] = "lazy_refs:repo"

    # TODO this is missing defaults
    pkgcore_config_type = ConfigHint(
        types=_types,
        typename="domain",
        required=["repos", "profile", "vdb"],
        allow_unknowns=True,
    )
    del _types

    def __init__(
        self,
        profile,
        repos,
        vdb,
        root="/",
        prefix="/",
        config_dir="/etc/portage",
        **settings,
    ):
        self.root = settings["ROOT"] = root
        self.config_dir = config_dir
        self.prefix = prefix
        self.ebuild_hook_dir = pjoin(self.config_dir, "env")
        self.profile = profile
        self.__repos = repos
        self.__vdb = vdb

        # prevent critical variables from being changed in make.conf
        for k in self.profile.profile_only_variables.intersection(settings.keys()):
            del settings[k]

        # Protect original settings from being overridden so matching
        # package.env settings can be overlaid properly.
        self._settings = ProtectedDict(settings)

    @load_property("/etc/profile.env", read_func=read_bash_dict)
    def system_profile(self, data):
        # prepend system profile $PATH if it exists
        if "PATH" in data:
            path = stable_unique(
                data["PATH"].split(os.pathsep) + os.environ["PATH"].split(os.pathsep)
            )
            os.environ["PATH"] = os.pathsep.join(path)
        return ImmutableDict(data)

    @klass.jit_attr_named("_jit_reset_settings", uncached_val=None)
    def settings(self):
        settings = self._settings
        if "CHOST" in settings and "CBUILD" not in settings:
            settings["CBUILD"] = settings["CHOST"]

        # if unset, MAKEOPTS defaults to the CPU thread count
        if "MAKEOPTS" not in settings:
            settings["MAKEOPTS"] = f"-j{cpu_count()}"

        # reformat env.d and make.conf incrementals
        system_profile_settings = {}
        for x in const.incrementals:
            system_profile_val = self.system_profile.get(x, ())
            make_conf_val = settings.get(x, ())
            if isinstance(system_profile_val, str):
                system_profile_val = tuple(system_profile_val.split())
            if isinstance(make_conf_val, str):
                make_conf_val = tuple(make_conf_val.split())
            system_profile_settings[x] = system_profile_val
            settings[x] = make_conf_val

        # roughly... all incremental stacks should be interpreted left -> right
        # as such we start with the env.d settings, append profile settings,
        # and finally append make.conf settings onto that.
        for k, v in self.profile.default_env.items():
            if k not in settings:
                settings[k] = v
                continue
            if k in const.incrementals:
                settings[k] = system_profile_settings[k] + v + settings[k]

        # next we finalize incrementals.
        for incremental in const.incrementals:
            # Skip USE/ACCEPT_LICENSE for the time being; hack; we need the
            # negations currently so that pkg iuse induced enablings can be
            # disabled by negations. For example, think of the profile doing
            # USE=-cdr for brasero w/ IUSE=+cdr. Similarly, ACCEPT_LICENSE is
            # skipped because negations are required for license filtering.
            if incremental not in settings or incremental in ("USE", "ACCEPT_LICENSE"):
                continue
            settings[incremental] = tuple(
                incremental_expansion(
                    settings[incremental], msg_prefix=f"while expanding {incremental}"
                )
            )

        if "ACCEPT_KEYWORDS" not in settings:
            raise Failure(
                "No ACCEPT_KEYWORDS setting detected from profile, or user config"
            )
        settings["ACCEPT_KEYWORDS"] = incremental_expansion(
            settings["ACCEPT_KEYWORDS"], msg_prefix="while expanding ACCEPT_KEYWORDS"
        )

        # pull trigger options from the env
        self._triggers = GenerateTriggers(self, settings)

        return ImmutableDict(settings)
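
    # A note on the incremental semantics used above (standard make.conf
    # stacking, roughly): tokens are interpreted left to right, a later
    # "-flag" cancels an earlier "flag", and "-*" clears everything
    # accumulated so far; e.g. FEATURES="test -test sandbox" collapses to
    # just "sandbox".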

    def get_settings_envvar(self, key: str, default=None):
        if (val := self.settings.get(key)) is not None:
            return val
        return os.environ.get(key, default)

    @property
    def arch(self):
        try:
            return self.settings["ARCH"]
        except KeyError:
            raise Failure("No ARCH setting detected from profile, or user config")

    @property
    def distdir(self):
        try:
            return self.settings["DISTDIR"]
        except KeyError:
            raise Failure("No DISTDIR setting detected from config")

    @property
    def stable_arch(self):
        return self.arch

    @property
    def unstable_arch(self):
        return f"~{self.arch}"

    @klass.jit_attr_named("_jit_reset_features", uncached_val=None)
    def features(self):
        conf_features = list(self.settings.get("FEATURES", ()))
        env_features = os.environ.get("FEATURES", "").split()
        return frozenset(optimize_incrementals(conf_features + env_features))

    @klass.jit_attr_named("_jit_reset_use", uncached_val=None)
    def use(self):
        # append expanded use, FEATURES, and environment defined USE flags
        use = list(self.settings.get("USE", ())) + list(
            self.profile.expand_use(self.settings)
        )

        # hackish implementation; if test is on, flip on the flag
        if "test" in self.features:
            use.append("test")
        if "prefix" in self.features:
            use.append("prefix")

        return frozenset(optimize_incrementals(use + os.environ.get("USE", "").split()))

    @klass.jit_attr_named("_jit_reset_enabled_use", uncached_val=None)
    def enabled_use(self):
        use = ChunkedDataDict()
        use.add_bare_global(*split_negations(self.use))
        use.merge(self.profile.pkg_use)
        use.update_from_stream(chunked_data(k, *v) for k, v in self.pkg_use)
        use.freeze()
        return use

    @klass.jit_attr_none
    def forced_use(self):
        use = ChunkedDataDict()
        use.merge(getattr(self.profile, "forced_use"))
        use.add_bare_global((), (self.arch,))
        use.freeze()
        return use

    @klass.jit_attr_none
    def stable_forced_use(self):
        use = ChunkedDataDict()
        use.merge(getattr(self.profile, "stable_forced_use"))
        use.add_bare_global((), (self.arch,))
        use.freeze()
        return use

    @load_property("package.mask", parse_func=package_masks)
    def pkg_masks(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple(x[0] for x in data)

    @load_property("package.unmask", parse_func=package_masks)
    def pkg_unmasks(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple(x[0] for x in data)

    # TODO: deprecated, remove in 0.11
    @load_property("package.keywords", parse_func=restriction_payload_splitter)
    def pkg_keywords(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple((x[0], stable_unique(x[1])) for x in data)

    @load_property("package.accept_keywords", parse_func=restriction_payload_splitter)
    def pkg_accept_keywords(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple((x[0], stable_unique(x[1])) for x in data)

    @load_property("package.license", parse_func=restriction_payload_splitter)
    def pkg_licenses(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple((x[0], stable_unique(x[1])) for x in data)

    @load_property("package.use", parse_func=package_use_splitter)
    def pkg_use(self, data, debug=False):
        if debug:
            return tuple(data)
        return tuple((x[0], split_negations(stable_unique(x[1]))) for x in data)

    @load_property("package.env")
    def pkg_env(self, data, debug=False):
        func = partial(package_env_splitter, self.ebuild_hook_dir)
        data = func(data)
        if debug:
            return tuple(data)
        return tuple((x[0], x[1]) for x in data)

    @klass.jit_attr
    def bashrcs(self):
        files = sorted_scan(pjoin(self.config_dir, "bashrc"), follow_symlinks=True)
        return tuple(local_source(x) for x in files)

    def _pkg_filters(self, pkg_accept_keywords=None, pkg_keywords=None):
        if pkg_accept_keywords is None:
            pkg_accept_keywords = self.pkg_accept_keywords
        if pkg_keywords is None:
            pkg_keywords = self.pkg_keywords

        # ~amd64 -> [amd64, ~amd64]
        default_keywords = set([self.arch])
        default_keywords.update(self.settings["ACCEPT_KEYWORDS"])
        for x in self.settings["ACCEPT_KEYWORDS"]:
            if x.startswith("~"):
                default_keywords.add(x.lstrip("~"))

        # create keyword filters
        accept_keywords = (
            pkg_keywords + pkg_accept_keywords + self.profile.accept_keywords
        )
        filters = [
            self._make_keywords_filter(
                default_keywords,
                accept_keywords,
                incremental="package.keywords" in const.incrementals,
            )
        ]

        # add license filters
        master_license = []
        master_license.extend(self.settings.get("ACCEPT_LICENSE", ()))
        if master_license or self.pkg_licenses:
            # restrict that matches iff the licenses are allowed
            restrict = delegate(partial(self._apply_license_filter, master_license))
            filters.append(restrict)

        return tuple(filters)

    @klass.jit_attr_none
    def _default_licenses_manager(self):
        return Licenses(*self.source_repos_raw)

    def _apply_license_filter(self, master_licenses, pkg, mode):
        """Determine if a package's license is allowed."""
        # note we're not honoring mode; it's always match.
        # reason is that of not turning on use flags to get acceptable license
        # pairs, maybe change this down the line?
        matched_pkg_licenses = []
        for atom, licenses in self.pkg_licenses:
            if atom.match(pkg):
                matched_pkg_licenses += licenses
        raw_accepted_licenses = master_licenses + matched_pkg_licenses
        license_manager = getattr(pkg.repo, "licenses", self._default_licenses_manager)

        for and_pair in pkg.license.dnf_solutions():
            accepted = incremental_expansion_license(
                pkg,
                and_pair,
                license_manager.groups,
                raw_accepted_licenses,
                msg_prefix="while checking ACCEPT_LICENSE ",
            )
            if accepted.issuperset(and_pair):
                return True
        return False

    def _make_keywords_filter(self, default_keys, accept_keywords, incremental=False):
        """Generates a restrict that matches iff the keywords are allowed."""
        if not accept_keywords and not self.profile.keywords:
            return packages.PackageRestriction(
                "keywords", values.ContainmentMatch(frozenset(default_keys))
            )

        if self.unstable_arch not in default_keys:
            # stable; thus empty entries == ~arch
            def f(r, v):
                if not v:
                    return r, self.unstable_arch
                return r, v

            data = collapsed_restrict_to_data(
                ((packages.AlwaysTrue, default_keys),), (f(*i) for i in accept_keywords)
            )
        else:
            if incremental:
                f = collapsed_restrict_to_data
            else:
                f = non_incremental_collapsed_restrict_to_data
            data = f(((packages.AlwaysTrue, default_keys),), accept_keywords)

        if incremental:
            raise NotImplementedError(self._incremental_apply_keywords_filter)
        else:
            f = self._apply_keywords_filter
        return delegate(partial(f, data))

    @staticmethod
    def _incremental_apply_keywords_filter(data, pkg, mode):
        # note we ignore mode; keywords aren't influenced by conditionals.
        # note also, we're not using a restriction here. this is faster.
        allowed = data.pull_data(pkg)
        return any(True for x in pkg.keywords if x in allowed)

    def _apply_keywords_filter(self, data, pkg, mode):
        # note we ignore mode; keywords aren't influenced by conditionals.
        # note also, we're not using a restriction here. this is faster.
        pkg_keywords = pkg.keywords
        for atom, keywords in self.profile.keywords:
            if atom.match(pkg):
                pkg_keywords += keywords
        allowed = data.pull_data(pkg)
        if "**" in allowed:
            return True
        if "*" in allowed:
            for k in pkg_keywords:
                if k[0] not in "-~":
                    return True
        if "~*" in allowed:
            for k in pkg_keywords:
                if k[0] == "~":
                    return True
        return any(True for x in pkg_keywords if x in allowed)

    @klass.jit_attr_none
    def use_expand_re(self):
        expands = "|".join(x.lower() for x in self.profile.use_expand)
        return re.compile(rf"^(?:[+-])?({expands})_(.*)$")

    def _split_use_expand_flags(self, use_stream):
        stream = ((self.use_expand_re.match(x), x) for x in use_stream)
        flags, ue_flags = predicate_split(bool, stream, itemgetter(0))
        return list(map(itemgetter(1), flags)), [
            (x[0].groups(), x[1]) for x in ue_flags
        ]
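
    # Hypothetical example: with USE_EXPAND containing PYTHON_TARGETS, the
    # splitter above divides a use stream into plain flags and
    # (use_expand, value) pairs:
    #
    #   self._split_use_expand_flags(["ncurses", "python_targets_python3_11"])
    #   -> (["ncurses"],
    #       [(("python_targets", "python3_11"), "python_targets_python3_11")])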

    def get_package_use_unconfigured(self, pkg, for_metadata=True):
        """Determine use flags for a given package.

        Roughly, this should result in the following, evaluated l->r: non
        USE_EXPAND; profiles, pkg iuse, global configuration, package.use
        configuration, commandline? stack profiles + pkg iuse; split it into
        use and use_expanded use; do global configuration + package.use
        configuration overriding of non-use_expand use if global configuration
        has a setting for use_expand.

        Args:
            pkg: package object
            for_metadata (bool): if True, we're doing use flag retrieval for
                metadata generation; otherwise, we're just requesting the raw
                use flags

        Returns:
            Three groups of use flags for the package in the following order:
            immutable flags, enabled flags, and disabled flags.
        """
        pre_defaults = [x[1:] for x in pkg.iuse if x[0] == "+"]
        if pre_defaults:
            pre_defaults, ue_flags = self._split_use_expand_flags(pre_defaults)
            pre_defaults.extend(
                x[1] for x in ue_flags if x[0][0].upper() not in self.settings
            )

        attr = (
            "stable_"
            if self.stable_arch in pkg.keywords
            and self.unstable_arch not in self.settings["ACCEPT_KEYWORDS"]
            else ""
        )
        disabled = getattr(self.profile, attr + "masked_use").pull_data(pkg)
        immutable = getattr(self, attr + "forced_use").pull_data(pkg)

        # lock the configurable use flags to only what's in IUSE, and what's forced
        # from the profiles (things like userland_GNU and arch)
        enabled = self.enabled_use.pull_data(pkg, pre_defaults=pre_defaults)

        # support globs for USE_EXPAND vars
        use_globs = [u for u in enabled if u.endswith("*")]
        enabled_use_globs = []
        for glob in use_globs:
            for u in pkg.iuse_stripped:
                if u.startswith(glob[:-1]):
                    enabled_use_globs.append(u)
        enabled.difference_update(use_globs)
        enabled.update(enabled_use_globs)

        if for_metadata:
            preserves = pkg.iuse_stripped
            enabled.intersection_update(preserves)
            enabled.update(immutable)
            enabled.difference_update(disabled)

        return immutable, enabled, disabled

    def get_package_domain(self, pkg):
        """Get domain object with altered settings from matching package.env entries."""
        if getattr(pkg, "_domain", None) is not None:
            return pkg._domain

        files = []
        for restrict, paths in self.pkg_env:
            if restrict.match(pkg):
                files.extend(paths)
        if files:
            pkg_settings = dict(self._settings.orig.items())
            for path in files:
                PortageConfig.load_make_conf(
                    pkg_settings,
                    path,
                    allow_sourcing=True,
                    allow_recurse=False,
                    incrementals=True,
                )

            # TODO: Improve pkg domain vs main domain proxying, e.g. static
            # jitted attrs should always be generated and pulled from the main
            # domain obj; however, currently each pkg domain instance gets its
            # own copy so values collapsed on the pkg domain instance aren't
            # propagated back to the main domain leading to regen per pkg if
            # requested.
            pkg_domain = copy.copy(self)
            pkg_domain._settings = ProtectedDict(pkg_settings)
            # reset jitted attrs that can pull updated settings
            for attr in (x for x in dir(self) if x.startswith("_jit_reset_")):
                setattr(pkg_domain, attr, None)
            # store altered domain on the pkg obj to avoid recreating pkg domain
            object.__setattr__(pkg, "_domain", pkg_domain)
            return pkg_domain
        return self

    def get_package_bashrcs(self, pkg):
        yield from self.profile.bashrcs
        for restrict, bashrcs in self.profile.pkg_bashrcs:
            if restrict.match(pkg):
                yield from bashrcs
        yield from self.bashrcs
        if not os.path.exists(self.ebuild_hook_dir):
            return
        # matching portage behavior... it's whacked.
        base = pjoin(self.ebuild_hook_dir, pkg.category)
        dirs = (
            pkg.package,
            f"{pkg.package}:{pkg.slot}",
            getattr(pkg, "P", None),
            getattr(pkg, "PF", None),
        )
        for fp in filter(None, dirs):
            fp = pjoin(base, fp)
            if os.path.exists(fp):
                yield local_source(fp)

    def _wrap_repo(self, repo, filtered=True):
        """Create a filtered, wrapped repo object for the domain."""
        wrapped_repo = self._configure_repo(repo)
        if filtered:
            wrapped_repo = self.filter_repo(wrapped_repo)
        return wrapped_repo

    def add_repo(self, path, config, configure=False, **kwargs):
        """Add an external repo to the domain."""
        path = os.path.abspath(path)

        # return repo if it's already registered
        try:
            return next(r for r in self.source_repos_raw if r.location == path)
        except StopIteration:
            pass

        # forcibly create repo_config object, otherwise cached version might be used
        try:
            repo_config = RepoConfig(path, disable_inst_caching=True)
        except OSError as e:
            raise repo_errors.InvalidRepo(str(e))

        if repo_config.cache_format is not None:
            # default to using md5 cache
            kwargs["cache"] = (md5_cache(path),)
        repo = ebuild_repo.tree(config, repo_config, **kwargs)
        self.source_repos_raw += repo

        # inject repo objects into config to dynamically register repo
        data = {}
        repo_conf_data = {
            "class": "pkgcore.ebuild.repo_objs.RepoConfig",
            "location": path,
        }
        repo_data = {
            "inherit": ("ebuild-repo-common",),
            "repo_config": f"conf:{path}",
        }
        data[f"conf:{path}"] = basics.AutoConfigSection(repo_conf_data)
        data[path] = basics.AutoConfigSection(repo_data)
        config.add_config_source(data)

        # reset repo-related jit attrs
        for attr in (x for x in dir(self) if x.startswith("_jit_repo_")):
            setattr(self, attr, None)

        if configure:
            repo = self._wrap_repo(repo)
        return repo

    def find_repo(self, path, config, configure=False):
        """Find and add an external repo to the domain given a path."""
        path = os.path.abspath(path)
        with suppress_logging():
            while (parent := os.path.dirname(path)) != path:
                try:
                    return self.add_repo(path, config=config, configure=configure)
                except repo_errors.InvalidRepo:
                    path = parent

    def _configure_repo(self, repo):
        """Configure a raw repo."""
        if not repo.configured:
            args = []
            try:
                for x in repo.configurables:
                    if x == "domain":
                        args.append(self)
                    elif x == "settings":
                        args.append(self.settings)
                    elif x == "profile":
                        args.append(self.profile)
                    else:
                        args.append(getattr(self, x))
            except AttributeError as e:
                raise Failure(
                    f"failed configuring repo {repo!r}: configurable missing: {e}"
                ) from e
            repo = repo.configure(*args)
        return repo

    def filter_repo(
        self,
        repo,
        pkg_masks=None,
        pkg_unmasks=None,
        pkg_filters=None,
        pkg_accept_keywords=None,
        pkg_keywords=None,
        profile=True,
    ):
        """Filter a configured repo."""
        if pkg_masks is None:
            pkg_masks = self.pkg_masks
        if pkg_unmasks is None:
            pkg_unmasks = self.pkg_unmasks
        if pkg_filters is None:
            pkg_filters = self._pkg_filters(pkg_accept_keywords, pkg_keywords)

        global_masks = [((), repo.pkg_masks)]
        if profile:
            global_masks.extend(self.profile._incremental_masks)
        masks = set()
        for neg, pos in global_masks:
            masks.difference_update(neg)
            masks.update(pos)
        masks.update(pkg_masks)

        unmasks = set()
        if profile:
            for neg, pos in self.profile._incremental_unmasks:
                unmasks.difference_update(neg)
                unmasks.update(pos)
        unmasks.update(pkg_unmasks)

        filters = generate_filter(masks, unmasks, *pkg_filters)
        return filtered.tree(repo, filters, True)
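
    # Roughly: repo-level and (optionally) profile masks are stacked
    # incrementally above -- each (neg, pos) pair first removes then adds
    # entries -- then user package.mask/package.unmask entries are appended
    # before everything is combined via generate_filter().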
@klass.jit_attr_named("_jit_reset_tmpdir", uncached_val=None) def tmpdir(self): """Temporary directory for the system. Uses PORTAGE_TMPDIR setting and falls back to using the system's TMPDIR if unset. """ path = self.settings.get("PORTAGE_TMPDIR", "") if not os.path.exists(path): try: os.mkdir(path) except EnvironmentError: path = tempfile.gettempdir() logger.warning( f"nonexistent PORTAGE_TMPDIR path, defaulting to {path!r}" ) return os.path.normpath(path) @property def pm_tmpdir(self): """Temporary directory for the package manager.""" return pjoin(self.tmpdir, "portage") @property def repo_configs(self): """All defined repo configs.""" return tuple(r.config for r in self.repos if hasattr(r, "config")) @klass.jit_attr def KV(self): """The version of the running kernel.""" ret, version = spawn_get_output(["uname", "-r"]) if ret == 0: return version[0].strip() raise ValueError("unknown kernel version") @klass.jit_attr_named("_jit_repo_source_repos_raw", uncached_val=None) def source_repos_raw(self): """Group of package repos without filtering.""" repos = [] for r in self.__repos: try: repo = r.instantiate() if not repo.is_supported: logger.warning( f"skipping {r.name!r} repo: unsupported EAPI {str(repo.eapi)!r}" ) continue repos.append(repo) except config_errors.InstantiationError as e: # roll back the exception chain to a meaningful error message exc = find_user_exception(e) if exc is None: exc = e logger.warning(f"skipping {r.name!r} repo: {exc}") return RepositoryGroup(repos) @klass.jit_attr_named("_jit_repo_installed_repos_raw", uncached_val=None) def installed_repos_raw(self): """Group of installed repos without filtering.""" repos = [r.instantiate() for r in self.__vdb] if self.profile.provides_repo is not None: repos.append(self.profile.provides_repo) return RepositoryGroup(repos) @klass.jit_attr_named("_jit_repo_repos_raw", uncached_val=None) def repos_raw(self): """Group of all repos without filtering.""" return RepositoryGroup(chain(self.source_repos_raw, self.installed_repos_raw)) @klass.jit_attr_named("_jit_repo_source_repos", uncached_val=None) def source_repos(self): """Group of configured, filtered package repos.""" repos = [] for repo in self.source_repos_raw: try: repos.append(self._wrap_repo(repo, filtered=True)) except repo_errors.RepoError as e: logger.warning(f"skipping {repo.repo_id!r} repo: {e}") return RepositoryGroup(repos) @klass.jit_attr_named("_jit_repo_installed_repos", uncached_val=None) def installed_repos(self): """Group of configured, installed package repos.""" repos = [] for repo in self.installed_repos_raw: try: repos.append(self._wrap_repo(repo, filtered=False)) except repo_errors.RepoError as e: logger.warning(f"skipping {repo.repo_id!r} repo: {e}") return RepositoryGroup(repos) @klass.jit_attr_named("_jit_repo_unfiltered_repos", uncached_val=None) def unfiltered_repos(self): """Group of all configured repos without filtering.""" repos = chain(self.source_repos, self.installed_repos) return RepositoryGroup( (r.raw_repo if r.raw_repo is not None else r) for r in repos ) @klass.jit_attr_named("_jit_repo_repos", uncached_val=None) def repos(self): """Group of all repos.""" return RepositoryGroup(chain(self.source_repos, self.installed_repos)) @klass.jit_attr_named("_jit_repo_ebuild_repos", uncached_val=None) def ebuild_repos(self): """Group of all ebuild repos bound with configuration data.""" return RepositoryGroup( x for x in self.source_repos if isinstance(x.raw_repo, ebuild_repo.ConfiguredTree) ) 
@klass.jit_attr_named("_jit_repo_ebuild_repos_unfiltered", uncached_val=None) def ebuild_repos_unfiltered(self): """Group of all ebuild repos without package filtering.""" return RepositoryGroup( x for x in self.unfiltered_repos if isinstance(x, ebuild_repo.ConfiguredTree) ) @klass.jit_attr_named("_jit_repo_ebuild_repos_raw", uncached_val=None) def ebuild_repos_raw(self): """Group of all ebuild repos without filtering.""" return RepositoryGroup( x for x in self.source_repos_raw if isinstance(x, ebuild_repo.UnconfiguredTree) ) @klass.jit_attr_named("_jit_repo_binary_repos", uncached_val=None) def binary_repos(self): """Group of all binary repos bound with configuration data.""" return RepositoryGroup( x for x in self.source_repos if isinstance(x.raw_repo, binary_repo.ConfiguredTree) ) @klass.jit_attr_named("_jit_repo_binary_repos_unfiltered", uncached_val=None) def binary_repos_unfiltered(self): """Group of all binary repos without package filtering.""" return RepositoryGroup( x for x in self.unfiltered_repos if isinstance(x, binary_repo.ConfiguredTree) ) @klass.jit_attr_named("_jit_repo_binary_repos_raw", uncached_val=None) def binary_repos_raw(self): """Group of all binary repos without filtering.""" return RepositoryGroup( x for x in self.source_repos_raw if isinstance(x, binary_repo.tree) ) # multiplexed repos all_repos = klass.alias_attr("repos.combined") all_repos_raw = klass.alias_attr("repos_raw.combined") all_source_repos = klass.alias_attr("source_repos.combined") all_source_repos_raw = klass.alias_attr("source_repos_raw.combined") all_installed_repos = klass.alias_attr("installed_repos.combined") all_installed_repos_raw = klass.alias_attr("installed_repos_raw.combined") all_unfiltered_repos = klass.alias_attr("unfiltered_repos.combined") all_ebuild_repos = klass.alias_attr("ebuild_repos.combined") all_ebuild_repos_unfiltered = klass.alias_attr("ebuild_repos_unfiltered.combined") all_ebuild_repos_raw = klass.alias_attr("ebuild_repos_raw.combined") all_binary_repos = klass.alias_attr("binary_repos.combined") all_binary_repos_unfiltered = klass.alias_attr("binary_repos_unfiltered.combined") all_binary_repos_raw = klass.alias_attr("binary_repos_raw.combined")