"""make.conf translator.
Converts portage config files into :obj:`pkgcore.config` form.
"""
__all__ = (
"PortageConfig",
"SecurityUpgradesViaProfile",
)
import configparser
import errno
import os
import sys
from collections import OrderedDict
from snakeoil.bash import read_bash_dict
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.mappings import DictMixin, ImmutableDict
from snakeoil.osutils import listdir_files, pjoin
from .. import const
from .. import exceptions as base_errors
from ..config import basics
from ..config import errors as config_errors
from ..config.hint import configurable
from ..fs.livefs import sorted_scan
from ..log import logger
from ..pkgsets.glsa import SecurityUpgrades
from . import const as econst
from . import profiles, repo_objs
from .misc import optimize_incrementals
from .repository import errors as repo_errors
def my_convert_hybrid(manager, val, arg_type):
    """Modified convert_hybrid using a sequence of strings for section_refs."""
    if not arg_type.startswith("refs:"):
        # anything that isn't a multi-ref passes straight through
        return basics.convert_hybrid(manager, val, arg_type)
    subtype = "ref:" + arg_type.split(":", 1)[1]
    return [basics.LazyNamedSectionRef(manager, subtype, section) for section in val]
@configurable(
    types={"ebuild_repo": "ref:repo", "vdb": "ref:repo", "profile": "ref:profile"},
    typename="pkgset",
)
def SecurityUpgradesViaProfile(ebuild_repo, vdb, profile):
    """generate a GLSA vuln. pkgset limited by profile

    Args:
        ebuild_repo (:obj:`pkgcore.ebuild.repository.UnconfiguredTree`): target repo
        vdb (:obj:`pkgcore.repository.prototype.tree`): livefs
        profile (:obj:`pkgcore.ebuild.profiles`): target profile

    Returns:
        pkgset of relevant security upgrades

    Raises:
        config_errors.ComplexInstantiationError: if the profile defines no arch
    """
    target_arch = profile.arch
    if target_arch is None:
        raise config_errors.ComplexInstantiationError("arch wasn't set in profiles")
    return SecurityUpgrades(ebuild_repo, vdb, target_arch)
class ParseConfig(configparser.ConfigParser):
    """Custom ConfigParser class to support returning dict objects."""

    def parse_file(self, f, reset=True):
        """Parse config data from a given file handle.

        By default the underlying config data is reset on each call if it
        exists. This allows multiple files to be easily parsed by a single
        instance without combining all the data in one instance.

        Args:
            f: iterable yielding unicode strings (opened file handle)
            reset (boolean): reset config data if it exists before parsing

        Returns:
            dict: default settings
            dict: regular section settings
        """
        if reset:
            # throw away any data accumulated by previous parse_file() calls
            # so separate files don't bleed into each other
            if self._defaults:
                self._defaults = self._dict()
            if self._sections:
                self._sections = self._dict()
            # currently we don't reset section proxies as they shouldn't affect
            # this direct data dumping
        self.read_file(f)
        return self._defaults, self._sections
class PortageConfig(DictMixin):
    """Support for portage's config file layout.

    Translates an /etc/portage-style directory (make.conf, repos.conf,
    make.profile, sets/, ...) into pkgcore config sections. Instances behave
    as a mapping of section name -> config section via DictMixin.
    """

    # mapping of repo-type names (e.g. "ebuild-v1") to builder methods,
    # populated at class-creation time by the _register_repo_type decorator
    _supported_repo_types = {}

    def __init__(self, location=None, profile_override=None, **kwargs):
        """
        Args:
            location (optional[str]): path to the portage config directory,
                (defaults to /etc/portage)
            profile_override (optional[str]): profile to use instead of the current system
                profile, i.e. the target of the /etc/portage/make.profile symlink
            root (optional[str]): target root filesystem (defaults to /)
            buildpkg (optional[bool]): forcibly disable/enable building binpkgs, otherwise
                FEATURES=buildpkg from make.conf is used
        """
        # backing store for the DictMixin mapping interface
        self._config = {}

        stubconfig = pjoin(const.DATA_PATH, "stubconfig")

        if location is None:
            # walk upwards from sys.prefix looking for an etc/portage dir so
            # prefix/venv installs find their own config root
            path = os.path.abspath(sys.prefix)
            while (parent := os.path.dirname(path)) != path:
                config_root = pjoin(parent, "etc/portage")
                if os.path.exists(config_root):
                    location = config_root
                    break
                path = parent
            else:
                # fallback to stub config non-Gentoo systems
                location = stubconfig

        # override profile when using stub config
        if location == stubconfig:
            profile_override = pjoin(const.DATA_PATH, "stubrepo/profiles/default")

        self.dir = location

        # this actually differs from portage parsing- we allow
        # make.globals to provide vars used in make.conf, portage keeps
        # them separate (kind of annoying)
        #
        # this isn't preserving incremental behaviour for features/use unfortunately
        make_conf = {}
        try:
            self.load_make_conf(make_conf, pjoin(const.CONFIG_PATH, "make.globals"))
        except IGNORED_EXCEPTIONS:
            raise
        except Exception as e:
            raise config_errors.ParsingError("failed to load make.globals") from e
        # user make.conf is optional and may source other files
        self.load_make_conf(
            make_conf,
            pjoin(self.dir, "make.conf"),
            required=False,
            allow_sourcing=True,
            incrementals=True,
        )

        # explicit root kwarg wins over make.conf ROOT
        self.root = kwargs.pop("root", make_conf.get("ROOT", "/"))
        gentoo_mirrors = [
            f"{x.rstrip('/')}/distfiles"
            for x in make_conf.pop("GENTOO_MIRRORS", "").split()
        ]
        self.features = frozenset(
            optimize_incrementals(make_conf.get("FEATURES", "").split())
        )

        self._add_sets()
        self._add_profile(profile_override)

        # installed-packages database section
        self["vdb"] = basics.AutoConfigSection(
            {
                "class": "pkgcore.vdb.ondisk.tree",
                "location": pjoin(self.root, "var", "db", "pkg"),
                "cache_location": "/var/cache/edb/dep/var/db/pkg",
            }
        )

        repos_conf_defaults, repos_conf = self.load_repos_conf()

        # base section every ebuild repo section inherits from
        self["ebuild-repo-common"] = basics.AutoConfigSection(
            {
                "class": "pkgcore.ebuild.repository.tree",
                "default_mirrors": gentoo_mirrors,
                "inherit-only": True,
            }
        )

        repo_map = {}
        repos = []
        # iterate over a copy so unsupported repos can be dropped mid-loop
        for repo_name, repo_opts in list(repos_conf.items()):
            # repo-type was resolved to a builder method by parse_repos_conf_path
            repo_cls = repo_opts.pop("repo-type")
            try:
                repo = repo_cls(
                    self,
                    repo_name=repo_name,
                    repo_opts=repo_opts,
                    repo_map=repo_map,
                    defaults=repos_conf_defaults,
                )
            except repo_errors.UnsupportedRepo as e:
                logger.warning(
                    f"skipping {repo_name!r} repo: unsupported EAPI {str(e.repo.eapi)!r}"
                )
                del repos_conf[repo_name]
                continue

            # only register existent repos
            if os.path.exists(repo_opts["location"]):
                self[repo_name] = basics.AutoConfigSection(repo)
                repos.append(repo_name)

        # XXX: Hack for portage-2 profile format support. We need to figure out how
        # to dynamically create this from the config at runtime on attr access.
        profiles.ProfileNode._repo_map = ImmutableDict(repo_map)

        self._make_repo_syncers(repos_conf, make_conf)

        if repos:
            # stacked view over all registered repos, highest priority first
            self["repo-stack"] = basics.DictConfigSection(
                my_convert_hybrid,
                {
                    "class": "pkgcore.repository.multiplex.tree",
                    "repos": tuple(repos),
                },
            )
            self["vuln"] = basics.AutoConfigSection(
                {
                    "class": SecurityUpgradesViaProfile,
                    "ebuild_repo": "repo-stack",
                    "vdb": "vdb",
                    "profile": "profile",
                }
            )

        # check if package building was forced on by the user
        forced_buildpkg = kwargs.pop("buildpkg", False)
        if forced_buildpkg:
            # NOTE(review): assumes make.globals always defines FEATURES;
            # a missing key would raise KeyError here — confirm upstream default
            make_conf["FEATURES"] += " buildpkg"

        # finally... domain.
        make_conf.update(
            {
                "class": "pkgcore.ebuild.domain.domain",
                "repos": tuple(repos),
                "default": True,
                "vdb": ("vdb",),
                "profile": "profile",
                "root": self.root,
                "config_dir": self.dir,
            }
        )
        self["livefs"] = basics.DictConfigSection(my_convert_hybrid, make_conf)

    def __setitem__(self, key, value):
        # DictMixin hook: register a config section
        self._config[key] = value

    def __getitem__(self, key):
        # DictMixin hook: look up a config section
        return self._config[key]

    def __delitem__(self, key):
        # DictMixin hook: drop a config section
        del self._config[key]

    def keys(self):
        """Return an iterator over registered config section names."""
        return iter(self._config.keys())

    @staticmethod
    def load_make_conf(
        vars_dict,
        path,
        allow_sourcing=False,
        required=True,
        allow_recurse=True,
        incrementals=False,
    ):
        """parse make.conf files

        Args:
            vars_dict (dict): dictionary to add parsed variables to
            path (str): path to the make.conf which can be a regular file or
                directory, if a directory is passed all the non-hidden files within
                that directory are parsed in alphabetical order.
            allow_sourcing (bool): allow the bash ``source`` command in parsed files
            required (bool): raise ParsingError if the file is missing
            allow_recurse (bool): treat *path* as a potential directory and scan it
            incrementals (bool): append to (rather than overwrite) values of
                variables listed in ``econst.incrementals``

        Raises:
            base_errors.PermissionDenied: if a file is unreadable
            config_errors.ParsingError: on parse failure, or a missing file when
                *required* is true
        """
        sourcing_command = "source" if allow_sourcing else None

        if allow_recurse:
            files = sorted_scan(
                os.path.realpath(path),
                follow_symlinks=True,
                nonexistent=True,
                hidden=False,
                backup=False,
            )
        else:
            files = (path,)

        for fp in files:
            try:
                new_vars = read_bash_dict(
                    fp, vars_dict=vars_dict, sourcing_command=sourcing_command
                )
            except PermissionError as e:
                raise base_errors.PermissionDenied(fp, write=False) from e
            except EnvironmentError as e:
                if e.errno != errno.ENOENT or required:
                    raise config_errors.ParsingError(
                        f"parsing {fp!r}", exception=e
                    ) from e
                # optional file missing: stop processing remaining files
                return

            if incrementals:
                # concatenate incremental values with previously parsed ones
                for key in econst.incrementals:
                    if key in vars_dict and key in new_vars:
                        new_vars[key] = f"{vars_dict[key]} {new_vars[key]}"
            # quirk of read_bash_dict; it returns only what was mutated.
            vars_dict.update(new_vars)

    def load_repos_conf(self) -> tuple[dict, dict]:
        """parse and return repos.conf content, tracing the default and the fallback location"""
        try:
            repos_conf_defaults, repos_conf = self.parse_repos_conf_path(
                pjoin(self.dir, "repos.conf")
            )
        except config_errors.ParsingError as e:
            # only fall back when the user's repos.conf is missing (ENOENT);
            # any other parse failure propagates
            if not getattr(getattr(e, "exc", None), "errno", None) == errno.ENOENT:
                raise
            try:
                # fallback to defaults provided by pkgcore
                repos_conf_defaults, repos_conf = self.parse_repos_conf_path(
                    pjoin(const.CONFIG_PATH, "repos.conf")
                )
            except IGNORED_EXCEPTIONS:
                raise
            except Exception as e:
                raise config_errors.ParsingError(
                    "failed to find a usable repos.conf"
                ) from e
        return repos_conf_defaults, repos_conf

    @classmethod
    def parse_repos_conf_path(cls, path: str):
        """parse repos.conf files from a given entrypoint

        Args:
            path (str): path to the repos.conf which can be a regular file or
                directory, if a directory is passed all the non-hidden files within
                that directory are parsed in alphabetical order.

        Returns:
            dict: global repo settings
            dict: repo settings (sorted by descending priority)

        Raises:
            base_errors.PermissionDenied: if a file is unreadable
            config_errors.ParsingError: on read or parse failure
            config_errors.UserConfigError: if the default repo is undefined
        """
        main_defaults = {}
        repos = {}

        parser = ParseConfig()

        for fp in sorted_scan(
            os.path.realpath(path),
            follow_symlinks=True,
            nonexistent=True,
            hidden=False,
            backup=False,
        ):
            had_repo_conf = False
            try:
                with open(fp) as f:
                    had_repo_conf = True
                    defaults, repo_confs = parser.parse_file(f)
            except PermissionError as e:
                raise base_errors.PermissionDenied(fp, write=False) from e
            except EnvironmentError as e:
                raise config_errors.ParsingError(f"parsing {fp!r}", exception=e) from e
            except configparser.Error as e:
                raise config_errors.ParsingError(
                    f"repos.conf: {fp!r}", exception=e
                ) from e

            # later files may override the DEFAULT section of earlier ones
            if defaults and main_defaults:
                logger.warning("repos.conf: parsing %r: overriding DEFAULT section", fp)
            main_defaults.update(defaults)

            # NOTE(review): had_repo_conf is always True once open() succeeds
            # (failures raise above), so this warning looks unreachable as
            # written — confirm intended placement
            if not had_repo_conf and not repo_confs:
                logger.warning(
                    "repos.conf: not found, but should exist for modern support"
                )

            for name, repo_conf in repo_confs.items():
                if name in repos:
                    logger.warning(
                        "repos.conf: parsing %r: overriding %r repo", fp, name
                    )

                # ignore repo if location is unset
                location = repo_conf.get("location", None)
                if location is None:
                    logger.warning(
                        "repos.conf: parsing %r: %r repo missing location setting, ignoring repo",
                        fp,
                        name,
                    )
                    continue
                location = os.path.expanduser(location)
                if os.path.isabs(location):
                    repo_conf["location"] = location
                else:
                    # support relative paths based on where repos.conf is located
                    repo_conf["location"] = os.path.abspath(
                        pjoin(os.path.dirname(path), location)
                    )

                # repo type defaults to ebuild for compat with portage
                repo_type = repo_conf.get("repo-type", "ebuild-v1")
                try:
                    # resolve the type name to its registered builder method
                    repo_conf["repo-type"] = cls._supported_repo_types[repo_type]
                except KeyError:
                    logger.warning(
                        f"repos.conf: parsing {fp!r}: "
                        f"{name!r} repo has unsupported repo-type {repo_type!r}, "
                        "ignoring repo"
                    )
                    continue

                # Priority defaults to zero if unset or invalid for ebuild repos
                # while binpkg repos have the lowest priority by default.
                priority = repo_conf.get("priority", None)
                if priority is None:
                    if repo_type.startswith("binpkg"):
                        priority = -10000
                    else:
                        priority = 0
                try:
                    priority = int(priority)
                except ValueError:
                    logger.warning(
                        f"repos.conf: parsing {fp!r}: {name!r} repo has invalid priority "
                        f"setting: {priority!r} (defaulting to 0)"
                    )
                    priority = 0
                finally:
                    # always record a usable integer priority
                    repo_conf["priority"] = priority

                # register repo
                repos[name] = repo_conf

        if repos:
            # the default repo is gentoo if unset and gentoo exists
            default_repo = main_defaults.get("main-repo", "gentoo")
            if default_repo not in repos:
                raise config_errors.UserConfigError(
                    f"repos.conf: default repo {default_repo!r} is undefined or invalid"
                )

            if "main-repo" not in main_defaults:
                main_defaults["main-repo"] = default_repo

            # the default repo has a low priority if unset or zero
            if repos[default_repo]["priority"] == 0:
                repos[default_repo]["priority"] = -1000

            # sort repos via priority, in this case high values map to high priorities
            repos = OrderedDict(
                (k, v)
                for k, v in sorted(
                    repos.items(), key=lambda d: d[1]["priority"], reverse=True
                )
            )

        return main_defaults, repos

    def _make_repo_syncers(self, repos_conf, make_conf, allow_timestamps=True):
        """generate syncing configs for known repos

        Registers a ``sync:<repo>`` section per repo, choosing the syncer class
        from each repo's sync-type/sync-uri settings.
        """
        rsync_opts = None
        usersync = "usersync" in self.features

        for repo_name, repo_opts in repos_conf.items():
            d = {"basedir": repo_opts["location"], "usersync": usersync}

            sync_type = repo_opts.get("sync-type", None)
            sync_uri = repo_opts.get("sync-uri", None)

            if sync_uri:
                # prefix non-native protocols
                if sync_type is not None and not sync_uri.startswith(sync_type):
                    sync_uri = f"{sync_type}+{sync_uri}"

                d["uri"] = sync_uri
                d["opts"] = repo_opts.get("sync-opts", "")

                if sync_type == "rsync":
                    if rsync_opts is None:
                        # various make.conf options used by rsync-based syncers;
                        # extracted lazily, only once for all repos
                        rsync_opts = self._isolate_rsync_opts(make_conf)
                    d.update(rsync_opts)
                    if allow_timestamps:
                        d["class"] = "pkgcore.sync.rsync.rsync_timestamp_syncer"
                    else:
                        d["class"] = "pkgcore.sync.rsync.rsync_syncer"
                else:
                    d["class"] = "pkgcore.sync.base.GenericSyncer"
            elif sync_uri is None:
                # try to autodetect syncing mechanism if sync-uri is missing
                d["class"] = "pkgcore.sync.base.AutodetectSyncer"
            else:
                # disable syncing if sync-uri is explicitly unset
                d["class"] = "pkgcore.sync.base.DisabledSync"

            name = "sync:" + repo_name
            self[name] = basics.AutoConfigSection(d)

    def _add_sets(self):
        """Register the standard pkgsets plus any user-defined file lists."""
        self["world"] = basics.AutoConfigSection(
            {
                "class": "pkgcore.pkgsets.filelist.WorldFile",
                "location": pjoin(self.root, econst.WORLD_FILE.lstrip("/")),
            }
        )
        self["system"] = basics.AutoConfigSection(
            {"class": "pkgcore.pkgsets.system.SystemSet", "profile": "profile"}
        )
        self["installed"] = basics.AutoConfigSection(
            {"class": "pkgcore.pkgsets.installed.Installed", "vdb": "vdb"}
        )
        self["versioned-installed"] = basics.AutoConfigSection(
            {"class": "pkgcore.pkgsets.installed.VersionedInstalled", "vdb": "vdb"}
        )

        set_fp = pjoin(self.dir, "sets")
        try:
            for setname in listdir_files(set_fp):
                # Potential for name clashes here, those will just make
                # the set not show up in config.
                if setname in ("system", "world"):
                    logger.warning(
                        "user defined set %r is disallowed; ignoring",
                        pjoin(set_fp, setname),
                    )
                    continue
                self[setname] = basics.AutoConfigSection(
                    {
                        "class": "pkgcore.pkgsets.filelist.FileList",
                        "location": pjoin(set_fp, setname),
                    }
                )
        except FileNotFoundError:
            # no sets dir at all is fine
            pass

    def _find_profile_path(self, profile_override) -> tuple[str, bool]:
        """Resolve the active profile path.

        Returns:
            tuple: (profile path, flag) — the flag is True only when
            make.profile exists as a plain (non-symlink) directory; note the
            caller binds it to a variable named ``was_symlink`` with inverted
            sense — NOTE(review): confirm intended semantics.
        """
        if profile_override is None:
            make_profile = pjoin(self.dir, "make.profile")
            if not os.path.islink(make_profile):
                return make_profile, True
            path = os.path.realpath(make_profile)
        else:
            path = os.path.realpath(profile_override)

        if not os.path.exists(path):
            if profile_override is None:
                raise config_errors.UserConfigError(f"broken symlink: {make_profile!r}")
            else:
                raise config_errors.UserConfigError(
                    f"nonexistent profile: {profile_override!r}"
                )
        return path, False

    def _add_profile(self, profile_override=None):
        """Register the 'profile' config section, honoring /etc/portage/profile."""
        profile, was_symlink = self._find_profile_path(profile_override)

        if was_symlink:
            # plain make.profile dir: split off the final path component
            paths = profile.rsplit(os.path.sep, 1)
        else:
            paths = profiles.OnDiskProfile.split_abspath(profile)
            if paths is None:
                raise config_errors.UserConfigError(
                    f"{pjoin(self.dir, 'make.profile')!r} expands to {profile!r}, but no profile detected"
                )

        user_profile_path = pjoin(self.dir, "profile")
        if os.path.isdir(user_profile_path):
            # user-level profile overlay exists; wrap the parent profile
            self["profile"] = basics.AutoConfigSection(
                {
                    "class": "pkgcore.ebuild.profiles.UserProfile",
                    "parent_path": paths[0],
                    "parent_profile": paths[1],
                    "user_path": user_profile_path,
                    "load_profile_base": not was_symlink,
                }
            )
        else:
            self["profile"] = basics.AutoConfigSection(
                {
                    "class": "pkgcore.ebuild.profiles.OnDiskProfile",
                    "basepath": paths[0],
                    "profile": paths[1],
                    "load_profile_base": not was_symlink,
                }
            )

    def _isolate_rsync_opts(self, options):
        """
        pop the misc RSYNC related options littered in make.conf, returning
        a base rsync dict

        Note: mutates *options* — the consumed keys are removed.
        """
        base = {}
        opts = []
        extra_opts = []

        opts.extend(options.pop("PORTAGE_RSYNC_OPTS", "").split())
        extra_opts.extend(options.pop("PORTAGE_RSYNC_EXTRA_OPTS", "").split())

        timeout = options.pop("PORTAGE_RSYNC_INITIAL_TIMEOUT", None)
        if timeout is not None:
            base["conn_timeout"] = timeout

        retries = options.pop("PORTAGE_RSYNC_RETRIES", None)
        if retries is not None:
            try:
                retries = int(retries)
                if retries < 0:
                    # portage uses negative values for "retry forever";
                    # approximate with a very large count
                    retries = 10000
                base["retries"] = str(retries)
            except ValueError:
                # non-integer value: silently ignored (best-effort parsing)
                pass

        proxy = options.pop("RSYNC_PROXY", None)
        if proxy is not None:
            base["proxy"] = proxy.strip()

        if opts:
            base["opts"] = tuple(opts)
        if extra_opts:
            base["extra_opts"] = tuple(extra_opts)

        return base

    def _make_cache(self, cache_format, repo_path):
        """Configure repo cache."""
        # Use md5 cache if it exists or the option is selected, otherwise default
        # to the old flat hash format in /var/cache/edb/dep/*.
        if (
            os.path.exists(pjoin(repo_path, "metadata", "md5-cache"))
            or cache_format == "md5-dict"
        ):
            kls = "pkgcore.cache.flat_hash.md5_cache"
            cache_parent_dir = pjoin(repo_path, "metadata", "md5-cache")
        else:
            kls = "pkgcore.cache.flat_hash.database"
            repo_path = pjoin("/var/cache/edb/dep", repo_path.lstrip("/"))
            cache_parent_dir = repo_path

        # walk up to the nearest existing ancestor to test writability
        while not os.path.exists(cache_parent_dir):
            cache_parent_dir = os.path.dirname(cache_parent_dir)
        readonly = not os.access(cache_parent_dir, os.W_OK | os.X_OK)

        return basics.AutoConfigSection(
            {"class": kls, "location": repo_path, "readonly": readonly}
        )

    def _register_repo_type(supported_repo_types):
        """Decorator to register supported repo types.

        Strips the ``_repo_`` prefix from the method name and maps e.g.
        ``_repo_ebuild_v1`` to the "ebuild-v1" repo-type key. Note the
        *unwrapped* function is stored in the registry, so registered
        builders are later called with ``self`` passed explicitly.
        """

        def _wrap_func(func):
            def wrapped(*args, **kwargs):
                return func(*args, **kwargs)

            # "_repo_ebuild_v1" -> "ebuild-v1"
            name = func.__name__[6:].replace("_", "-")
            supported_repo_types[name] = func
            return wrapped

        return _wrap_func

    @_register_repo_type(_supported_repo_types)
    def _repo_ebuild_v1(
        self, repo_name, repo_opts, repo_map, defaults, repo_obj=None, repo_dict=None
    ):
        """Create ebuild repo v1 configuration.

        Registers ``conf:<name>`` (and possibly ``cache:<name>``) sections as
        side effects and returns the repo tree section dict.
        """
        repo_path = repo_opts["location"]

        # XXX: Hack for portage-2 profile format support.
        if repo_obj is None:
            repo_obj = repo_objs.RepoConfig(repo_path, repo_name)
        repo_map[repo_obj.repo_id] = repo_path

        # repo configs
        repo_conf = {
            "class": "pkgcore.ebuild.repo_objs.RepoConfig",
            "config_name": repo_name,
            "location": repo_path,
            "syncer": "sync:" + repo_name,
        }
        if repo_dict is not None:
            repo_conf.update(repo_dict)

        # repo trees
        repo = {
            "inherit": ("ebuild-repo-common",),
            "repo_config": "conf:" + repo_name,
        }

        # metadata cache
        if repo_obj.cache_format is not None:
            cache_name = "cache:" + repo_name
            self[cache_name] = self._make_cache(repo_obj.cache_format, repo_path)
            repo["cache"] = cache_name

        if repo_name == defaults["main-repo"]:
            repo_conf["default"] = True
            repo["default"] = True

        self["conf:" + repo_name] = basics.AutoConfigSection(repo_conf)
        return repo

    @_register_repo_type(_supported_repo_types)
    def _repo_sqfs_v1(self, *args, **kwargs):
        """Create ebuild squashfs repo v1 configuration.

        Thin wrapper over _repo_ebuild_v1 that substitutes a squashfs-backed
        RepoConfig derived from the sync-uri's filename.
        """
        repo_name = kwargs["repo_name"]
        repo_opts = kwargs["repo_opts"]
        repo_path = repo_opts["location"]
        sqfs_file = os.path.basename(repo_opts["sync-uri"])

        # XXX: Hack for portage-2 profile format support.
        kwargs["repo_obj"] = repo_objs.SquashfsRepoConfig(
            sqfs_file, repo_path, repo_name
        )

        repo_dict = {
            "class": "pkgcore.ebuild.repo_objs.SquashfsRepoConfig",
            "sqfs_file": sqfs_file,
        }
        kwargs["repo_dict"] = repo_dict
        return self._repo_ebuild_v1(*args, **kwargs)

    @_register_repo_type(_supported_repo_types)
    def _repo_binpkg_v1(self, repo_name, repo_opts, **kwargs):
        """Create binpkg repo v1 configuration."""
        repo = {
            "class": "pkgcore.binpkg.repository.tree",
            "repo_id": repo_name,
            "location": repo_opts["location"],
        }
        return repo