# Source code for pkgcore.merge.engine

"""
core engine for livefs modifications
"""

__all__ = ("alias_cset", "map_new_cset_livefs", "MergeEngine")

# need better documentation...

# pre merge triggers
# post merge triggers
# ordering?

import io
import operator
import os
import tempfile
import traceback
from functools import partial
from itertools import chain
from multiprocessing import cpu_count

from snakeoil import data_source
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.currying import post_curry
from snakeoil.fileutils import touch
from snakeoil.mappings import ImmutableDict, LazyValDict, StackedDict
from snakeoil.osutils import normpath

from ..fs import contents, livefs
from ..operations import observer as observer_mod
from . import errors
from .const import INSTALL_MODE, REPLACE_MODE, UNINSTALL_MODE
from .triggers import default_plugins_triggers


def alias_cset(alias, engine, csets):
    """Resolve one cset name to another: look up *alias* in the cset mapping.

    :param alias: name of the cset to resolve to
    :param engine: merge engine instance (unused; kept for the cset-source
        callable signature)
    :param csets: mapping of cset name -> cset
    :return: the cset registered under *alias*
    """
    return csets[alias]
def map_new_cset_livefs(engine, csets, cset_name="new_cset"):
    """Find symlinks on disk that redirect new_cset, and return a livefs
    localized cset.

    :param engine: merge engine instance (unused; kept for the cset-source
        callable signature)
    :param csets: mapping of cset name -> cset
    :param cset_name: name of the cset to localize
    """
    pending = csets[cset_name]
    # intersect the pending cset's directories with what's actually on disk
    on_disk = contents.contentsSet(
        livefs.intersect(pending.iterdirs(), realpath=False)
    )
    livefs.recursively_fill_syms(on_disk)
    return pending.map_directory_structure(on_disk, add_conflicting_sym=True)
class MergeEngine:
    """Engine driving livefs install/uninstall/replace operations via hooks,
    triggers, and lazily generated contents sets (csets)."""

    # hook-point name -> list of trigger classes, per operation mode
    install_hooks = {
        x: [] for x in ("sanity_check", "pre_merge", "merge", "post_merge", "final")
    }
    uninstall_hooks = {
        x: []
        for x in ("sanity_check", "pre_unmerge", "unmerge", "post_unmerge", "final")
    }
    replace_hooks = {
        x: [] for x in set(chain(install_hooks.keys(), uninstall_hooks.keys()))
    }

    # cset name -> generator; string values are resolved to methods by name
    # in __init__ via getattr
    install_csets = {
        "install_existing": "get_install_livefs_intersect",
        "resolved_install": map_new_cset_livefs,
        "new_cset": partial(alias_cset, "raw_new_cset"),
        "install": partial(alias_cset, "new_cset"),
        "replace": partial(alias_cset, "new_cset"),
    }
    uninstall_csets = {
        "uninstall_existing": partial(alias_cset, "uninstall"),
        "uninstall": partial(alias_cset, "old_cset"),
        "old_cset": "get_uninstall_livefs_intersect",
    }
    replace_csets = install_csets.copy()
    replace_csets.update(uninstall_csets)
    replace_csets["modifying"] = lambda e, c: c["resolved_install"].intersection(
        c["uninstall"]
    )
    replace_csets["uninstall"] = "get_remove_cset"
    replace_csets["replace"] = "get_replace_cset"
    replace_csets["install_existing"] = "get_install_livefs_intersect"

    # cset names whose generated values persist across hook points
    install_csets_preserve = ["new_cset"]
    uninstall_csets_preserve = ["old_cset"]
    replace_csets_preserve = ["new_cset", "old_cset"]

    allow_reuse = True

    def __init__(
        self,
        mode,
        tempdir,
        hooks,
        csets,
        preserves,
        observer,
        offset=None,
        disable_plugins=False,
        parallelism=None,
    ):
        """
        :param mode: one of INSTALL_MODE, UNINSTALL_MODE, REPLACE_MODE
        :param tempdir: tempspace for the engine; may be None
        :param hooks: mapping of hook name -> sequence of trigger instances
        :param csets: mapping of cset name -> generator; strings name methods
            on this instance
        :param preserves: cset names whose values persist across hook points
        :param observer: observer instance; None selects a null repo observer
        :param offset: livefs offset for modifications; defaults to '/'
        :param disable_plugins: if True, skip registering default plugin triggers
        :param parallelism: trigger parallelism; defaults to cpu_count()
        """
        if observer is None:
            observer = observer_mod.repo_observer(observer_mod.null_output)
        self.observer = observer
        self.mode = mode
        if tempdir is not None:
            # normalize and ensure a trailing slash so startswith() checks work
            tempdir = normpath(tempdir) + "/"
        self.tempdir = tempdir

        self.parallelism = parallelism if parallelism is not None else cpu_count()
        self.hooks = ImmutableDict((x, []) for x in hooks)

        self.preserve_csets = []
        self.cset_sources = {}
        # instantiate these separately so their values are preserved
        self.preserved_csets = LazyValDict(self.preserve_csets, self._get_cset_source)
        for k, v in csets.items():
            if isinstance(v, str):
                v = getattr(self, v, v)
            if not callable(v):
                # NOTE: this message was split across lines by doc extraction
                # in the original; reconstructed here.
                raise TypeError(
                    "cset values must be either the string name of "
                    f"existing methods, or callables (got {v})"
                )
            if k in preserves:
                self.add_preserved_cset(k, v)
            else:
                self.add_cset(k, v)

        if offset is None:
            offset = "/"
        self.offset = offset

        if not disable_plugins:
            # merge in default triggers first.
            for trigger in default_plugins_triggers():
                trigger().register(self)

        # merge in overrides; pass None for required_csets (skip validation)
        # since add_trigger requires that argument -- the original two-arg
        # call would raise TypeError for any non-empty hook list.
        for hook, triggers in hooks.items():
            for trigger in triggers:
                self.add_trigger(hook, trigger, None)

        self.regenerate_csets()
        # expose each hook point as a bound callable, e.g. engine.pre_merge()
        for x in hooks:
            setattr(self, x, partial(self.execute_hook, x))
[docs] @classmethod def install(cls, tempdir, pkg, offset=None, observer=None, disable_plugins=False): """Generate a MergeEngine instance configured for installing a pkg. :param tempdir: tempspace for the merger to use; this space it must control alone, no sharing. :param pkg: :obj:`pkgcore.package.metadata.package` instance to install :param offset: any livefs offset to force for modifications :param disable_plugins: if enabled, run just the triggers passed in :return: :obj:`MergeEngine` """ hooks = {k: [y() for y in v] for (k, v) in cls.install_hooks.items()} csets = cls.install_csets.copy() if "raw_new_cset" not in csets: csets["raw_new_cset"] = post_curry(cls.get_pkg_contents, pkg) o = cls( INSTALL_MODE, tempdir, hooks, csets, cls.install_csets_preserve, observer, offset=offset, disable_plugins=disable_plugins, ) if o.offset != "/": # wrap the results of new_cset to pass through an offset generator o.cset_sources["raw_new_cset"] = post_curry( o.generate_offset_cset, o.cset_sources["raw_new_cset"] ) o.new = pkg return o
[docs] @classmethod def uninstall(cls, tempdir, pkg, offset=None, observer=None, disable_plugins=False): """Generate a MergeEngine instance configured for uninstalling a pkg. :param tempdir: tempspace for the merger to use; this space it must control alone, no sharing. :param pkg: :obj:`pkgcore.package.metadata.package` instance to uninstall, must be from a livefs vdb :param offset: any livefs offset to force for modifications :param disable_plugins: if enabled, run just the triggers passed in :return: :obj:`MergeEngine` """ hooks = {k: [y() for y in v] for (k, v) in cls.uninstall_hooks.items()} csets = cls.uninstall_csets.copy() if "raw_old_cset" not in csets: csets["raw_old_cset"] = post_curry(cls.get_pkg_contents, pkg) o = cls( UNINSTALL_MODE, tempdir, hooks, csets, cls.uninstall_csets_preserve, observer, offset=offset, disable_plugins=disable_plugins, ) if o.offset != "/": # wrap the results of new_cset to pass through an offset generator o.cset_sources["old_cset"] = post_curry( o.generate_offset_cset, o.cset_sources["old_cset"] ) o.old = pkg return o
[docs] @classmethod def replace( cls, tempdir, old, new, offset=None, observer=None, disable_plugins=False ): """Generate a MergeEngine instance configured for replacing a pkg. :param tempdir: tempspace for the merger to use; this space it must control alone, no sharing. :param old: :obj:`pkgcore.package.metadata.package` instance to replace, must be from a livefs vdb :param new: :obj:`pkgcore.package.metadata.package` instance :param offset: any livefs offset to force for modifications :param disable_plugins: if enabled, run just the triggers passed in :return: :obj:`MergeEngine` """ hooks = {k: [y() for y in v] for (k, v) in cls.replace_hooks.items()} csets = cls.replace_csets.copy() csets.setdefault("raw_old_cset", post_curry(cls.get_pkg_contents, old)) csets.setdefault("raw_new_cset", post_curry(cls.get_pkg_contents, new)) o = cls( REPLACE_MODE, tempdir, hooks, csets, cls.replace_csets_preserve, observer, offset=offset, disable_plugins=disable_plugins, ) if o.offset != "/": for k in ("raw_old_cset", "raw_new_cset"): # wrap the results of new_cset to pass through an # offset generator o.cset_sources[k] = post_curry( o.generate_offset_cset, o.cset_sources[k] ) o.old = old o.new = new return o
[docs] def replace_cset(self, name, new_cset): """Replace the cset referenced by this engine. Use only if you know what you're doing. :param name: name of the cset :new_cset: a contentsSet instance to use """ if name in self.preserved_csets: # yes this is evil awareness of LazyValDict internals... self.preserved_csets._vals[name] = new_cset else: raise KeyError(f"attempted to replace a non preserved cset: {name}")
[docs] def regenerate_csets(self): """Internal function, reset non preserverd csets. Used in transitioning between hook points """ self.csets = StackedDict( self.preserved_csets, LazyValDict(self.cset_sources, self._get_cset_source) )
def _get_cset_source(self, key): return self.cset_sources[key](self, self.csets)
[docs] def add_preserved_cset(self, cset_name, func): """Register a cset generator for use. The cset will stay in memory until the engine finishes all steps. :param cset_name: what to call the generated cset :param func: callable to get the cset """ self.add_cset(cset_name, func) self.preserve_csets.append(cset_name)
[docs] def add_cset(self, cset_name, func): """Regiser a cset generator for use. The cset will be released from memory when it's no longer used. :param cset_name: what to call the generated cset :param func: callable to get the cset """ if not callable(func): raise TypeError("func must be a callable") if not isinstance(cset_name, str): raise TypeError("cset_name must be a string") self.cset_sources[cset_name] = func
[docs] def add_trigger(self, hook_name, trigger, required_csets): """Register a :obj:`pkgcore.merge.triggers.base` instance to be executed. :param hook_name: engine step to hook the trigger into :param trigger: :class:`pkgcore.merge.triggers.base` to add """ if hook_name not in self.hooks: raise KeyError(f"trigger {trigger!r}'s hook {hook_name} isn't a known hook") if required_csets is not None: for rcs in required_csets: if rcs not in self.cset_sources: if isinstance(rcs, str): raise errors.TriggerUnknownCset(trigger, rcs) self.hooks[hook_name].append(trigger)
    def execute_hook(self, hook):
        """Execute any triggers bound to a hook point.

        Triggers run in ascending ``priority`` order; csets are regenerated
        first so each hook point sees fresh (non-preserved) csets.
        ``self.phase`` tracks the active hook and is always cleared on exit.
        """
        try:
            self.phase = hook
            self.regenerate_csets()
            for trigger in sorted(
                self.hooks[hook], key=operator.attrgetter("priority")
            ):
                # error checking needed here.
                self.observer.trigger_start(hook, trigger)
                try:
                    try:
                        trigger(self, self.csets)
                    # never swallow interpreter-level exceptions (KeyboardInterrupt etc.)
                    except IGNORED_EXCEPTIONS:
                        raise
                    except errors.BlockModification as e:
                        # trigger vetoed the modification: report, then propagate
                        self.observer.error(
                            f"modification was blocked by trigger {trigger!r}: {e}"
                        )
                        raise
                    except errors.ModificationError as e:
                        # a modification failed mid-flight: report, then propagate
                        self.observer.error(
                            f"modification error occurred during trigger {trigger!r}: {e}"
                        )
                        raise
                    except Exception as e:
                        # unexpected failure: only suppress if the trigger
                        # explicitly opts in via suppress_exceptions
                        if not trigger.suppress_exceptions:
                            raise
                        handle = io.StringIO()
                        traceback.print_exc(file=handle)
                        self.observer.warn(
                            "unhandled exception caught and "
                            f"suppressed:\n{handle.getvalue()}"
                        )
                finally:
                    # always pair trigger_start with trigger_end
                    self.observer.trigger_end(hook, trigger)
        finally:
            # clear the phase marker even if a trigger raised
            self.phase = None
[docs] @staticmethod def generate_offset_cset(engine, csets, cset_generator): """Generate a cset with offset applied.""" return cset_generator(engine, csets).insert_offset(engine.offset)
[docs] @staticmethod def get_pkg_contents(engine, csets, pkg): """Generate the cset of what files shall be merged to the livefs.""" return pkg.contents.clone()
[docs] @staticmethod def get_remove_cset(engine, csets): """Generate the cset of what files shall be removed from the livefs.""" return csets["old_cset"].difference(csets["install"])
[docs] @staticmethod def get_replace_cset(engine, csets): """Return the cset of what will be replaced going from old->new pkg.""" return csets["install"].intersection(csets["old_cset"])
@staticmethod def _get_livefs_intersect_cset(engine, csets, cset_name, realpath=False): """Generate the livefs intersection against a cset.""" return contents.contentsSet( livefs.intersect(csets[cset_name], realpath=realpath) )
[docs] @staticmethod def get_install_livefs_intersect(engine, csets): return engine._get_livefs_intersect_cset(engine, csets, "install")
[docs] @staticmethod def get_uninstall_livefs_intersect(engine, csets): return engine._get_livefs_intersect_cset(engine, csets, "raw_old_cset")
alias_cset = staticmethod(alias_cset)
[docs] def get_merged_cset(self, strip_offset=True): cset = self.csets["install"] if self.offset not in (None, "/") and strip_offset: rewrite = contents.change_offset_rewriter(self.offset, "/", cset) cset = contents.contentsSet(rewrite) return cset
[docs] def get_writable_fsobj(self, fsobj, prefer_reuse=True, empty=False): path = source = None if fsobj: source = fsobj.data if source.mutable: return fsobj if self.allow_reuse and prefer_reuse: path = source.path # XXX: this should be doing abspath fs intersection probably, # although the paths generated are from triggers/engine- still. if path is not None and not path.startswith(self.tempdir): # the fsobj pathway isn't in temp space; force a transfer. path = None if path: # ok, it's tempspace, and reusable. obj = data_source.local_source(path, True, encoding=source.encoding) if empty: obj.bytes_fileobj(True).truncate(0) return obj # clone it into tempspace; it's required we control the tempspace, # so this function is safe in our usage. fd, path = tempfile.mkstemp(prefix="merge-engine-", dir=self.tempdir) # XXX: annoying quirk of python, we don't want append mode, so 'a+' # isn't viable; wr will truncate the file, so data_source uses r+. # this however doesn't allow us to state "create if missing" # so we create it ourselves. Annoying, but so it goes. # just touch the filepath. touch(path) new_source = data_source.local_source( path, True, encoding=getattr(fsobj, "encoding", None) ) if source and not empty: data_source.transfer(source.bytes_fsobj(), new_source.bytes_fsobj(True)) return new_source