# Source code for pkgcore.ebuild.ebd (rendered module view)

"""
EBuild Daemon (ebd), main high level interface to ebuild execution env.

Wraps :obj:`pkgcore.ebuild.processor` functionality into a higher level
api, for example per phase methods.
"""

__all__ = (
    "ebd",
    "setup_mixin",
    "install_op",
    "uninstall_op",
    "replace_op",
    "buildable",
    "binpkg_localize",
)

import errno
import os
import re
import shutil
import sys
import time
from collections import defaultdict
from functools import partial
from itertools import chain
from tempfile import TemporaryFile

from snakeoil import data_source, klass
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.contexts import chdir
from snakeoil.currying import post_curry, pretty_docs
from snakeoil.fileutils import touch
from snakeoil.osutils import ensure_dirs, listdir_files, normpath, pjoin
from snakeoil.process.spawn import (
    is_sandbox_capable,
    is_userpriv_capable,
    spawn,
    spawn_bash,
)
from snakeoil.sequences import iflatten_instance, iter_stable_unique

from .. import const
from ..log import logger
from ..operations import format, observer
from ..os_data import portage_gid, portage_uid, xargs
from ..package.mutated import MutatedPkg
from . import ebd_ipc, ebuild_built, errors
from .processor import (
    ProcessorError,
    chuck_UnhandledCommand,
    expected_ebuild_env,
    inherit_handler,
    release_ebuild_processor,
    request_ebuild_processor,
)


class ebd:
    """Base ebuild-daemon operation: builds and owns the exported env.

    Subclasses mix this with a ``format.*`` operation class; that co-base is
    what provides ``self.domain`` / ``self.observer`` before ``__init__``
    runs here (NOTE(review): assumed from usage — confirm against callers).
    """

    def __init__(
        self,
        pkg,
        initial_env=None,
        env_data_source=None,
        observer=None,
        clean=True,
        tmp_offset=None,
    ):
        """
        :param pkg: :class:`pkgcore.ebuild.ebuild_src.package`
            instance this env is being setup for
        :param initial_env: initial environment to use for this ebuild
        :param env_data_source: a :obj:`snakeoil.data_source.base` instance
            to restore the environment from- used for restoring the
            state of an ebuild processing, whether for unmerging, or
            walking phases during building
        """
        self.pkg = pkg
        self.eapi = pkg.eapi
        # a format.* co-base may have set an observer already; don't clobber it
        if not hasattr(self, "observer"):
            self.observer = observer
        if not self.eapi.is_supported:
            raise TypeError(f"package {pkg} uses unsupported EAPI: {str(self.eapi)!r}")

        if initial_env is not None:
            # copy.
            self.env = dict(initial_env)
            # these are derived per-pkg below / by expected_ebuild_env
            for x in ("USE", "ACCEPT_LICENSE"):
                self.env.pop(x, None)
        else:
            self.env = {}

        # Drop all USE_EXPAND variables from the exported environment.
        for u in self.domain.profile.use_expand:
            self.env.pop(u, None)

        # Only export USE_EXPAND variables for the package's enabled USE flags.
        d = defaultdict(list)
        for u in pkg.use:
            m = self.domain.use_expand_re.match(u)
            if m:
                use_expand, value = m.groups()
                d[use_expand.upper()].append(value)
        for k, v in d.items():
            self.env[k] = " ".join(sorted(v))

        # bashrc entries are handled via _request_bashrcs, not the env
        self.bashrc = self.env.pop("bashrc", ())

        self.features = set(x.lower() for x in self.domain.features)
        self.env["FEATURES"] = " ".join(sorted(self.features))

        self.set_path_vars(self.env, self.pkg, self.domain)

        # internally implemented EAPI specific functions to skip when exporting env
        self.env["PKGCORE_EAPI_FUNCS"] = " ".join(self.eapi.bash_funcs)

        self.env_data_source = env_data_source
        if env_data_source is not None and not isinstance(
            env_data_source, data_source.base
        ):
            raise TypeError(
                "env_data_source must be None, or a pkgcore.data_source.base "
                f"derivative: {env_data_source.__class__}: {env_data_source}"
            )

        # regex matching exactly the effective IUSE flags; wildcard entries
        # (escaped to "\.\*") are restored to real ".*" patterns
        iuse_effective_regex = (
            f"^({'|'.join(re.escape(x) for x in pkg.iuse_effective)})$"
        )
        self.env["PKGCORE_IUSE_EFFECTIVE"] = iuse_effective_regex.replace(
            "\\.\\*", ".*"
        )

        expected_ebuild_env(pkg, self.env, env_source_override=self.env_data_source)

        self.env["PKGCORE_FINALIZED_RESTRICT"] = " ".join(str(x) for x in pkg.restrict)

        self.restrict = pkg.restrict

        # sandbox/userpriv come from FEATURES unless RESTRICTed away
        for x in ("sandbox", "userpriv"):
            setattr(self, x, self.feat_or_bool(x) and not (x in self.restrict))
        if self.userpriv and os.getuid() != 0:
            # can't drop privileges when we don't have them
            self.userpriv = False

        if "PORT_LOGDIR" in self.env:
            self.logging = pjoin(
                self.env["PORT_LOGDIR"],
                "%s:%s:%s.log"
                % (
                    pkg.cpvstr,
                    self.__class__.__name__,
                    time.strftime("%Y%m%d-%H%M%S", time.localtime()),
                ),
            )
            del self.env["PORT_LOGDIR"]
        else:
            self.logging = False

        self.env["PKGCORE_PKG_REPO"] = pkg.source_repository
        self.env["XARGS"] = xargs

        # wipe variables listed in ENV_UNSET for supporting EAPIs
        if self.eapi.options.has_env_unset:
            for x in self.env.pop("ENV_UNSET", ()):
                self.env.pop(x, None)

        # wipe any remaining internal settings from the exported env
        wipes = [k for k, v in self.env.items() if not isinstance(v, str)]
        for k in wipes:
            del self.env[k]

        self._set_op_vars(tmp_offset)
        self.clean_at_start = clean
        self.clean_needed = False

        # various IPC command support
        self._ipc_helpers = {
            # bash helpers
            "doins": ebd_ipc.Doins(self),
            "dodoc": ebd_ipc.Dodoc(self),
            "dohtml": ebd_ipc.Dohtml(self),
            "doinfo": ebd_ipc.Doinfo(self),
            "dodir": ebd_ipc.Dodir(self),
            "doexe": ebd_ipc.Doexe(self),
            "dobin": ebd_ipc.Dobin(self),
            "dosbin": ebd_ipc.Dosbin(self),
            "dolib": ebd_ipc.Dolib(self),
            "dolib.so": ebd_ipc.Dolib_so(self),
            "dolib.a": ebd_ipc.Dolib_a(self),
            "doman": ebd_ipc.Doman(self),
            "domo": ebd_ipc.Domo(self),
            "dosym": ebd_ipc.Dosym(self),
            "dohard": ebd_ipc.Dohard(self),
            "keepdir": ebd_ipc.Keepdir(self),
            # bash functions
            "has_version": ebd_ipc.Has_Version(self),
            "best_version": ebd_ipc.Best_Version(self),
            "unpack": ebd_ipc.Unpack(self),
            "eapply": ebd_ipc.Eapply(self),
            "eapply_user": ebd_ipc.Eapply_User(self),
            "docompress": ebd_ipc.Docompress(self),
            "dostrip": ebd_ipc.Dostrip(self),
            # internals
            "filter_env": ebd_ipc.FilterEnv(self),
        }
    def start(self):
        """Prepare the operation: optionally clean, then create workdirs/env.

        :return: True on success, False if a required pre-clean failed.
        """
        if self.clean_at_start:
            self.clean_needed = True
            if not self.cleanup():
                return False
        self.setup_workdir()
        self._setup_env_data_source()
        # from here on a cleanup is required regardless of clean_at_start
        self.clean_needed = True
        return True
[docs] @staticmethod def set_path_vars(env, pkg, domain): # XXX: note this is just EAPI 3 and EAPI 7 compatibility; not full prefix, soon.. trailing_slash = pkg.eapi.options.trailing_slash env["ROOT"] = domain.root.rstrip(os.sep) + trailing_slash env["PKGCORE_PREFIX_SUPPORT"] = "false" if pkg.eapi.options.prefix_capable: env["EPREFIX"] = domain.prefix.rstrip(os.sep) env["EROOT"] = ( pjoin(env["ROOT"].rstrip(trailing_slash), env["EPREFIX"]) + trailing_slash ) env["PKGCORE_PREFIX_SUPPORT"] = "true" if pkg.eapi.options.has_sysroot: env["SYSROOT"] = env["ROOT"] env["ESYSROOT"] = pjoin(env["SYSROOT"], env["EPREFIX"]) env["BROOT"] = env["EPREFIX"]
    def _set_op_vars(self, tmp_offset):
        """Derive builddir and per-build dir variables (T/WORKDIR/D/...)."""
        # don't fool with this, without fooling with setup.
        self.tmpdir = self.domain.pm_tmpdir
        if tmp_offset:
            self.tmpdir = pjoin(self.tmpdir, tmp_offset.strip(os.sep))

        self.builddir = pjoin(self.tmpdir, self.env["CATEGORY"], self.env["PF"])
        for x, y in (
            ("T", "temp"),
            ("WORKDIR", "work"),
            ("D", "image"),
            ("HOME", "homedir"),
            ("PKGCORE_EMPTYDIR", "empty"),
        ):
            self.env[x] = normpath(pjoin(self.builddir, y))
        # EAPI decides whether $D carries a trailing slash
        self.env["D"] += self.eapi.options.trailing_slash
        self.env["PORTAGE_LOGFILE"] = normpath(pjoin(self.env["T"], "build.log"))

        # XXX: Note that this is just EAPI 3 support, not yet prefix
        # full awareness.
        if self.pkg.eapi.options.prefix_capable:
            self.env["ED"] = (
                normpath(pjoin(self.env["D"].rstrip(os.sep), self.env["EPREFIX"]))
                + self.eapi.options.trailing_slash
            )

        # temporary install dir correct for all EAPIs
        self.ED = self.env.get("ED", self.env["D"])
[docs] def get_env_source(self): with open(pjoin(self.env["T"], "environment"), "rb") as f: return data_source.bytes_data_source(f.read())
    def _setup_env_data_source(self):
        """Write the saved environment (if any) into $T/environment."""
        if not ensure_dirs(self.env["T"], mode=0o770, gid=portage_gid, minimal=True):
            raise format.FailedDirectory(
                self.env["T"],
                "%s doesn't fulfill minimum mode %o and gid %i"
                % (self.env["T"], 0o770, portage_gid),
            )

        if self.env_data_source is not None:
            fp = pjoin(self.env["T"], "environment")
            # load data first (might be a local_source), *then* write
            # if it's a src_ebuild being installed, trying to do two steps
            # stomps the local_sources data.
            data = self.env_data_source.bytes_fileobj().read()
            with open(fp, "wb") as f:
                f.write(data)
            del data

    def _set_per_phase_env(self, phase, env):
        """Adjust *env* for *phase*: merge-type vars and phase helper PATH."""
        self._setup_merge_type(phase, env)

        # add phase specific helper paths to PATH if they exist
        ebuild_phase = self.eapi.phases.get(phase, "")
        if ebuild_phase in self.eapi.helpers:
            path = chain.from_iterable(
                (
                    const.PATH_FORCED_PREPEND,
                    self.pkg.eapi.helpers.get("global", ()),
                    self.eapi.helpers[ebuild_phase],
                    os.environ.get("PATH", "").split(os.pathsep),
                )
            )
            env["PATH"] = os.pathsep.join(path)

    def _setup_merge_type(self, phase, env):
        """Export MERGE_TYPE (or legacy EMERGE_FROM) for pkg_* phases."""
        # only allowed in pkg_ phases.
        if (
            not self.eapi.phases.get(phase, "").startswith("pkg_")
            and not phase == "setup-binpkg"
        ):
            return

        # note all pkgs have this attribute
        is_source = getattr(self.pkg, "_is_from_source", True)

        if self.eapi.options.has_merge_type:
            env["MERGE_TYPE"] = (is_source and "source") or "binary"
        else:
            # we still must export this, just via the portage var name w/
            # different values. if we didn't, spec or not, kernel binpkg
            # merging would be broke.
            env["EMERGE_FROM"] = (is_source and "ebuild") or "binary"
    def setup_logging(self):
        """Create the PORT_LOGDIR-based log directory when logging is enabled.

        :raise format.FailedDirectory: if the directory can't be created with
            the required mode/group.
        """
        if self.logging and not ensure_dirs(
            os.path.dirname(self.logging), mode=0o2770, gid=portage_gid
        ):
            raise format.FailedDirectory(
                os.path.dirname(self.logging),
                "PORT_LOGDIR, desired mode 02770 and gid %i" % portage_gid,
            )
[docs] def setup_workdir(self): # ensure dirs. for k in ("HOME", "T", "WORKDIR", "D"): if not ensure_dirs(self.env[k], mode=0o4770, gid=portage_gid, minimal=True): raise format.FailedDirectory( self.env[k], "%s doesn't fulfill minimum mode %o and gid %i" % (k, 0o770, portage_gid), ) # XXX hack, just 'til pkgcore controls these directories if os.stat(self.env[k]).st_mode & 0o2000: logger.warning(f"{self.env[k]} ( {k} ) is setgid")
def _generic_phase( self, phase, userpriv, sandbox, extra_handlers={}, failure_allowed=False, suppress_bashrc=False, ): """ :param phase: phase to execute :param userpriv: will we drop to :obj:`pkgcore.os_data.portage_uid` and :obj:`pkgcore.os_data.portage_gid` access for this phase? :param sandbox: should this phase be sandboxed? """ if phase not in self.pkg.mandatory_phases: # TODO(ferringb): Note the preinst hack; this will be removed once dyn_pkg_preinst # is dead in full (currently it has a selinux labelling and suidctl ran from there) if phase != "preinst": return True if "selinux" not in self.features and "suidctl" not in self.features: return True shutil.rmtree(self.env["PKGCORE_EMPTYDIR"], ignore_errors=True) os.mkdir(self.env["PKGCORE_EMPTYDIR"]) userpriv = self.userpriv and userpriv sandbox = self.sandbox and sandbox self._set_per_phase_env(phase, self.env) extra_handlers = extra_handlers.copy() extra_handlers.update(self._ipc_helpers) if not suppress_bashrc: extra_handlers.setdefault("request_bashrcs", self._request_bashrcs) return run_generic_phase( self.pkg, phase, self.env, userpriv, sandbox, extra_handlers=extra_handlers, failure_allowed=failure_allowed, logging=self.logging, ) def _request_bashrcs(self, ebd): for source in self.domain.get_package_bashrcs(self.pkg): if source.path is not None: ebd.write(f"path\n{source.path}") elif source.get_data is not None: raise NotImplementedError else: chuck_UnhandledCommand( ebd, "bashrc request: unable to process bashrc " f"due to source '{source}' due to lacking usable get_*", ) if not ebd.expect("next"): chuck_UnhandledCommand( ebd, "bashrc transfer, didn't receive 'next' response. " "failure?" ) ebd.write("end_request")
[docs] def set_is_replacing(self, *pkgs): if self.eapi.options.exports_replacing: self.env["REPLACING_VERSIONS"] = " ".join(pkg.PVR for pkg in pkgs)
[docs] def set_is_being_replaced_by(self, pkg=None): if self.eapi.options.exports_replacing and pkg is not None: self.env["REPLACED_BY_VERSION"] = pkg.PVR
    def cleanup(self, disable_observer=False, force=False):
        """Remove the build directory if cleaning is needed (or *force*).

        :param disable_observer: pass through to the decorated do_cleanup
        :param force: clean even when clean_needed is unset
        :return: True when nothing needed cleaning or cleanup succeeded.
        """
        if not force:
            if not self.clean_needed:
                return True
        if not os.path.exists(self.builddir):
            return True
        if disable_observer:
            return self.do_cleanup(disable_observer=disable_observer)
        return self.do_cleanup()
    @observer.decorate_build_method("cleanup")
    def do_cleanup(self):
        """Wipe the build dir; also prune the parent category dir if empty.

        :raise format.GenericBuildError: when removal fails unexpectedly.
        """
        try:
            shutil.rmtree(self.builddir)
            # try to wipe the cat dir; if not empty, ignore it
            try:
                os.rmdir(os.path.dirname(self.builddir))
            except EnvironmentError as e:
                # POSIX specifies either ENOTEMPTY or EEXIST for non-empty dir
                # in particular, Solaris uses EEXIST in that case.
                # https://github.com/pkgcore/pkgcore/pull/181
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
        except EnvironmentError as e:
            raise format.GenericBuildError(
                f"clean: Caught exception while cleansing: {e}"
            ) from e
        return True
[docs] def feat_or_bool(self, name, extra_env=None): if name in self.env: v = bool(self.env[name]) del self.env[name] name = name.lower() if v: self.features.add(name) else: if name in self.features: self.features.remove(name) elif extra_env is not None and name in extra_env: v = bool(extra_env[name]) if v: self.features.add(name.lower()) else: self.features.remove(name.lower()) else: v = name.lower() in self.features return v
    def __stage_step_callback__(self, stage):
        # drop a hidden marker file recording that *stage* completed
        try:
            touch(pjoin(self.builddir, f".{stage}"))
        except EnvironmentError:
            # we really don't care...
            pass

    def _reload_state(self):
        # rebuild completed-stage state from the hidden marker files;
        # a missing builddir simply means no stages have run yet
        try:
            self.__set_stage_state__(
                [x[1:] for x in listdir_files(self.builddir) if x.startswith(".")]
            )
        except EnvironmentError as e:
            if e.errno not in (errno.ENOTDIR, errno.ENOENT):
                raise
class setup_mixin:
    """Mixin providing the setup phase for build-style operations."""

    # True when setup runs against a source ebuild (vs a binary package)
    setup_is_for_src = True

    def setup(self, setup_phase_override=None):
        """Run the setup phase, choosing the appropriate phase name.

        :param setup_phase_override: explicit phase name to run instead of
            the "setup"/"setup-binpkg" default.
        """
        self.setup_logging()

        additional_commands = {}
        phase_name = "setup-binpkg"
        if self.setup_is_for_src:
            phase_name = "setup"
        if setup_phase_override is not None:
            phase_name = setup_phase_override

        if self.setup_is_for_src:
            # source builds may inherit eclasses during setup
            additional_commands["request_inherit"] = partial(
                inherit_handler, self.eclass_cache
            )

        return self._generic_phase(
            phase_name, False, True, extra_handlers=additional_commands
        )
def run_generic_phase(
    pkg,
    phase,
    env,
    userpriv,
    sandbox,
    fd_pipes=None,
    extra_handlers=None,
    failure_allowed=False,
    logging=None,
    **kwargs,
):
    """
    :param phase: phase to execute
    :param env: environment mapping for the phase
    :param userpriv: will we drop to
        :obj:`pkgcore.os_data.portage_uid` and
        :obj:`pkgcore.os_data.portage_gid` access for this phase?
    :param sandbox: should this phase be sandboxed?
    :param fd_pipes: use custom file descriptors for ebd instance
    :type fd_pipes: mapping between file descriptors
    :param extra_handlers: extra command handlers
    :type extra_handlers: mapping from string to callable
    :param failure_allowed: allow failure without raising error
    :type failure_allowed: boolean
    :param logging: None or a filepath to log output to
    :return: True when the phase has finished execution
    """
    userpriv = userpriv and is_userpriv_capable()
    sandbox = sandbox and is_sandbox_capable()
    # bug fix: the None fallback previously ran *after* env.get() was
    # called, so passing env=None crashed before it could take effect.
    if env is None:
        env = expected_ebuild_env(pkg)
    tmpdir = kwargs.get("tmpdir", env.get("T", None))

    ebd = request_ebuild_processor(
        userpriv=userpriv, sandbox=sandbox, fd_pipes=fd_pipes
    )
    # this is a bit of a hack; used until ebd accepts observers that handle
    # the output redirection on its own. Primary relevance is when
    # stdout/stderr are pointed at a file; we leave buffering on, just
    # force the flush for synchronization.
    sys.stdout.flush()
    sys.stderr.flush()
    try:
        if not ebd.run_phase(
            phase,
            env,
            tmpdir=tmpdir,
            sandbox=sandbox,
            logging=logging,
            additional_commands=extra_handlers,
        ):
            if not failure_allowed:
                raise format.GenericBuildError(
                    phase + ": Failed building (False/0 return from handler)"
                )
            logger.warning(f"executing phase {phase}: execution failed, ignoring")
    except Exception as e:
        if isinstance(e, ebd_ipc.IpcError):
            # notify bash side of IPC error
            ebd.write(e.ret)
            if isinstance(e, ebd_ipc.IpcInternalError):
                # show main exception cause for internal IPC errors
                ebd.shutdown_processor(force=True)
                raise e.__cause__
        try:
            ebd.shutdown_processor()
        except ProcessorError as pe:
            # catch die errors during shutdown
            e = pe
        release_ebuild_processor(ebd)
        if isinstance(e, ProcessorError):
            # force verbose die output
            e._verbosity = 1
            raise e
        elif isinstance(e, IGNORED_EXCEPTIONS + (format.GenericBuildError,)):
            raise
        raise format.GenericBuildError(
            f"Executing phase {phase}: Caught exception: {e}"
        ) from e

    release_ebuild_processor(ebd)
    return True
class install_op(ebd, format.install):
    """Phase operations and steps for install execution."""

    def __init__(self, domain, pkg, observer):
        format.install.__init__(self, domain, pkg, observer)
        ebd.__init__(
            self,
            pkg,
            observer=observer,
            initial_env=self.domain.settings,
            env_data_source=pkg.environment,
            clean=False,
        )

    # bug fix: the attached docstring previously said "postinst"
    # (copy-paste from the sibling definition below)
    preinst = pretty_docs(
        observer.decorate_build_method("preinst")(
            post_curry(ebd._generic_phase, "preinst", False, False)
        ),
        "run the preinst phase",
    )
    postinst = pretty_docs(
        observer.decorate_build_method("postinst")(
            post_curry(ebd._generic_phase, "postinst", False, False)
        ),
        "run the postinst phase",
    )

    def add_triggers(self, domain_op, engine):
        """Wire the new package's format triggers into the merge engine."""
        self.new_pkg.add_format_triggers(domain_op, self, engine)
class uninstall_op(ebd, format.uninstall):
    """Phase operations and steps for uninstall execution."""

    def __init__(self, domain, pkg, observer):
        format.uninstall.__init__(self, domain, pkg, observer)
        ebd.__init__(
            self,
            pkg,
            observer=observer,
            initial_env=self.domain.settings,
            env_data_source=pkg.environment,
            clean=False,
            # keep unmerge tmpdirs separate from build tmpdirs
            tmp_offset="unmerge",
        )

    prerm = pretty_docs(
        observer.decorate_build_method("prerm")(
            post_curry(ebd._generic_phase, "prerm", False, False)
        ),
        "run the prerm phase",
    )
    # postrm failures are non-fatal per spec, hence failure_allowed
    postrm = pretty_docs(
        observer.decorate_build_method("postrm")(
            post_curry(ebd._generic_phase, "postrm", False, False, failure_allowed=True)
        ),
        "run the postrm phase",
    )

    def add_triggers(self, domain_op, engine):
        """Wire the old package's format triggers into the merge engine."""
        self.old_pkg.add_format_triggers(domain_op, self, engine)

    def finish(self):
        """Clean the build dir, then finish the format-level uninstall."""
        self.cleanup()
        return format.uninstall.finish(self)
class replace_op(format.replace):
    """Phase operations and steps for replace execution."""

    install_kls = staticmethod(install_op)
    uninstall_kls = staticmethod(uninstall_op)

    def __init__(self, domain, old_pkg, new_pkg, observer):
        super().__init__(domain, old_pkg, new_pkg, observer)
        # replace = install of new_pkg + uninstall of old_pkg, cross-wired
        # so each side exports the REPLACING/REPLACED_BY vars
        self.install_op = install_op(domain, new_pkg, observer)
        self.install_op.set_is_replacing(old_pkg)
        self.uninstall_op = uninstall_op(domain, old_pkg, observer)
        self.uninstall_op.set_is_being_replaced_by(new_pkg)

    def start(self):
        """Start both sub-operations."""
        self.install_op.start()
        self.uninstall_op.start()
        return True

    prerm = klass.alias_method("uninstall_op.prerm")
    postrm = klass.alias_method("uninstall_op.postrm")
    preinst = klass.alias_method("install_op.preinst")
    postinst = klass.alias_method("install_op.postinst")

    def finalize(self):
        """Finish both sub-operations; True only when both succeed."""
        ret = self.uninstall_op.finish()
        ret2 = self.install_op.finish()
        return ret and ret2

    def add_triggers(self, domain_op, engine):
        """Wire both packages' format triggers into the merge engine."""
        self.uninstall_op.add_triggers(domain_op, engine)
        self.install_op.add_triggers(domain_op, engine)
class buildable(ebd, setup_mixin, format.build):
    """Generic build operation."""

    # XXX this is unclean- should be handing in strictly what is build
    # env, rather then dumping domain settings as env.
    def __init__(
        self,
        domain,
        pkg,
        verified_files,
        eclass_cache,
        observer=None,
        force_test=False,
        **kwargs,
    ):
        """
        :param pkg: :obj:`pkgcore.ebuild.ebuild_src.package` instance we'll be
            building
        :param eclass_cache: the :class:`pkgcore.ebuild.eclass_cache`
            we'll be using
        :param verified_files: mapping of fetchables mapped to their disk location
        """
        self._built_class = ebuild_built.fresh_built_package
        format.build.__init__(self, domain, pkg, verified_files, observer)
        domain_settings = self.domain.settings
        ebd.__init__(self, pkg, initial_env=domain_settings, **kwargs)

        self.env["FILESDIR"] = pjoin(os.path.dirname(pkg.ebuild.path), "files")
        self.eclass_cache = eclass_cache

        # src_test runs when forced, or when FEATURES=test and not RESTRICTed
        # and the pkg has the test USE flag enabled
        self.run_test = force_test or self.feat_or_bool("test", domain_settings)
        self.allow_failed_test = self.feat_or_bool(
            "test-fail-continue", domain_settings
        )
        if "test" in self.restrict:
            self.run_test = False
        elif not force_test and "test" not in pkg.use:
            if self.run_test:
                logger.warning(
                    f"disabling test for {pkg} due to test use flag being disabled"
                )
            self.run_test = False

        # XXX minor hack
        path = self.env["PATH"].split(os.pathsep)

        for s, default in (("DISTCC", ".distcc"), ("CCACHE", "ccache")):
            b = self.feat_or_bool(s, domain_settings) and s not in self.restrict
            setattr(self, s.lower(), b)
            if b:
                # looks weird I realize, but
                # pjoin("/foor/bar", "/barr/foo") == "/barr/foo"
                # and pjoin("/foo/bar", ".asdf") == "/foo/bar/.asdf"
                self.env.setdefault(s + "_DIR", pjoin(self.domain.tmpdir, default))
                # gentoo bug 355283
                libdir = self.env.get("ABI")
                if libdir is not None:
                    libdir = self.env.get(f"LIBDIR_{libdir}")
                    if libdir is not None:
                        libdir = self.env.get(libdir)
                if libdir is None:
                    libdir = "lib"
                path.insert(0, f"/usr/{libdir}/{s.lower()}/bin")
            else:
                # feature disabled: scrub any related env leakage
                for y in ("_PATH", "_DIR"):
                    if s + y in self.env:
                        del self.env[s + y]
        self.env["PATH"] = os.pathsep.join(path)

        # ordering must match appearance order in SRC_URI per PMS
        self.env["A"] = " ".join(iter_stable_unique(pkg.distfiles))

        if self.eapi.options.has_AA:
            # AA wants the raw (pre-mutation) pkg's full distfile set
            pkg = self.pkg
            while hasattr(pkg, "_raw_pkg"):
                pkg = getattr(pkg, "_raw_pkg")
            self.env["AA"] = " ".join(set(iflatten_instance(pkg.distfiles)))

        if self.eapi.options.has_KV:
            self.env["KV"] = domain.KV

        if self.eapi.options.has_merge_type:
            self.env["MERGE_TYPE"] = "source"

        if self.eapi.options.has_portdir:
            self.env["PORTDIR"] = pkg.repo.location
            self.env["ECLASSDIR"] = eclass_cache.eclassdir

        if self.setup_is_for_src:
            # TODO: PORTAGE_ACTUAL_DISTDIR usage by VCS eclasses needs to be
            # dropped, but it's currently required for repo reuse.
            self.env["PORTAGE_ACTUAL_DISTDIR"] = domain.distdir
            self.env["DISTDIR"] = normpath(pjoin(self.builddir, "distdir"))
            for k in ("PORTAGE_ACTUAL_DISTDIR", "DISTDIR"):
                self.env[k] = os.path.realpath(self.env[k]).rstrip(os.sep) + os.sep

    def _setup_distfiles(self):
        """Fetch distfiles if needed and symlink them into $DISTDIR."""
        # fetch distfiles
        if not self.verified_files:
            ops = self.domain.get_pkg_operations(self.pkg, observer=self.observer)
            if ops.fetch():
                # this break encapsulation and should be refactored. Trace
                # f35f2 and 6561eac for where this was refactored.
                self.verified_files = ops.verified_files

        # symlink them into builddir
        if self.verified_files:
            try:
                if os.path.exists(self.env["DISTDIR"]):
                    if os.path.isdir(self.env["DISTDIR"]) and not os.path.islink(
                        self.env["DISTDIR"]
                    ):
                        shutil.rmtree(self.env["DISTDIR"])
                    else:
                        os.unlink(self.env["DISTDIR"])
            except EnvironmentError as e:
                raise format.FailedDirectory(
                    self.env["DISTDIR"],
                    f"failed removing existing file/dir/link: {e}",
                ) from e

            if not ensure_dirs(self.env["DISTDIR"], mode=0o770, gid=portage_gid):
                raise format.FailedDirectory(
                    self.env["DISTDIR"], "failed creating distdir symlink directory"
                )

            try:
                for src, dest in [
                    (k, pjoin(self.env["DISTDIR"], v.filename))
                    for (k, v) in self.verified_files.items()
                ]:
                    os.symlink(src, dest)
            except EnvironmentError as e:
                raise format.GenericBuildError(
                    f"Failed symlinking in distfiles for src {src} -> {dest}: {e}"
                ) from e

    @observer.decorate_build_method("setup")
    def setup(self):
        """Execute the setup phase, mapping out to pkg_setup in the ebuild.

        Necessarily dirs are created as required, and build env is initialized at
        this point.
        """
        if self.distcc:
            for p in ("", "/lock", "/state"):
                if not ensure_dirs(
                    pjoin(self.env["DISTCC_DIR"], p), mode=0o2775, gid=portage_gid
                ):
                    raise format.FailedDirectory(
                        pjoin(self.env["DISTCC_DIR"], p),
                        "failed creating needed distcc directory",
                    )
        if self.ccache:
            # yuck.
            st = None
            try:
                st = os.stat(self.env["CCACHE_DIR"])
            except OSError as e:
                # dir missing; create it and re-stat for the perms check below
                st = None
                if not ensure_dirs(
                    self.env["CCACHE_DIR"], mode=0o2775, gid=portage_gid
                ):
                    raise format.FailedDirectory(
                        self.env["CCACHE_DIR"], "failed creation of ccache dir"
                    ) from e

                # XXX this is more then mildly stupid.
                st = os.stat(self.env["CCACHE_DIR"])
            try:
                if st.st_gid != portage_gid or (st.st_mode & 0o2775) != 0o2775:
                    try:
                        cwd = os.getcwd()
                    except OSError:
                        cwd = "/"
                    with chdir(cwd):
                        # crap.
                        os.chmod(self.env["CCACHE_DIR"], 0o2775)
                        os.chown(self.env["CCACHE_DIR"], -1, portage_gid)
                        if 0 != spawn(
                            ["chgrp", "-R", str(portage_gid), self.env["CCACHE_DIR"]]
                        ):
                            raise format.FailedDirectory(
                                self.env["CCACHE_DIR"],
                                "failed changing ownership for CCACHE_DIR",
                            )
                        if 0 != spawn_bash(
                            "find '%s' -type d -print0 | %s --null chmod 02775"
                            % (self.env["CCACHE_DIR"], xargs)
                        ):
                            raise format.FailedDirectory(
                                self.env["CCACHE_DIR"],
                                "failed correcting perms for CCACHE_DIR",
                            )
                        if 0 != spawn_bash(
                            "find '%s' -type f -print0 | %s --null chmod 0775"
                            % (self.env["CCACHE_DIR"], xargs)
                        ):
                            raise format.FailedDirectory(
                                self.env["CCACHE_DIR"],
                                "failed correcting perms for CCACHE_DIR",
                            )
            except OSError as e:
                raise format.FailedDirectory(
                    self.env["CCACHE_DIR"],
                    "failed ensuring perms/group owner for CCACHE_DIR",
                ) from e

        return setup_mixin.setup(self)

    def configure(self):
        """Execute the configure phase.

        Does nothing if the pkg's EAPI is less than 2 (that spec lacks a
        separated configure phase).
        """
        if "configure" in self.eapi.phases:
            return self._generic_phase("configure", True, True)
        return True

    def prepare(self):
        """Execute a source preparation phase.

        does nothing if the pkg's EAPI is less than 2
        """
        ret = True
        if "prepare" in self.eapi.phases:
            ret = self._generic_phase("prepare", True, True)
        # user-patch EAPIs require eapply_user to have run in src_prepare
        if self.eapi.options.user_patches and not os.path.exists(
            pjoin(self.env["T"], ".user_patches_applied")
        ):
            self.observer.error(
                "eapply_user (or default) must be called in src_prepare()"
            )
            raise format.GenericBuildError("missing eapply_user call")
        return ret
    def nofetch(self):
        """Execute the nofetch phase.

        We need the same prerequisites as setup, so reuse that.
        """
        ensure_dirs(self.env["T"], mode=0o770, gid=portage_gid, minimal=True)
        return setup_mixin.setup(self, "nofetch")
    def unpack(self):
        """Execute the unpack phase."""
        if self.setup_is_for_src:
            self._setup_distfiles()
        if self.userpriv:
            # the unprivileged build user must own WORKDIR to write into it
            try:
                os.chown(self.env["WORKDIR"], portage_uid, -1)
            except OSError as e:
                raise format.GenericBuildError(
                    "failed forcing %i uid for WORKDIR: %s" % (portage_uid, e)
                ) from e
        return self._generic_phase("unpack", True, True)

    compile = pretty_docs(
        observer.decorate_build_method("compile")(
            post_curry(ebd._generic_phase, "compile", True, True)
        ),
        "Run the compile phase (maps to src_compile).",
    )

    @observer.decorate_build_method("install")
    def install(self):
        """Run the install phase (maps to src_install)."""
        # TODO: replace print() usage with observer
        print(
            f">>> Install {self.env['PF']} into {self.ED!r} category {self.env['CATEGORY']}"
        )
        ret = self._generic_phase("install", False, True)
        print(f">>> Completed installing {self.env['PF']} into {self.ED!r}")
        return ret

    @observer.decorate_build_method("test")
    def test(self):
        """Run the test phase (if enabled), maps to src_test."""
        if not self.run_test:
            return True
        return self._generic_phase(
            "test", True, True, failure_allowed=self.allow_failed_test
        )

    def finalize(self):
        """Finalize the operation.

        This yields a built package, but the packages metadata/contents are
        bound to the workdir. In other words, install the package somewhere
        prior to executing clean if you intend on installing it.

        :return: :obj:`pkgcore.ebuild.ebuild_built.package` instance
        """
        factory = ebuild_built.fake_package_factory(self._built_class)
        return factory.new_package(
            self.pkg, self.env["D"], pjoin(self.env["T"], "environment")
        )
class binpkg_localize(ebd, setup_mixin, format.build):
    """Build operation that localizes a binary package's environment."""

    # stage ordering constraints for the operation framework
    stage_depends = {"finalize": "setup", "setup": "start"}
    # setup runs against a binary package, not a source ebuild
    setup_is_for_src = False

    def __init__(self, domain, pkg, **kwargs):
        self._built_class = ebuild_built.package
        format.build.__init__(
            self, domain, pkg, {}, observer=kwargs.get("observer", None)
        )
        ebd.__init__(self, pkg, **kwargs)
        if self.eapi.options.has_merge_type:
            self.env["MERGE_TYPE"] = "binpkg"

    def finalize(self):
        """Return the package wrapped with its localized environment."""
        return MutatedPkg(self.pkg, {"environment": self.get_env_source()})
class ebuild_operations:
    """Mixin collecting registered pre-operation sanity checks."""

    # registry populated by the @_register_check decorations below
    _checks = []

    def _register_check(checks):
        """Decorator to register sanity checks that will be run."""

        def _wrap_func(func):
            def wrapped(*args, **kwargs):
                return func(*args, **kwargs)

            # note: the *undecorated* func is what gets registered
            checks.append(func)
            return wrapped

        return _wrap_func

    def _cmd_implementation_sanity_check(self, domain):
        """Run all defined sanity checks."""
        failures = []
        for check in self._checks:
            if result := check(self, self.pkg, domain=domain):
                failures.append(result)
        return failures

    @_register_check(_checks)
    def _check_required_use(self, pkg, **kwargs):
        """Perform REQUIRED_USE verification against a set of USE flags.

        Note that this assumes the REQUIRED_USE depset has been evaluated
        against a known set of enabled USE flags and is in collapsed form.
        """
        if pkg.eapi.options.has_required_use:
            if failures := tuple(
                node for node in pkg.required_use if not node.match(pkg.use)
            ):
                return errors.RequiredUseError(pkg, failures)

    @_register_check(_checks)
    def _check_pkg_pretend(self, pkg, *, domain, **kwargs):
        """Run pkg_pretend phase."""
        # pkg_pretend is not defined or required
        if "pretend" not in pkg.mandatory_phases:
            return

        commands = None
        if not pkg.built:
            commands = {
                "request_inherit": partial(inherit_handler, self._eclass_cache),
                "has_version": ebd_ipc.Has_Version(self),
                "best_version": ebd_ipc.Best_Version(self),
            }

        # Use base build tempdir for $T instead of full pkg specific path to
        # avoid having to create/remove directories -- pkg_pretend isn't
        # allowed to write to the filesystem anyway.
        self.env = expected_ebuild_env(pkg)
        self.env["T"] = domain.pm_tmpdir
        ebd.set_path_vars(self.env, pkg, domain)
        # avoid clipping eend() messages
        self.env["PKGCORE_RC_PREFIX"] = "2"

        with TemporaryFile() as f:
            # suppress bash output by default
            fd_pipes = {1: f.fileno(), 2: f.fileno()}
            try:
                run_generic_phase(
                    pkg,
                    "pretend",
                    self.env,
                    tmpdir=None,
                    fd_pipes=fd_pipes,
                    userpriv=True,
                    sandbox=True,
                    extra_handlers=commands,
                )
            except ProcessorError as e:
                # phase died; surface its captured output as the error
                f.seek(0)
                output = f.read().decode().strip("\n")
                return errors.PkgPretendError(pkg, output, e)


class src_operations(ebuild_operations, format.build_operations):
    """Operations exposed for source (ebuild) packages."""

    def __init__(self, domain, pkg, eclass_cache, observer=None):
        format.build_operations.__init__(self, domain, pkg, observer=observer)
        self._eclass_cache = eclass_cache

    def _cmd_implementation_build(
        self, observer, verified_files, clean=False, force_test=False
    ):
        """Return a :class:`buildable` configured for this pkg/domain."""
        return buildable(
            self.domain,
            self.pkg,
            verified_files,
            self._eclass_cache,
            observer=observer,
            clean=clean,
            force_test=force_test,
        )


class misc_operations(ebd):
    """Lightweight ebd wrapper for config/info phases on installed pkgs."""

    def __init__(self, domain, *args, **kwds):
        self.domain = domain
        super().__init__(*args, **kwds)

    def configure(self, observer=None):
        """Run the pkg_config phase."""
        return self._generic_phase("config", False, True)

    def info(self, observer=None):
        """Run the pkg_info phase."""
        return self._generic_phase("info", True, True)


class built_operations(ebuild_operations, format.operations):
    """Operations exposed for built (binary) packages."""

    def __init__(self, domain, pkg, observer=None, initial_env=None):
        format.operations.__init__(self, domain, pkg, observer=observer)
        self._initial_env = initial_env
        self._localized_ebd = None

    def _cmd_implementation_localize(self, observer, force=False):
        """Localize the pkg's environment; no-op for from-source pkgs unless forced."""
        if not force and getattr(self.pkg, "_is_from_source", False):
            return self.pkg
        self._localized_ebd = op = binpkg_localize(
            self.domain,
            self.pkg,
            clean=False,
            initial_env=self._initial_env,
            env_data_source=self.pkg.environment,
            observer=observer,
        )
        return op.finalize()

    def _cmd_implementation_cleanup(self, observer, force=False):
        """Clean up any localization build dir created above."""
        if not self._localized_ebd:
            return True
        return self._localized_ebd.cleanup(force=force)

    def _cmd_check_support_configure(self):
        """Return True when the pkg defines a pkg_config phase."""
        pkg = self.pkg
        if "config" not in pkg.mandatory_phases:
            return False
        return True

    def _cmd_implementation_configure(self, observer):
        """Run pkg_config inside a throwaway misc_operations env."""
        misc = misc_operations(
            self.domain, self.pkg, env_data_source=self.pkg.environment, clean=True
        )
        try:
            misc.start()
            misc.configure()
        finally:
            misc.cleanup()
        return True