Source code for pkgcore.restrictions.boolean

"""Boolean combinations of restrictions.

This module provides classes that can be used to combine arbitrary
collections of restrictions in AND, NAND, OR, NOR, XOR, XNOR style

__all__ = ("AndRestriction", "OrRestriction")

from itertools import islice

from snakeoil.klass import cached_hash, generic_equality

from . import restriction

class base(restriction.base, metaclass=generic_equality):
    """base template for boolean restrictions"""

    __attr_comparison__ = ("negate", "type", "restrictions")
    __slots__ = ("restrictions", "type", "negate", "_hash")

    _evaluate_collapsible = False
    _evaluate_wipe_empty = True

    def __hash__(self):
        if not isinstance(self.restrictions, tuple):
            raise TypeError(f"{self!r} isn't finalized")
        return hash(tuple(getattr(self, x) for x in self.__attr_comparison__))

    def __init__(self, *restrictions, **kwds):
        :keyword node_type: type of restriction this accepts
            (:obj:`pkgcore.restrictions.restriction.package_type` and
            :obj:`pkgcore.restrictions.restriction.value_type` being
            common types).  If set to C{None}, no instance limiting is done.
        :type restrictions: node_type (if that is specified)
        :param restrictions: initial restrictions to add
        :keyword finalize: should this instance be made immutable immediately?
            defaults to True
        :keyword negate: should the logic be negated?

        sf = object.__setattr__

        node_type = kwds.pop("node_type", None)

        sf(self, "type", node_type)
        sf(self, "negate", kwds.pop("negate", False))

        if node_type is not None:
                for r in restrictions:
                    if r.type is not None and r.type != node_type:
                        raise TypeError(
                            f"instance '{r}' is restriction type '{r.type}', must be '{node_type}'"
            except AttributeError:
                raise TypeError(
                    f"type '{r.__class__}' instance '{r}' has no restriction type, '{node_type}' required"

        if kwds.pop("finalize", True):
            if not isinstance(restrictions, tuple):
                sf(self, "restrictions", tuple(restrictions))
                sf(self, "restrictions", restrictions)
            sf(self, "restrictions", list(restrictions))

        if kwds:
            kwds.pop("disable_inst_caching", None)
            if kwds:
                raise TypeError(f"unknown keywords to {self.__class__}: {kwds}")

    def change_restrictions(self, *restrictions, **kwds):
        """return a new instance of self.__class__, using supplied restrictions"""
        if self.type is not None:
            if (
                self.__class__.type not in restriction.valid_types
                or self.__class__.type != self.type
                kwds["node_type"] = self.type
        kwds.setdefault("negate", self.negate)
        return self.__class__(*restrictions, **kwds)

    def remove_restriction(self, restriction_types=(), *restrictions):
        """return a new instance of self.__class__, dropping supplied restrictions or types"""
        new_restrictions = tuple(
            for r in self.restrictions
            if not isinstance(r, tuple(restriction_types)) and r not in restrictions
        if new_restrictions != self.restrictions:
            return self.change_restrictions(*new_restrictions)
        return self

    def add_restriction(self, *new_restrictions):
        """add more restriction(s)

        :param new_restrictions: if node_type is enforced,
            restrictions must be of that type.
        if not new_restrictions:
            raise TypeError("need at least one restriction handed in")
        if self.type is not None:
                for r in new_restrictions:
                    if r.type is not None and r.type != self.type:
                        raise TypeError(
                            "instance '%s' is restriction type '%s', "
                            "must be '%s'" % (r, r.type, self.type)
            except AttributeError:
                raise TypeError(
                    "type '%s' instance '%s' has no restriction type, "
                    "'%s' required" % (r.__class__, r, getattr(self, "type", "unset"))

        except AttributeError:
            raise TypeError("%r is finalized" % self)

    def finalize(self):
        """finalize the restriction instance, disallowing adding restrictions."""
        object.__setattr__(self, "restrictions", tuple(self.restrictions))

    def __repr__(self):
        return "<%s negate=%r type=%r finalized=%r restrictions=%r @%#8x>" % (
            getattr(self, "type", None),
            isinstance(self.restrictions, tuple),

    def __len__(self):
        return len(self.restrictions)

    def __iter__(self):
        return iter(self.restrictions)

    def match(self, action, *vals):
        raise NotImplementedError

    force_False, force_True = match, match

    def dnf_solutions(self, full_solution_expansion=False):
        raise NotImplementedError()

    cnf_solutions = dnf_solutions

    def iter_cnf_solutions(self, *a, **kwds):
        """iterate over the cnf solution"""
        return iter(self.cnf_solutions(*a, **kwds))

    def iter_dnf_solutions(self, *a, **kwds):
        """iterate over the dnf solution"""
        return iter(self.dnf_solutions(*a, **kwds))

    def __getitem__(self, key):
        return self.restrictions[key]

    def evaluate_conditionals(
        l = []
        for restrict in self:
            f = getattr(restrict, "evaluate_conditionals", None)
            if f is None:
                f(self.__class__, l, enabled, tristate_locked)

        if not self._evaluate_wipe_empty or l:
            if force_collapse or (
                (issubclass(parent_cls, self.__class__) and self._evaluate_collapsible)
                or len(l) <= 1

# this beast, handles N^2 permutations.  convert to stack based.
def iterative_quad_toggling(
    if desired_false is None:
        desired_false = lambda r, a: r.force_False(*a)
    if desired_true is None:
        desired_true = lambda r, a: r.force_True(*a)

    reset = True
    if starting == 0:
        if filter_func(truths):
            yield True
    for index, rest in islice(enumerate(restrictions), starting, end):
        if reset:
            entry = pkg.changes_count()
        reset = False
        if truths[index]:
            if desired_false(rest, pvals):
                reset = True
                t = truths[:]
                t[index] = False
                if filter_func(t):
                    yield True
                for i in iterative_quad_toggling(
                    index + 1,
                    yield True
                reset = True
                if kill_switch is not None and kill_switch(truths, index):
            if desired_true(rest, pvals):
                reset = True
                t = truths[:]
                t[index] = True
                if filter_func(t):
                    yield True
                for x in iterative_quad_toggling(
                    index + 1,
                    yield True
                reset = True
            elif index == end:
                if filter_func(truths):
                    yield True
                if kill_switch is not None and kill_switch(truths, index):

        if reset:

[docs] class AndRestriction(base): """Boolean AND grouping of restrictions. negation is a NAND""" __slots__ = () _evaluate_collapsible = True
[docs] def match(self, vals): for rest in self.restrictions: if not rest.match(vals): return self.negate return not self.negate
[docs] def force_True(self, pkg, *vals): pvals = [pkg] pvals.extend(vals) entry_point = pkg.changes_count() # get the simple one out of the way first. if not self.negate: for r in self.restrictions: if not r.force_True(*pvals): pkg.rollback(entry_point) return False return True # <insert page long curse here>, NAND logic, # len(restrictions)**2 potential solutions. # 0|0 == 0, 0|1 == 1|0 == 0|0 == 1. # XXX this is quadratic. patches welcome to dodge the # requirement to push through all potential truths. truths = [r.match(*pvals) for r in self.restrictions] def filter_func(truths): return False in truths for i in iterative_quad_toggling( pkg, pvals, self.restrictions, 0, len(self.restrictions), truths, filter_func, ): return True return False
[docs] def force_False(self, pkg, *vals): pvals = [pkg] pvals.extend(vals) entry_point = pkg.changes_count() # get the simple one out of the way first. if self.negate: for r in self.restrictions: if not r.force_True(*pvals): pkg.rollback(entry_point) return False return True # <insert page long curse here>, NAND logic, # (len(restrictions)^2)-1 potential solutions. # 1|1 == 0, 0|1 == 1|0 == 0|0 == 1. # XXX this is quadratic. patches welcome to dodge the # requirement to push through all potential truths. truths = [r.match(*pvals) for r in self.restrictions] def filter_func(truths): return False in truths for i in iterative_quad_toggling( pkg, pvals, self.restrictions, 0, len(self.restrictions), truths, filter_func, ): return True return False
[docs] def iter_dnf_solutions(self, full_solution_expansion=False): """generater yielding DNF (disjunctive normalized form) of this instance. :param full_solution_expansion: controls whether to expand everything (break apart atoms for example); this isn't likely what you want """ if self.negate: # raise NotImplementedError("negation for dnf_solutions on " # "AndRestriction isn't implemented yet") # hack- this is an experiment for r in OrRestriction( node_type=self.type, *[restriction.Negate(x) for x in self.restrictions] ).iter_dnf_solutions(): yield r return if not self.restrictions: yield [] return hardreqs = [] optionals = [] for x in self.restrictions: method = getattr(x, "dnf_solutions", None) if method is None: hardreqs.append(x) else: s2 = method(full_solution_expansion) assert s2 if len(s2) == 1: hardreqs.extend(s2[0]) else: optionals.append(s2) def f(arg, *others): if others: for node in arg: for node2 in f(*others): yield node + node2 else: for node in arg: yield node for solution in f([hardreqs], *optionals): assert isinstance(solution, (tuple, list)) yield solution
[docs] def dnf_solutions(self, *args, **kwds): """list form of :obj:`iter_dnf_solutions`, see iter_dnf_solutions for args""" return list(self.iter_dnf_solutions(*args, **kwds))
[docs] def iter_cnf_solutions(self, full_solution_expansion=False): """returns solutions in CNF (conjunctive normalized form) of this instance :param full_solution_expansion: controls whether to expand everything (break apart atoms for example); this isn't likely what you want """ if self.negate: raise NotImplementedError( "negation for solutions on " "AndRestriction isn't implemented yet" ) for x in self.restrictions: method = getattr(x, "iter_cnf_solutions", None) if method is None: yield [x] else: for y in method(full_solution_expansion): yield y
[docs] def cnf_solutions(self, full_solution_expansion=False): """returns solutions in CNF (conjunctive normalized form) of this instance :param full_solution_expansion: controls whether to expand everything (break apart atoms for example); this isn't likely what you want """ if self.negate: raise NotImplementedError( "negation for solutions on " "AndRestriction isn't implemented yet" ) andreqs = [] for x in self.restrictions: method = getattr(x, "iter_cnf_solutions", None) if method is None: andreqs.append([x]) else: andreqs.extend(method(full_solution_expansion)) return andreqs
def __str__(self): restricts_str = " && ".join(map(str, self.restrictions)) negate = "not " if self.negate else "" return f"{negate}( {restricts_str} )"
[docs] class OrRestriction(base): """Boolean OR grouping of restrictions.""" __slots__ = () _evaluate_collapsible = True
[docs] def match(self, vals): for rest in self.restrictions: if rest.match(vals): return not self.negate return self.negate
[docs] def cnf_solutions(self, full_solution_expansion=False): """Returns a list in CNF (conjunctive normalized form) of this instance. :param full_solution_expansion: controls whether to expand everything (break apart atoms for example); this isn't likely what you want """ if self.negate: raise NotImplementedError( " doesn't yet support self.negate" ) if not self.restrictions: return [] dcnf = [] cnf = [] for x in self.restrictions: method = getattr(x, "dnf_solutions", None) if method is None: dcnf.append(x) else: s2 = method(full_solution_expansion) if len(s2) == 1: cnf.extend(s2) else: for y in s2: if len(y) == 1: dcnf.append(y[0]) else: cnf.append(y) # combinatorial explosion. if it's got cnf, we peel off one of # each and smash append to the dcnf. dcnf = [dcnf] for andreq in cnf: dcnf = list(y + [x] for x in andreq for y in dcnf) return dcnf
[docs] def iter_dnf_solutions(self, full_solution_expansion=False): """Returns a list in DNF (disjunctive normalized form) of this instance. :param full_solution_expansion: controls whether to expand everything (break apart atoms for example); this isn't likely what you want """ if self.negate: # hack- this is an experiment for x in AndRestriction( node_type=self.type, *[restriction.Negate(x) for x in self.restrictions] ).iter_dnf_solutions(): yield x if not self.restrictions: yield [] return for x in self.restrictions: method = getattr(x, "iter_dnf_solutions", None) if method is None: yield [x] else: for y in method(full_solution_expansion): yield y
[docs] def dnf_solutions(self, *args, **kwds): """see dnf_solutions, iterates yielding DNF solutions""" return list(self.iter_dnf_solutions(*args, **kwds))
[docs] def force_True(self, pkg, *vals): pvals = [pkg] pvals.extend(vals) entry_point = pkg.changes_count() # get the simple one out of the way first. if self.negate: for r in self.restrictions: if not r.force_False(*pvals): pkg.rollback(entry_point) return False return True # <insert page long curse here>, OR logic, # len(restrictions)**2-1 potential solutions. # 0|0 == 0, 0|1 == 1|0 == 1|1 == 1. # XXX this is quadratic. patches welcome to dodge the # requirement to push through all potential truths. truths = [r.match(*pvals) for r in self.restrictions] def filter_func(truths): return True in truths for i in iterative_quad_toggling( pkg, pvals, self.restrictions, 0, len(self.restrictions), truths, filter_func, ): return True return False
[docs] def force_False(self, pkg, *vals): pvals = [pkg] pvals.extend(vals) entry_point = pkg.changes_count() # get the simple one out of the way first. if not self.negate: for r in self.restrictions: if not r.force_False(*pvals): pkg.rollback(entry_point) return yield True return # <insert page long curse here>, OR logic, # (len(restrictions)**2)-1 potential solutions. # 0|0 == 0, 0|1 == 1|0 == 1|1 == 1. # XXX this is quadratic. patches welcome to dodge the # requirement to push through all potential truths. truths = [r.match(*pvals) for r in self.restrictions] def filter_func(truths): return True in truths for i in iterative_quad_toggling( pkg, pvals, self.restrictions, 0, len(self.restrictions), truths, filter_func, ): yield True
def __str__(self): restricts_str = " || ".join(map(str, self.restrictions)) negate = "not " if self.negate else "" return f"{negate}( {restricts_str} )"
class JustOneRestriction(base): """Exactly one must match, or there must be no restrictions""" __slots__ = () _evaluate_collapsable = True _evaluate_wipe_empty = False def match(self, vals): if not self.restrictions: return not self.negate armed = False for node in self.restrictions: if node.match(vals): # two matches found? if armed: return self.negate armed = True if armed: return not self.negate return self.negate def __str__(self): restricts_str = " ".join(map(str, self.restrictions)) negate = "not " if self.negate else "" return f"{negate}exactly-one-of ( {restricts_str} )" class AtMostOneOfRestriction(base): """Either none, or exactly one match must occur.""" __slots__ = () _evaluate_collapsable = True _evaluate_wipe_empty = False def match(self, vals): armed = False for restrict in self.restrictions: if not restrict.match(vals): continue if armed: return self.negate armed = True return not self.negate def __str__(self): restricts_str = " ".join(map(str, self.restrictions)) negate = "not " if self.negate else "" return f"{negate}at-most-one-of ( {restricts_str} )"