"""Core check classes."""
from collections import defaultdict
from functools import total_ordering
from pkgcore import fetch
from snakeoil import klass
from snakeoil.strings import pluralism
from .. import addons, base, feeds, runners, sources
from ..addons.caches import CachedAddon, CacheDisabled
from ..log import logger
from ..results import MetadataError
[docs]
@total_ordering
class Check(feeds.Feed):
"""Base template for a check.
:cvar scope: scope relative to the package repository the check runs under
:cvar source: source of feed items
:cvar known_results: result keywords the check can possibly yield
"""
known_results = frozenset()
# checkrunner class used to execute this check
runner_cls = runners.SyncCheckRunner
@klass.jit_attr
def priority(self):
"""Priority that affects order in which checks are run."""
# raise priority for checks that scan for metadata errors
if self.known_results.intersection(MetadataError.results.values()):
return -1
return 0
@property
def source(self):
# filter pkg feeds as required
if self.known_results.intersection(self.options.filter):
if self.scope >= base.version_scope:
return (
sources.FilteredRepoSource,
(sources.LatestVersionsFilter,),
(("source", self._source),),
)
elif max(x.scope for x in self.known_results) >= base.version_scope:
return (
sources.FilteredPackageRepoSource,
(sources.LatestPkgsFilter,),
(("source", self._source),),
)
return self._source
def __lt__(self, other):
if self.priority == other.priority:
return self.__class__.__name__ < other.__class__.__name__
return self.priority < other.priority
[docs]
class RepoCheck(Check):
"""Check that requires running at a repo level."""
runner_cls = runners.RepoCheckRunner
[docs]
def start(self):
"""Do startup here."""
[docs]
def finish(self):
"""Do cleanup and yield final results here."""
yield from ()
[docs]
class GentooRepoCheck(Check):
"""Check that is only run against the gentoo repo by default."""
def __init__(self, *args):
super().__init__(*args)
if not self.options.gentoo_repo:
check = self.__class__.__name__
if check in self.options.selected_checks:
self.options.override_skip["gentoo"].append(check)
else:
raise SkipCheck(self, "not running against gentoo repo")
[docs]
class OverlayRepoCheck(Check):
"""Check that is only run against overlay repos."""
def __init__(self, *args):
super().__init__(*args)
if not self.options.target_repo.masters:
raise SkipCheck(self, "not running against overlay")
[docs]
class OptionalCheck(Check):
"""Check that is only run when explicitly enabled."""
[docs]
class GitCommitsCheck(OptionalCheck):
"""Check that is only run when explicitly enabled via the --commits git option."""
runner_cls = runners.SequentialCheckRunner
def __init__(self, *args):
super().__init__(*args)
if not self.options.commits:
raise SkipCheck(self, "not scanning against git commits")
[docs]
class AsyncCheck(Check):
"""Check that schedules tasks to be run asynchronously."""
runner_cls = runners.AsyncCheckRunner
def __init__(self, *args, results_q):
super().__init__(*args)
self.results_q = results_q
[docs]
class NetworkCheck(AsyncCheck, OptionalCheck):
"""Check that is only run when network support is enabled."""
required_addons = (addons.NetAddon,)
def __init__(self, *args, net_addon, **kwargs):
super().__init__(*args, **kwargs)
if not self.options.net:
raise SkipCheck(self, "network checks not enabled")
self.timeout = net_addon.timeout
self.session = net_addon.session
[docs]
class MirrorsCheck(Check):
"""Check that requires determining mirrors used by a given package."""
required_addons = (addons.UseAddon,)
def __init__(self, *args, use_addon):
super().__init__(*args)
self.iuse_filter = use_addon.get_filter("fetchables")
[docs]
def get_mirrors(self, pkg):
mirrors = []
fetchables, _ = self.iuse_filter(
(fetch.fetchable,),
pkg,
pkg.generate_fetchables(allow_missing_checksums=True, ignore_unknown_mirrors=True),
)
for f in fetchables:
for m in f.uri.visit_mirrors(treat_default_as_mirror=False):
mirrors.append(m[0].mirror_name)
return set(mirrors)
[docs]
class SkipCheck(base.PkgcheckUserException):
"""Check failed to initialize due to missing dependencies or other situation.
Checks not explicitly selected will be skipped if they raise this during
initialization.
"""
def __init__(self, check, msg):
if isinstance(check, Check):
check_name = check.__class__.__name__
else:
# assume the check param is a raw class object
check_name = check.__name__
super().__init__(f"{check_name}: {msg}")
[docs]
def init_checks(enabled_addons, options, results_q, *, addons_map=None, source_map=None):
"""Initialize selected checks."""
if addons_map is None:
addons_map = {}
if source_map is None:
source_map = {}
enabled = defaultdict(list)
# mapping of check skip overrides
options.override_skip = defaultdict(list)
# initialize required caches before other addons
enabled_addons = sorted(enabled_addons, key=lambda x: not issubclass(x, CachedAddon))
for cls in enabled_addons:
try:
if issubclass(cls, AsyncCheck):
addon = addons.init_addon(cls, options, addons_map, results_q=results_q)
else:
addon = addons.init_addon(cls, options, addons_map)
if isinstance(addon, Check):
source = source_map.get(addon.source)
if source is None:
source = sources.init_source(addon.source, options, addons_map)
source_map[addon.source] = source
enabled[(source, addon.runner_cls)].append(addon)
except (CacheDisabled, SkipCheck) as e:
# Raise exception if the related check was explicitly selected,
# otherwise it gets transparently skipped.
if cls.__name__ in options.selected_checks:
if isinstance(e, SkipCheck):
raise
raise SkipCheck(cls, e)
# report which check skips were overridden
for skip_type, checks in sorted(options.override_skip.items()):
s = pluralism(checks)
checks_str = ", ".join(sorted(checks))
logger.warning(f"running {skip_type} specific check{s}: {checks_str}")
return enabled