Source code for pkgcore.resolver.choice_point

__all__ = ("choice_point",)

from snakeoil import klass
from snakeoil.sequences import iter_stable_unique


[docs] class choice_point: __slots__ = ( "__weakref__", "atom", "matches", "matches_cur", "solution_filters", "_prdeps", "_rdeps", "_deps", "_bdeps", "_ideps", ) def __init__(self, a, matches): self.atom = a self.matches = iter(matches) self.matches_cur = None self.solution_filters = set() # match solutions, remaining self._bdeps = None self._deps = None self._rdeps = None self._prdeps = None @property def state(self): """Return choice point state. :return: A tuple consisting of the number of possible choices, current matches' repo, current package match, all possible matches, cbuild build deps, chost build deps, runtime deps, and post merge deps. """ m = self.matches_cur return ( len(self.solution_filters), m.repo, m, self.matches, self._bdeps, self._deps, self._rdeps, self._prdeps, ) @staticmethod def _filter_choices(cnf_reqs, filterset): for choices in cnf_reqs: l = [x for x in choices if x not in filterset] if not l: return yield l def _internal_force_next(self): """Force next pkg without triggering a reduce_atoms call. :return: True if pkgs remain, False if no more remain """ for self.matches_cur in self.matches: self._reset_iters() return True self.matches_cur = self.matches = None return False
[docs] def reduce_atoms(self, atom): """Alter choice point atom set. :param atom: set of package atoms :type atom: set of :obj:`pkgcore.ebuild.atom.atom` :return: True if no more pkgs remain or atoms were removed, False if no atoms were removed """ if self.matches is None: raise IndexError("no solutions remain") if hasattr(atom, "__contains__") and not isinstance(atom, str): self.solution_filters.update(atom) else: self.solution_filters.add(atom) filterset = self.solution_filters if self.matches_cur is None: if not self._internal_force_next(): return True round = -1 while True: round += 1 if round: if not self._internal_force_next(): return True for depset_name in ("_bdeps", "_deps", "_rdeps", "_prdeps"): depset = getattr(self, depset_name) reqs = list(self._filter_choices(depset, filterset)) if len(reqs) != len(depset): break setattr(self, depset_name, reqs) else: return round > 0
def _reset_iters(self): """ Reset bdepend, depend, rdepend, pdepend and idepend properties to current matches' related attributes. """ cur = self.matches_cur self._bdeps = cur.bdepend.cnf_solutions() self._deps = cur.depend.cnf_solutions() self._rdeps = cur.rdepend.cnf_solutions() self._prdeps = cur.pdepend.cnf_solutions() self._ideps = cur.pdepend.cnf_solutions() slot = klass.alias_attr("current_pkg.slot") key = klass.alias_attr("current_pkg.key") @property def current_pkg(self): """Current selected package.""" if self.matches_cur is None: if self.matches is None: raise IndexError("no packages remain") for self.matches_cur in self.matches: break else: self.matches = None raise IndexError("no more packages remain") self._reset_iters() return self.matches_cur
[docs] def force_next_pkg(self): """Force next package to be selected from available matches.""" if self.matches is None: return False for self.matches_cur in self.matches: break else: self.matches_cur = self.matches = None return False return self.reduce_atoms([])
@property def bdepend(self): """Build time dependencies for CBUILD.""" if not self: raise IndexError("no more solutions remain") return self._bdeps @property def depend(self): """Build time dependencies for CHOST.""" if not self: raise IndexError("no more solutions remain") return self._deps @property def rdepend(self): """Runtime dependencies.""" if not self: raise IndexError("no more solutions remain") return self._rdeps @property def pdepend(self): """Post merge dependencies.""" if not self: raise IndexError("no more solutions remain") return self._prdeps @property def idepend(self): """Install-time dependencies (for CBUILD).""" if not self: raise IndexError("no more solutions remain") return self._ideps def __bool__(self): if self.matches_cur is None: if self.matches is None: return False for self.matches_cur in self.matches: break else: self.matches = None return False self._reset_iters() return True def __str__(self): return "%s: (%s, %s)" % (self.__class__.__name__, self.atom, self.matches_cur)