Source code for pkgcore.restrictions.packages

"""restriction classes designed for package level matching"""

from snakeoil import klass
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.klass import generic_equality, static_attrgetter

from ..log import logger
from . import boolean, restriction

class PackageRestriction(restriction.base, metaclass=generic_equality):
    """Restriction that tests one attribute of a package.

    The attribute named at construction time is fetched from the package
    and passed to the wrapped value restriction; the outcome is inverted
    when ``negate`` is set.
    """

    __slots__ = (
        "_pull_attr_func",
        "_attr_split",
        "restriction",
        "ignore_missing",
        "negate",
    )

    __attr_comparison__ = ("__class__", "negate", "_attr_split", "restriction")
    __inst_caching__ = True

    type = restriction.package_type
    subtype = restriction.value_type
    conditional = False

    # A sentinel (klass.sentinel) signals "attribute could not be pulled".
    # It is used instead of an exception purely because this codepath can
    # be executed a *lot*, and setting up/tearing down exception machinery
    # can be surprisingly costly.

    # Careful: some methods (__eq__, __hash__, intersect) try to work for
    # subclasses too.  They will not behave as intended if a subclass adds
    # attributes, so a subclass doing that must override those methods.

    def __init__(self, attr, childrestriction, negate=False, ignore_missing=True):
        """
        :param attr: package attribute to match against
        :param childrestriction: a :obj:`pkgcore.restrictions.values.base`
            instance to pass attr to for matching
        :param negate: should the results be negated?
        :param ignore_missing: when false, an ``AttributeError`` raised
            while fetching the attribute is logged by
            :meth:`_handle_exception`
        :raises TypeError: if ``childrestriction.type`` does not equal
            this class's ``subtype``
        """
        type_matches = childrestriction.type == self.subtype
        if not type_matches:
            raise TypeError("restriction must be of type %r" % (self.subtype,))
        # Attributes are stored via object.__setattr__, as in the rest of
        # this class.
        store = object.__setattr__
        store(self, "negate", negate)
        self._parse_attr(attr)
        store(self, "restriction", childrestriction)
        store(self, "ignore_missing", ignore_missing)

    def _parse_attr(self, attr):
        """Store the getter and the dot-split components for ``attr``."""
        store = object.__setattr__
        store(self, "_pull_attr_func", static_attrgetter(attr))
        store(self, "_attr_split", attr.split("."))

    def _pull_attr(self, pkg):
        """Fetch the configured attribute from ``pkg``.

        :return: the attribute value, or ``klass.sentinel`` when fetching
            raised an exception that :meth:`_handle_exception` decided to
            swallow
        """
        try:
            return self._pull_attr_func(pkg)
        except IGNORED_EXCEPTIONS:
            raise
        except Exception as error:
            must_propagate = self._handle_exception(pkg, error, self._attr_split)
            if must_propagate:
                raise
            return klass.sentinel

    def match(self, pkg):
        """Return whether ``pkg`` satisfies this restriction.

        When the attribute could not be pulled the result is ``negate``.
        """
        value = self._pull_attr(pkg)
        if value is not klass.sentinel:
            return self.restriction.match(value) != self.negate
        return self.negate

    def _handle_exception(self, pkg, exc, attr_split):
        """Decide whether an exception raised during attribute access propagates.

        :param pkg: package the attribute was being pulled from
        :param exc: the exception that was caught
        :param attr_split: components of the attribute path being accessed
        :return: False if the exception should be swallowed, True if the
            caller should re-raise it
        """
        if isinstance(exc, AttributeError):
            if not self.ignore_missing:
                logger.exception(
                    "failed getting attribute %s from %s, " "exception %s",
                    ".".join(attr_split),
                    str(pkg),
                    str(exc),
                )
            text_args = [arg for arg in exc.args if isinstance(arg, str)]
            # An exception argument that is exactly one of the attribute
            # components means the attribute itself was missing.
            if any(arg in attr_split for arg in text_args):
                return False
            # this is fairly horrible; probably specific to cpython also.
            # either way, does a lookup specifically for attr components
            # in the exception string, looking for 'attr' in the text.
            # if it doesn't match, the exception is thrown.
            quoted_parts = ["'%s'" % part for part in attr_split]
            if any(quoted in text for quoted in quoted_parts for text in text_args):
                return False
        logger.exception(
            "caught unexpected exception accessing %s from %s, " "exception %s",
            ".".join(attr_split),
            str(pkg),
            str(exc),
        )
        return True

    def force_False(self, pkg):
        """Delegate forcing a false result to the wrapped restriction.

        Returns ``not self.negate`` when the attribute could not be pulled;
        otherwise calls the child's ``force_True`` (negated) or
        ``force_False`` with ``(pkg, self.attr, value)``.
        """
        value = self._pull_attr(pkg)
        if value is klass.sentinel:
            return not self.negate
        if not self.negate:
            return self.restriction.force_False(pkg, self.attr, value)
        return self.restriction.force_True(pkg, self.attr, value)

    def force_True(self, pkg):
        """Delegate forcing a true result to the wrapped restriction.

        Returns ``self.negate`` when the attribute could not be pulled;
        otherwise calls the child's ``force_False`` (negated) or
        ``force_True`` with ``(pkg, self.attr, value)``.
        """
        value = self._pull_attr(pkg)
        if value is klass.sentinel:
            return self.negate
        if not self.negate:
            return self.restriction.force_True(pkg, self.attr, value)
        return self.restriction.force_False(pkg, self.attr, value)

    def __len__(self):
        # One for this node, plus the child's length when it is a boolean.
        if isinstance(self.restriction, boolean.base):
            return len(self.restriction) + 1
        return 1

    def __hash__(self):
        return hash((self.negate, self.attrs, self.restriction))

    def __str__(self):
        pieces = [f"{self.attrs} "]
        if self.negate:
            pieces.append("not ")
        pieces.append(str(self.restriction))
        return "".join(pieces)

    def __repr__(self):
        template = (
            "<%s attr=%r restriction=%r negated @%#8x>"
            if self.negate
            else "<%s attr=%r restriction=%r @%#8x>"
        )
        return template % (
            self.__class__.__name__,
            self.attr,
            self.restriction,
            id(self),
        )

    @property
    def attr(self):
        """Dotted name of the attribute this restriction inspects."""
        return ".".join(self._attr_split)

    @property
    def attrs(self):
        """One-element tuple holding :attr:`attr`."""
        return (self.attr,)
class PackageRestrictionMulti(PackageRestriction):
    """Variant of :class:`PackageRestriction` that pulls several attributes.

    The values of all configured attributes are collected into a list and
    handed to the wrapped restriction together.
    """

    __slots__ = ()
    __inst_caching__ = True
    # Replaces the parent's ``attr`` property; use ``attrs`` on this class.
    attr = None

    @property
    def attrs(self):
        """Tuple of the dotted names of all attributes being inspected."""
        return tuple(".".join(parts) for parts in self._attr_split)

    def _parse_attr(self, attrs):
        """Store one getter and one dot-split component list per attribute."""
        store = object.__setattr__
        store(
            self,
            "_pull_attr_func",
            tuple(static_attrgetter(name) for name in attrs),
        )
        store(self, "_attr_split", tuple(name.split(".") for name in attrs))

    def _pull_attr(self, pkg):
        """Fetch every configured attribute from ``pkg``.

        :return: list of attribute values in configuration order, or
            ``klass.sentinel`` when a fetch raised an exception that
            ``_handle_exception`` decided to swallow
        """
        collected = []
        try:
            for getter in self._pull_attr_func:
                collected.append(getter(pkg))
        except IGNORED_EXCEPTIONS:
            raise
        except Exception as error:
            # len(collected) is the index of the getter that just failed.
            failed_split = self._attr_split[len(collected)]
            if self._handle_exception(pkg, error, failed_split):
                raise
            return klass.sentinel
        return collected

    def force_False(self, pkg):
        """Delegate forcing a false result to the wrapped restriction.

        Returns ``not self.negate`` when the attributes could not be
        pulled; otherwise calls the child's ``force_True`` (negated) or
        ``force_False`` with ``(pkg, self.attrs, values)``.
        """
        values = self._pull_attr(pkg)
        if values is klass.sentinel:
            return not self.negate
        if not self.negate:
            return self.restriction.force_False(pkg, self.attrs, values)
        return self.restriction.force_True(pkg, self.attrs, values)

    def force_True(self, pkg):
        """Delegate forcing a true result to the wrapped restriction.

        Returns ``self.negate`` when the attributes could not be pulled;
        otherwise calls the child's ``force_False`` (negated) or
        ``force_True`` with ``(pkg, self.attrs, values)``.
        """
        values = self._pull_attr(pkg)
        if values is klass.sentinel:
            return self.negate
        if not self.negate:
            return self.restriction.force_True(pkg, self.attrs, values)
        return self.restriction.force_False(pkg, self.attrs, values)

    # Explicitly reuse the parent's hashing and equality.
    __hash__ = PackageRestriction.__hash__
    __eq__ = PackageRestriction.__eq__
class Conditional(PackageRestriction, metaclass=generic_equality):
    """Base object representing a conditional package restriction.

    Used to control whether a payload of restrictions is accessible or not.
    """

    __slots__ = ("payload",)

    __attr_comparison__ = ("__class__", "negate", "attr", "restriction", "payload")
    conditional = True

    # note that instance caching is turned off.
    # rarely pays off for conditionals from a speed/mem comparison

    def __init__(self, attr, childrestriction, payload, **kwds):
        """
        :param attr: attr to match against
        :param childrestriction: restriction to control whether or not the
            payload is accessible
        :param payload: payload data, whatever it may be; stored as a tuple
        :param kwds: additional args to pass to :obj:`PackageRestriction`
        """
        PackageRestriction.__init__(self, attr, childrestriction, **kwds)
        object.__setattr__(self, "payload", tuple(payload))

    def __str__(self):
        base_text = PackageRestriction.__str__(self)
        payload_text = ", ".join(map(str, self.payload))
        return f"( Conditional: {base_text} payload: [ {payload_text} ] )"

    def __repr__(self):
        template = (
            "<%s attr=%r restriction=%r payload=%r negated @%#8x>"
            if self.negate
            else "<%s attr=%r restriction=%r payload=%r @%#8x>"
        )
        return template % (
            self.__class__.__name__,
            self.attr,
            self.restriction,
            self.payload,
            id(self),
        )

    def __iter__(self):
        return iter(self.payload)

    def __hash__(self):
        return hash((self.attr, self.negate, self.restriction, self.payload))

    def evaluate_conditionals(
        self, parent_cls, parent_seq, enabled, tristate_locked=None
    ):
        """Evaluate this conditional and, if it holds, recurse into the payload.

        :param parent_cls: forwarded unchanged to the payload's evaluation
        :param parent_seq: forwarded unchanged to the payload's evaluation
        :param enabled: collection the controlling restriction is matched
            against
        :param tristate_locked: optional collection of locked values; when
            given, the controlling restriction must carry exactly one value
        """
        if tristate_locked is None:
            if not self.restriction.match(enabled):
                return
        else:
            vals = self.restriction.vals
            assert len(vals) == 1
            val = list(vals)[0]
            # if val is forced true, but the check is
            # negation ignore it
            # if !mips != mips
            if val in tristate_locked and (val in enabled) == self.restriction.negate:
                return

        if not self.payload:
            return
        boolean.AndRestriction(*self.payload).evaluate_conditionals(
            parent_cls, parent_seq, enabled, tristate_locked
        )
# "Invalid name" (pylint uses the module const regexp, not the class regexp) # pylint: disable-msg=C0103 AndRestriction = restriction.curry_node_type( boolean.AndRestriction, restriction.package_type ) OrRestriction = restriction.curry_node_type( boolean.OrRestriction, restriction.package_type ) AlwaysBool = restriction.curry_node_type( restriction.AlwaysBool, restriction.package_type )
class KeyedAndRestriction(boolean.AndRestriction):
    """Package-typed ``AndRestriction`` carrying optional ``key`` and ``tag``."""

    __inst_caching__ = True
    type = restriction.package_type

    def __init__(self, *a, **kwds):
        """
        :param a: positional arguments forwarded to ``boolean.AndRestriction``
        :param kwds: keyword arguments forwarded to ``boolean.AndRestriction``
            after ``key`` and ``tag`` (both defaulting to None) are removed
        """
        # Strip our own keywords before the parent sees them.
        extras = {name: kwds.pop(name, None) for name in ("key", "tag")}
        boolean.AndRestriction.__init__(self, *a, **kwds)
        for name, value in extras.items():
            object.__setattr__(self, name, value)

    def __str__(self):
        rendered = boolean.AndRestriction.__str__(self)
        # The tag, when present, is prefixed to the parent's rendering.
        return rendered if self.tag is None else f"{self.tag} {rendered}"
# Module-level instances built from the package-typed AlwaysBool defined
# earlier in this module.
# NOTE(review): presumably negate=True yields a restriction that always
# matches and negate=False one that never does -- confirm against
# restriction.AlwaysBool.
AlwaysTrue = AlwaysBool(negate=True)
AlwaysFalse = AlwaysBool(negate=False)