Source code for pkgcheck.feeds

"""Feed functionality used by checks."""

from . import base, sources


class Feed(base.Addon):
    """Base template for addon iterating over an item feed.

    :cvar scope: scope relative to the package repository the check runs under
    :cvar source: source of feed items
    """

    _source = sources.RepoSource

    def __init_subclass__(cls, **kwargs):
        """Initialize feed subclasses and set 'scope' class attribute.

        The scope is derived from the subclass's registered ``_source``:

        * a non-tuple source supplies its own ``scope`` attribute;
        * a tuple source supplies the ``scope`` of its first element, unless
          that element is a ``sources.EmptySource`` subclass, in which case
          the scope is read from the first item of the tuple's second element.
        """
        super().__init_subclass__(**kwargs)

        registered = cls._source

        # Simple case: the source was registered directly, not as a tuple.
        if not isinstance(registered, tuple):
            cls.scope = registered.scope
            return

        # Tuple case: the leading element is the source class itself.
        source_cls = registered[0]
        if issubclass(source_cls, sources.EmptySource):
            cls.scope = registered[1][0]
        else:
            cls.scope = source_cls.scope

    @property
    def source(self):
        """Return the source registered for this feed (``_source``)."""
        return self._source

    def feed(self, item):
        """Handle functionality against the passed in item.

        The base implementation is a generator that yields nothing.
        """
        yield from ()

    def cleanup(self):
        """Do cleanup here."""
class QueryCache(Feed):
    """Feed mixin holding a ``query_cache`` dict that is reset on key changes.

    The reset granularity is selected by the ``query_caching_freq`` option,
    which names one of the ``_version``, ``_package`` or ``_category`` key
    functions defined on this class.
    """

    @staticmethod
    def mangle_argparser(parser):
        """Register the ``--reset-caching-per`` option on the given parser."""
        caching_opts = parser.add_argument_group("query caching")
        caching_opts.add_argument(
            "--reset-caching-per",
            dest="query_caching_freq",
            choices=("version", "package", "category"),
            default="package",
            help=(
                "control how often the cache is cleared "
                "(version, package or category)"
            ),
        )

    # Key functions: the value each one returns is compared between
    # consecutive feed() calls to decide when the cache gets cleared.

    @staticmethod
    def _version(item):
        """Key on the item itself."""
        return item

    @staticmethod
    def _package(item):
        """Key on the item's ``key`` attribute."""
        return item.key

    @staticmethod
    def _category(item):
        """Key on the item's ``category`` attribute."""
        return item.category

    def __init__(self, options):
        """Set up an empty cache and pick the key function from the options.

        :param options: object providing ``query_caching_freq``; its value is
            prefixed with an underscore to look up the key function by name.
        """
        super().__init__(options)
        self.query_cache = {}
        keyfunc_name = f"_{options.query_caching_freq}"
        self._keyfunc = getattr(self, keyfunc_name)
        # No item has been seen yet.
        self._key = None

    def feed(self, item):
        """Clear the query cache when the item's key differs from the last one.

        Unlike ``Feed.feed`` this is a plain method, so the cache reset runs
        as soon as it is called. The result of the parent ``feed()`` call is
        not returned.
        """
        key = self._keyfunc(item)
        # TODO: this should be logging debug info
        key_changed = key != self._key
        if key_changed:
            self.query_cache.clear()
            self._key = key
        super().feed(item)
class EvaluateDepSet(Feed):
    """Feed mixin that evaluates depsets per group of profiles, with caching.

    Two caches are kept and both are emptied on every ``feed()`` call:
    ``pkg_profiles_cache`` (keyed by package) and
    ``pkg_evaluate_depsets_cache`` (keyed by ``(package, attr)``).
    """

    def __init__(self, *args, profile_addon):
        """Create empty caches and store the profile addon.

        :param args: positional arguments forwarded to the parent initializer
        :param profile_addon: keyword-only; object providing
            ``identify_profiles(pkg)``, stored as ``self.profiles``
        """
        super().__init__(*args)
        self.pkg_evaluate_depsets_cache = {}
        self.pkg_profiles_cache = {}
        self.profiles = profile_addon

    def feed(self, item):
        """Delegate to the parent ``feed()`` and then drop both caches."""
        super().feed(item)
        for cache in (self.pkg_evaluate_depsets_cache, self.pkg_profiles_cache):
            cache.clear()

    def _identify_common_depsets(self, pkg, depset):
        """Evaluate ``depset`` once per distinct USE result across profiles.

        Profile groups whose first profile reports the same
        ``(immutable, enabled)`` pair from ``identify_use()`` are merged, and
        the depset is evaluated a single time for each merged group.

        :return: list of ``(evaluated_depset, profiles)`` tuples
        """
        groups = self.pkg_profiles_cache.get(pkg)
        if groups is None:
            groups = self.profiles.identify_profiles(pkg)
            self.pkg_profiles_cache[pkg] = groups

        # strip use dep defaults so known flags get identified correctly
        # (a name ending in ")" loses its last three characters)
        diuse = frozenset(
            flag[:-3] if flag[-1] == ")" else flag
            for flag in depset.known_conditionals
        )

        # Merge profile groups sharing the same (immutable, enabled) result;
        # only the first profile of each group is consulted.
        merged = {}
        for group in groups:
            immutable, enabled = group[0].identify_use(pkg, diuse)
            merged.setdefault((immutable, enabled), []).extend(group)

        evaluated = []
        for (immutable, enabled), members in merged.items():
            result = depset.evaluate_depset(enabled, tristate_filter=immutable)
            evaluated.append((result, members))
        return evaluated

    def collapse_evaluate_depset(self, pkg, attr, depset):
        """Return the cached result of ``_identify_common_depsets`` for ``(pkg, attr)``.

        The result is computed and stored on the first request for a given
        ``(pkg, attr)`` pair.
        """
        cache_key = (pkg, attr)
        cached = self.pkg_evaluate_depsets_cache.get(cache_key)
        if cached is None:
            cached = self._identify_common_depsets(pkg, depset)
            self.pkg_evaluate_depsets_cache[cache_key] = cached
        return cached