Source code for pkgcheck.checks.visibility

from collections import defaultdict
from operator import attrgetter

from pkgcore.ebuild.atom import atom, transitive_use_atom
from snakeoil import klass
from snakeoil.iterables import caching_iter
from snakeoil.sequences import iflatten_func, iflatten_instance, stable_unique
from snakeoil.strings import pluralism

from .. import addons, feeds, results
from . import Check


class FakeConfigurable:
    """Package wrapper binding profile data.

    Wraps a package together with a profile and exposes two computed,
    read-only attributes:

    * ``use``: the flags considered enabled (IUSE defaults, profile USE,
      profile package USE and forced USE, minus masked USE), restricted to the
      effective IUSE of the profile and the package.
    * ``iuse``: the profile's effective IUSE combined with the package's
      stripped IUSE.

    Any attribute not stored on the wrapper is looked up through
    ``klass.GetAttrProxy("_raw_pkg")``; instances reject attribute assignment.
    """

    configurable = True
    __slots__ = ("use", "iuse", "_forced_use", "_masked_use", "_pkg_use", "_raw_pkg", "_profile")

    def __init__(self, pkg, profile):
        # __setattr__ is disabled on this class, so slots are filled through
        # the base object implementation.
        store = object.__setattr__
        store(self, "_raw_pkg", pkg)
        store(self, "_profile", profile)

        forced = profile.forced_use.pull_data(pkg)
        store(self, "_forced_use", forced)
        masked = profile.masked_use.pull_data(pkg)
        store(self, "_masked_use", masked)
        pkg_use = profile.pkg_use.pull_data(pkg)
        store(self, "_pkg_use", pkg_use)

        # IUSE entries prefixed with "+" are enabled by default
        defaults = {flag[1:] for flag in pkg.iuse if flag[0] == "+"}
        enabled = (defaults | profile.use | pkg_use | forced) - masked
        known = profile.iuse_effective | pkg.iuse_effective
        store(self, "use", frozenset(enabled & known))
        store(self, "iuse", frozenset(profile.iuse_effective.union(pkg.iuse_stripped)))

    def request_enable(self, attr, *vals):
        """Return True if every flag in ``vals`` may be enabled.

        Only ``attr == "use"`` is supported. The request is refused when a
        flag is missing from ``iuse`` or is present in the masked USE set.
        """
        if attr != "use":
            return False
        flags = frozenset(vals)
        # unknown flags are refused first, then any masked flag vetoes the request
        return flags.issubset(self.iuse) and flags.isdisjoint(self._masked_use)

    def request_disable(self, attr, *vals):
        """Return True if every flag in ``vals`` may be disabled.

        Only ``attr == "use"`` is supported. The request is refused when a
        flag is missing from ``iuse`` or is present in the forced USE set.
        """
        if attr != "use":
            return False
        flags = frozenset(vals)
        # unknown flags are refused first, then any forced flag vetoes the request
        return flags.issubset(self.iuse) and flags.isdisjoint(self._forced_use)

    def rollback(self, point=0):
        """Always report success; the wrapper keeps no change history."""
        return True

    def changes_count(self):
        """Always report zero changes."""
        return 0

    def __str__(self):
        return str(self._raw_pkg)

    __getattr__ = klass.GetAttrProxy("_raw_pkg")

    def __setattr__(self, attr, val):
        raise AttributeError(self, "is immutable")
class _BlockMemoryExhaustion(Exception):
    """Raised for a transitive use atom carrying more than 16 conditional flags."""


# This is fast path code, hence the separated implementations.
if getattr(atom, "_TRANSITIVE_USE_ATOM_BUG_IS_FIXED", False):

    def _eapi2_flatten(val):
        """Return True for atoms that are not transitive use atoms."""
        if isinstance(val, transitive_use_atom):
            return False
        return isinstance(val, atom)

else:

    def _eapi2_flatten(val):
        """Return True for atoms that are not transitive use atoms.

        Raises:
            _BlockMemoryExhaustion: if ``val`` is a transitive use atom with
                more than 16 use entries ending in ``?``.
        """
        if isinstance(val, transitive_use_atom):
            conditionals = sum(1 for flag in val.use if flag.endswith("?"))
            if conditionals > 16:
                raise _BlockMemoryExhaustion(val)
            return False
        return isinstance(val, atom)
def visit_atoms(pkg, stream):
    """Return a flattened iterator over ``stream`` for the given package.

    When the package's EAPI enables transitive use atoms, flattening is
    driven by ``_eapi2_flatten`` through ``iflatten_func``; otherwise
    ``iflatten_instance`` is used with ``atom`` as the instance type.
    """
    if pkg.eapi.options.transitive_use_atoms:
        return iflatten_func(stream, _eapi2_flatten)
    return iflatten_instance(stream, atom)
class VisibleVcsPkg(results.VersionResult, results.Warning):
    """Package is VCS-based, but visible."""

    def __init__(self, arch, profile, num_profiles=None, **kwargs):
        super().__init__(**kwargs)
        self.arch = arch
        self.profile = profile
        # optional count of affected profiles; shown only when greater than one
        self.num_profiles = num_profiles

    @property
    def desc(self):
        total = self.num_profiles
        suffix = f" ({total} total)" if total is not None and total > 1 else ""
        return f'VCS version visible for KEYWORDS="{self.arch}", profile {self.profile}{suffix}'
class NonexistentDeps(results.VersionResult, results.Warning):
    """No matches exist for a package dependency."""

    def __init__(self, attr, nonexistent, **kwargs):
        super().__init__(**kwargs)
        self.attr = attr
        # stored as a tuple regardless of the iterable type passed in
        self.nonexistent = tuple(nonexistent)

    @property
    def desc(self):
        plural = pluralism(self.nonexistent)
        listing = ", ".join(self.nonexistent)
        return f"{self.attr}: nonexistent package{plural}: {listing}"
class UncheckableDep(results.VersionResult, results.Warning):
    """Given dependency cannot be checked due to the number of transitive use deps in it."""

    def __init__(self, attr, **kwargs):
        super().__init__(**kwargs)
        # name of the depset attribute that could not be scanned
        self.attr = attr

    @property
    def desc(self):
        depset = self.attr
        return f"depset {depset}: could not be checked due to pkgcore limitation"
class DependencyMoved(results.VersionResult, results.Error):
    """Ebuild depends on a dependency which was pkgmoved."""

    def __init__(self, attr: str, source: str, target: str, **kwargs):
        super().__init__(**kwargs)
        self.attr = attr
        # dependency as currently written, and its suggested replacement
        self.source = source
        self.target = target

    @property
    def desc(self):
        old, new = self.source, self.target
        return f"depset({self.attr}) dependency moved, update {old!r} to {new!r}"
class NonsolvableDeps(results.VersionResult, results.AliasResult, results.Error):
    """No potential solution for a depset attribute."""

    def __init__(
        self,
        attr,
        keyword,
        profile,
        deps,
        profile_status,
        profile_deprecated,
        num_profiles=None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.attr = attr
        self.keyword = keyword
        self.profile = profile
        # stored as a tuple regardless of the iterable type passed in
        self.deps = tuple(deps)
        self.profile_status = profile_status
        self.profile_deprecated = profile_deprecated
        # optional count of affected profiles; shown only when greater than one
        self.num_profiles = num_profiles

    @property
    def desc(self):
        # a falsy status is reported as "custom"
        status = self.profile_status or "custom"
        if self.profile_deprecated:
            status = "deprecated " + status

        total = self.num_profiles
        suffix = f" ({total} total)" if total is not None and total > 1 else ""
        solutions = ", ".join(self.deps)
        return (
            f"nonsolvable depset({self.attr}) keyword({self.keyword}) "
            f"{status} profile ({self.profile}){suffix}: "
            f"solutions: [ {solutions} ]"
        )
class NonsolvableDepsInStable(NonsolvableDeps):
    """No potential solution for dependency on stable profile.

    Variant of :class:`NonsolvableDeps` reported for profiles whose status
    is ``stable``.
    """
class NonsolvableDepsInDev(NonsolvableDeps):
    """No potential solution for dependency on dev profile.

    Variant of :class:`NonsolvableDeps` reported for profiles whose status
    is ``dev``.
    """
class NonsolvableDepsInExp(NonsolvableDeps):
    """No potential solution for dependency on exp profile.

    Variant of :class:`NonsolvableDeps` reported for profiles whose status
    is ``exp``.
    """

    # results require experimental profiles to be enabled
    _profile = "exp"
class OldPackageName(results.PackageResult, results.Error):
    """Package uses old name which is source of pkgmove.

    Package is using ``${CATEGORY}/${PN}`` which is the source of a
    pkgmove. It should be updated to the destination (new name) from
    this repository or one of its master repositories.
    """

    def __init__(self, new_name: str, **kwargs):
        super().__init__(**kwargs)
        # destination of the pkgmove, i.e. the name the package should use
        self.new_name = new_name

    @property
    def desc(self):
        destination = self.new_name
        return f"package uses old name which is source of pkgmove, rename into {destination!r}"
class VisibilityCheck(feeds.EvaluateDepSet, feeds.QueryCache, Check):
    """Visibility dependency scans.

    Check that at least one solution is possible for a pkg, checking all
    profiles (defined by arch.list) visibility modifiers per stable/unstable
    keyword.
    """

    required_addons = (addons.profiles.ProfileAddon,)
    known_results = frozenset(
        {
            VisibleVcsPkg,
            NonexistentDeps,
            UncheckableDep,
            NonsolvableDepsInStable,
            NonsolvableDepsInDev,
            NonsolvableDepsInExp,
            DependencyMoved,
            OldPackageName,
        }
    )

    @staticmethod
    def _collect_pkgmoves(repo):
        """Map old package keys to new ones for ``repo`` and its masters.

        Masters are processed first, so entries from ``repo`` itself override
        entries inherited from them. Only the first command of each updates
        entry is inspected, and only when its action is ``"move"``.
        """
        pkgmoves: dict[str, str] = {}
        for master in repo.masters:
            pkgmoves.update(VisibilityCheck._collect_pkgmoves(master))
        for (action, *params), *_ in repo.config.updates.values():
            if action == "move":
                source, target = params
                pkgmoves[source.key] = target.key
        return pkgmoves

    def __init__(self, *args, profile_addon):
        super().__init__(*args, profile_addon=profile_addon)
        self.profiles = profile_addon
        self.pkgmoves = self._collect_pkgmoves(self.options.target_repo)
        # result class used for each profile status, in reporting order
        self.report_cls_map = {
            "stable": NonsolvableDepsInStable,
            "dev": NonsolvableDepsInDev,
            "exp": NonsolvableDepsInExp,
        }

    def feed(self, pkg):
        """Yield visibility-related results for ``pkg``.

        In order: VCS visibility results for live packages, an
        ``OldPackageName`` result if the package key is the source of a
        pkgmove, per-depset ``UncheckableDep`` / ``DependencyMoved`` /
        ``NonexistentDeps`` results, and finally the nonsolvable dependency
        results for every depset that could be scanned.
        """
        super().feed(pkg)

        # query_cache gets caching_iter partial repo searches shoved into it-
        # reason is simple, it's likely that versions of this pkg probably
        # use similar deps- so we're forcing those packages that were
        # accessed for atom matching to remain in memory.
        # end result is less going to disk

        if pkg.live:
            # vcs ebuild that better not be visible
            yield from self.check_visibility_vcs(pkg)

        if pkg.key in self.pkgmoves:
            yield OldPackageName(self.pkgmoves[pkg.key], pkg=pkg)

        dep_attrs = [key.lower() for key in pkg.eapi.dep_keys]

        suppressed_depsets = set()
        for attr in dep_attrs:
            nonexistent = set()
            try:
                for orig_node in visit_atoms(pkg, getattr(pkg, attr)):
                    node = orig_node.no_usedeps
                    if node not in self.query_cache:
                        if node in self.profiles.global_insoluble:
                            nonexistent.add(node)
                            # insert an empty tuple, so that tight loops further
                            # on don't have to use the slower get method
                            self.query_cache[node] = ()
                        else:
                            matches = caching_iter(self.options.search_repo.itermatch(node))
                            if matches:
                                self.query_cache[node] = matches
                                if orig_node is not node:
                                    self.query_cache[str(orig_node)] = matches
                            elif not node.blocks:
                                nonexistent.add(node)
                                self.query_cache[node] = ()
                                self.profiles.global_insoluble.add(node)
                    elif not self.query_cache[node]:
                        nonexistent.add(node)
            except _BlockMemoryExhaustion:
                # atoms collected before the failure are still reported below
                yield UncheckableDep(attr, pkg=pkg)
                suppressed_depsets.add(attr)

            # Walk the atoms in sorted order so the emitted results are
            # deterministic rather than following set hash order.
            missing = []
            for dep in sorted(nonexistent):
                if dep.key not in self.pkgmoves:
                    missing.append(str(dep))
                elif target := self.pkgmoves[dep.key]:
                    new_dep = str(dep).replace(dep.key, target)
                    yield DependencyMoved(attr, str(dep), new_dep, pkg=pkg)
            if missing:
                yield NonexistentDeps(attr.upper(), sorted(missing), pkg=pkg)

        for attr in dep_attrs:
            if attr in suppressed_depsets:
                continue
            depset = getattr(pkg, attr)
            # failing deps (tuple of str) -> profile status -> set of profiles
            profile_failures = defaultdict(lambda: defaultdict(set))
            for edepset, profiles in self.collapse_evaluate_depset(pkg, attr, depset):
                for profile, failures in self.process_depset(pkg, attr, depset, edepset, profiles):
                    # moved dependencies are reported as DependencyMoved instead
                    unresolved = tuple(
                        map(str, sorted(dep for dep in failures if dep.key not in self.pkgmoves))
                    )
                    if unresolved:
                        profile_failures[unresolved][profile.status].add(profile)

            yield from self._nonsolvable_results(pkg, attr, profile_failures)

    def _nonsolvable_results(self, pkg, attr, profile_failures):
        """Yield nonsolvable-dependency results for one depset of ``pkg``.

        With verbosity above zero every failing profile is reported;
        otherwise only the first profile (ordered by key, then name) of each
        profile status is reported, together with the number of failing
        profiles sharing that status.
        """
        verbose = self.options.verbosity > 0
        for failures, by_status in profile_failures.items():
            for profile_status, cls in self.report_cls_map.items():
                affected = sorted(by_status.get(profile_status, ()), key=attrgetter("key", "name"))
                if not affected:
                    continue
                if verbose:
                    # report all failures across all profiles in verbose mode
                    for profile in affected:
                        yield cls(
                            attr,
                            profile.key,
                            profile.name,
                            failures,
                            profile_status,
                            profile.deprecated,
                            pkg=pkg,
                        )
                else:
                    # only report one failure per depset per profile type in regular mode
                    profile = affected[0]
                    yield cls(
                        attr,
                        profile.key,
                        profile.name,
                        failures,
                        profile_status,
                        profile.deprecated,
                        len(affected),
                        pkg=pkg,
                    )

    def check_visibility_vcs(self, pkg):
        """Yield ``VisibleVcsPkg`` results for profiles under which ``pkg`` is visible.

        With verbosity above zero one result is produced per profile;
        otherwise a single result for the first visible profile carries the
        total number of visible profiles.
        """
        visible = [profile for profile in self.profiles if profile.visible(pkg)]
        if not visible:
            return

        if self.options.verbosity > 0:
            # report all failures across all profiles in verbose mode
            for p in visible:
                yield VisibleVcsPkg(p.key, p.name, pkg=pkg)
        else:
            p = visible[0]
            yield VisibleVcsPkg(p.key, p.name, len(visible), pkg=pkg)

    def process_depset(self, pkg, attr, depset, edepset, profiles):
        """Yield ``(profile, failures)`` for each profile with unsatisfied requirements.

        ``failures`` is the set of nodes from every CNF requirement of
        ``edepset`` for which no node had a match visible under the profile.
        Requirements containing a blocker are ignored. Each profile's
        ``cache`` and ``insoluble`` sets are updated with the outcome for
        the nodes that get evaluated.
        """
        get_cached_query = self.query_cache.get

        # requirements that contain any blocker are not checked at all
        csolutions = [
            required
            for required in edepset.iter_cnf_solutions()
            if not any(node.blocks for node in required)
        ]

        for profile in profiles:
            failures = set()
            # is it visible?  ie, is it masked?
            # if so, skip it.
            # long term, probably should do testing in the same respect we do
            # for other visibility tiers
            cache = profile.cache
            provided = profile.provides_has_match
            insoluble = profile.insoluble
            visible = profile.visible
            for required in csolutions:
                # scan all of the quickies, the caches...
                if any(node in cache or provided(node) for node in required):
                    continue

                for node in required:
                    if node in insoluble:
                        # Already found to have no visible match under this
                        # profile, so skip the lookup. This used to be `pass`,
                        # which re-evaluated the node every time.
                        continue

                    # get is required since there is an intermix between old style
                    # virtuals and new style- thus the cache priming doesn't get
                    # all of it.
                    src = get_cached_query(node.no_usedeps, ())
                    if node.use:
                        # bind profile USE data to each candidate before the
                        # use-dep check
                        src = (FakeConfigurable(match, profile) for match in src)
                        src = (match for match in src if node.force_True(match))
                    if any(visible(match) for match in src):
                        cache.add(node)
                        break
                    insoluble.add(node)
                else:
                    # no matches.  not great, should collect them all
                    failures.update(required)
            if failures:
                yield profile, failures