"""DepSet parsing.
Turns a DepSet (depend, rdepend, SRC_URI, license, etc) into
appropriate conditionals.
"""
__all__ = ("DepSet", "stringify_boolean")
import typing
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.iterables import expandable_chain
from snakeoil.sequences import iflatten_instance
from ..restrictions import boolean, packages, restriction, values
from .atom import atom, transitive_use_atom
from .errors import DepsetParseError
[docs]
class DepSet(boolean.AndRestriction):
"""Gentoo DepSet syntax parser"""
__slots__ = ("element_class", "_node_conds", "_known_conditionals")
_evaluate_collapse = True
# do not enable instance caching w/out adjust evaluate_depset!
__inst_caching__ = False
def __init__(
self,
restrictions="",
element_class=atom,
node_conds=True,
known_conditionals=None,
):
sf = object.__setattr__
sf(self, "_known_conditionals", known_conditionals)
sf(self, "element_class", element_class)
sf(self, "restrictions", restrictions)
sf(self, "_node_conds", node_conds)
sf(self, "type", restriction.package_type)
sf(self, "negate", False)
[docs]
@classmethod
def parse(
cls,
dep_str,
element_class,
operators=None,
attr=None,
element_func=None,
transitive_use_atoms=False,
allow_src_uri_file_renames=False,
):
"""
:param dep_str: string abiding by DepSet syntax
:param operators: mapping of node -> callable for special operators
in DepSet syntax
:param element_func: if None, element_class is used for generating
elements, else it's used to generate elements.
Mainly useful for when you need to curry a few args for instance
generation, since element_class _must_ be a class
:param element_class: class of generated elements
:param attr: name of the DepSet attribute being parsed
"""
if element_func is None:
element_func = element_class
restrictions = []
if operators is None:
operators = {"||": boolean.OrRestriction, "": boolean.AndRestriction}
raw_conditionals = []
depsets = [restrictions]
node_conds = False
words = iter(dep_str.split())
# we specifically do it this way since expandable_chain has a bit of nasty
# overhead to the tune of 33% slower
if allow_src_uri_file_renames:
words = expandable_chain(words)
k = None
try:
for k in words:
if ")" == k:
# no elements == error. if closures don't map up,
# indexerror would be chucked from trying to pop
# the frame so that is addressed.
if not depsets[-1] or not raw_conditionals:
raise DepsetParseError(dep_str, attr=attr)
elif raw_conditionals[-1] in operators:
if len(depsets[-1]) == 1:
depsets[-2].append(depsets[-1][0])
else:
depsets[-2].append(
operators[raw_conditionals[-1]](*depsets[-1])
)
else:
node_conds = True
c = raw_conditionals[-1]
if c[0] == "!":
c = values.ContainmentMatch(c[1:-1], negate=True)
else:
c = values.ContainmentMatch(c[:-1])
depsets[-2].append(
packages.Conditional("use", c, tuple(depsets[-1]))
)
raw_conditionals.pop()
depsets.pop()
elif "(" == k:
k = ""
# push another frame on
depsets.append([])
raw_conditionals.append(k)
elif k[-1] == "?" or k in operators:
# use conditional or custom op.
# no tokens left == bad dep_str.
k2 = next(words)
if k2 != "(":
raise DepsetParseError(dep_str, k2, attr=attr)
# push another frame on
depsets.append([])
raw_conditionals.append(k)
elif "|" in k:
raise DepsetParseError(dep_str, k, attr=attr)
elif allow_src_uri_file_renames:
try:
k2 = next(words)
except StopIteration:
depsets[-1].append(element_func(k))
else:
if k2 != "->":
depsets[-1].append(element_func(k))
words.appendleft((k2,))
else:
k3 = next(words)
# file rename
depsets[-1].append(element_func(k, k3))
else:
# node/element
depsets[-1].append(element_func(k))
except IGNORED_EXCEPTIONS:
raise
except DepsetParseError:
# [][-1] for a frame access, which means it was a parse error.
raise
except StopIteration:
if k is None:
raise
raise DepsetParseError(dep_str, k, attr=attr)
except Exception as e:
raise DepsetParseError(dep_str, e, attr=attr) from e
# check if any closures required
if len(depsets) != 1:
raise DepsetParseError(dep_str, attr=attr)
if transitive_use_atoms and not node_conds:
element_class = transitive_use_atom
# we can't rely on iter(self) here since it doesn't
# descend through boolean restricts.
node_conds = cls._has_transitive_use_atoms(restrictions)
return cls(tuple(restrictions), element_class, node_conds)
@staticmethod
def _has_transitive_use_atoms(iterable):
kls = transitive_use_atom
ifunc = isinstance
return any(ifunc(x, kls) for x in iflatten_instance(iterable, atom))
[docs]
def evaluate_depset(self, cond_dict, tristate_filter=None, pkg=None):
"""
:param cond_dict: container to be used for conditional collapsing,
typically is a use list
:param tristate_filter: a control; if specified, must be a container
of conditionals to lock to cond_dict.
during processing, if it's not in tristate_filter will
automatically enable the payload
(regardless of the conditionals negation)
"""
if not self.has_conditionals:
return self
results = []
self.evaluate_conditionals(
self.__class__, results, cond_dict, tristate_filter, force_collapse=True
)
return self.__class__(tuple(results), self.element_class, False)
[docs]
@staticmethod
def find_cond_nodes(restriction_set, yield_non_conditionals=False):
conditions_stack = []
new_set = expandable_chain(restriction_set)
for cur_node in new_set:
if isinstance(cur_node, packages.Conditional):
conditions_stack.append(cur_node.restriction)
new_set.appendleft(list(cur_node.payload) + [None])
elif isinstance(cur_node, transitive_use_atom):
new_set.appendleft(cur_node.convert_to_conditionals())
elif isinstance(cur_node, boolean.base) and not isinstance(cur_node, atom):
new_set.appendleft(cur_node.restrictions)
elif cur_node is None:
conditions_stack.pop()
elif conditions_stack or yield_non_conditionals: # leaf
yield (cur_node, conditions_stack[:])
@property
def node_conds(self):
if self._node_conds is False:
object.__setattr__(self, "_node_conds", {})
elif self._node_conds is True:
nc = {}
always_required = set()
for payload, restrictions in self.find_cond_nodes(self.restrictions, True):
if not restrictions:
always_required.add(payload)
else:
if len(restrictions) == 1:
current = restrictions[0]
else:
current = values.AndRestriction(*restrictions)
nc.setdefault(payload, []).append(current)
for k in always_required:
if k in nc:
del nc[k]
for k in nc:
nc[k] = tuple(nc[k])
object.__setattr__(self, "_node_conds", nc)
return self._node_conds
@property
def has_conditionals(self):
return bool(self._node_conds)
@property
def known_conditionals(self):
if self._node_conds is False:
return frozenset()
if self._known_conditionals is None:
kc = set()
for payload, restrictions in self.find_cond_nodes(self.restrictions):
kc.update(iflatten_instance(x.vals for x in restrictions))
kc = frozenset(kc)
object.__setattr__(self, "_known_conditionals", kc)
return kc
return self._known_conditionals
[docs]
def match(self, *a):
raise NotImplementedError
[docs]
def slotdep_str(self, domain):
return stringify_boolean(self, domain=domain)
force_False = force_True = match
def __str__(self):
return stringify_boolean(self)
# parent __hash__() isn't inherited when __eq__() is defined in the child class
__hash__ = boolean.AndRestriction.__hash__
def __eq__(self, other):
if isinstance(other, DepSet):
return set(self.restrictions) == set(other.restrictions)
return False
def __ne__(self, other):
return not self.__eq__(other)
def __iter__(self):
return iter(self.restrictions)
def __getitem__(self, key):
return self.restrictions[key]
[docs]
def stringify_boolean(
node: restriction.base,
func: typing.Callable[[restriction.base], str] = str,
domain=None,
):
"""func is used to stringify the actual content. Useful for fetchables."""
l = []
if isinstance(node, DepSet):
for x in node.restrictions:
_internal_stringify_boolean(x, domain, func, l.append)
else:
_internal_stringify_boolean(node, domain, func, l.append)
return " ".join(l)
def _internal_stringify_boolean(
node: restriction.base,
domain,
func: typing.Callable[[restriction.base], str],
visit: typing.Callable[[str], None],
):
"""func is used to stringify the actual content. Useful for fetchables."""
if isinstance(node, boolean.OrRestriction):
visit("|| (")
iterable = node.restrictions
elif isinstance(node, boolean.AndRestriction) and not isinstance(node, atom):
visit("(")
iterable = node.restrictions
elif isinstance(node, packages.Conditional):
assert len(node.restriction.vals) == 1
iterable = node.payload
visit(
"%s%s? ("
% ("!" if node.restriction.negate else "", list(node.restriction.vals)[0])
)
else:
if domain is not None and (
isinstance(node, atom) and node.slot_operator == "="
):
pkg = max(sorted(domain.all_installed_repos.itermatch(node)))
object.__setattr__(node, "slot", pkg.slot)
object.__setattr__(node, "subslot", pkg.subslot)
visit(func(node))
return
for node in iterable:
_internal_stringify_boolean(node, domain, func, visit)
visit(")")