import re
from collections import defaultdict
from difflib import SequenceMatcher
from itertools import chain
from pkgcore import fetch
from pkgcore.ebuild.digest import Manifest
from snakeoil.sequences import iflatten_instance
from snakeoil.strings import pluralism
from .. import addons, base, results, sources
from . import Check, MirrorsCheck, RepoCheck
# Hash names reported as deprecated when they show up in a package Manifest
# or in the repository's manifest-hashes configuration.
DEPRECATED_HASHES = frozenset(("md5", "rmd160", "sha1", "whirlpool"))
[docs]
class MultiMovePackageUpdate(results.ProfilesResult, results.Warning):
    """Entry for package moved multiple times in profiles/updates files."""

    def __init__(self, pkg, moves):
        """Record the package and the ordered chain of names it moved through."""
        super().__init__()
        self.pkg = pkg
        self.moves = tuple(moves)

    @property
    def desc(self):
        """Summary line showing the full move chain joined by arrows."""
        move_chain = " -> ".join(self.moves)
        return f"{self.pkg!r}: multi-move update: {move_chain}"
[docs]
class OldMultiMovePackageUpdate(results.ProfilesResult, results.Warning):
    """Old entry for removed package moved multiple times in profiles/updates files.

    This means that the reported pkg has been moved at least three times and
    finally removed from the tree. All the related lines should be removed from
    the update files.
    """

    def __init__(self, pkg, moves):
        """Record the final package name and the ordered chain of moves."""
        super().__init__()
        self.pkg = pkg
        self.moves = tuple(moves)

    @property
    def desc(self):
        """Summary line flagging the package as unavailable plus its move chain."""
        move_chain = " -> ".join(self.moves)
        return f"{self.pkg!r} unavailable: old multi-move update: {move_chain}"
[docs]
class OldPackageUpdate(results.ProfilesResult, results.Warning):
    """Old entry for removed package in profiles/updates files."""

    def __init__(self, pkg, updates):
        """Record the package and the fields of the stale update line."""
        super().__init__()
        self.pkg = pkg
        self.updates = tuple(updates)

    @property
    def desc(self):
        """Summary line quoting the update line rebuilt from its fields."""
        update_line = " ".join(self.updates)
        return f"{self.pkg!r} unavailable: old update line: {update_line!r}"
[docs]
class RedundantPackageUpdate(results.ProfilesResult, results.Warning):
    """Move entry to the same package/slot (source == target)."""

    def __init__(self, updates):
        """Record the fields of the redundant update line."""
        super().__init__()
        self.updates = tuple(updates)

    @property
    def desc(self):
        """Summary line quoting the update line rebuilt from its fields."""
        update_line = " ".join(self.updates)
        return f"update line moves to the same package/slot: {update_line!r}"
[docs]
class MovedPackageUpdate(results.ProfilesResult, results.LogWarning):
    """Entry for package already moved in profiles/updates files."""

    # No behaviour of its own: PackageUpdatesCheck.finish pairs this class with
    # the "pkgcore.log.logger.warning" logger through base.LogMap, so instances
    # originate from captured log warnings.
[docs]
class BadPackageUpdate(results.ProfilesResult, results.LogError):
    """Badly formatted package update in profiles/updates files."""

    # No behaviour of its own: PackageUpdatesCheck.finish pairs this class with
    # the "pkgcore.log.logger.error" logger through base.LogMap, so instances
    # originate from captured log errors.
[docs]
class PackageUpdatesCheck(RepoCheck):
    """Scan profiles/updates/* for outdated entries and other issues."""

    _source = (sources.EmptySource, (base.profiles_scope,))
    known_results = frozenset(
        (
            MultiMovePackageUpdate,
            OldMultiMovePackageUpdate,
            OldPackageUpdate,
            MovedPackageUpdate,
            BadPackageUpdate,
            RedundantPackageUpdate,
        )
    )

    def __init__(self, *args):
        super().__init__(*args)
        self.repo = self.options.target_repo
        self.search_repo = self.options.search_repo

    def finish(self):
        """Yield results for stale, chained and redundant update entries.

        Log-derived results come first, then redundant entries as they are
        encountered, then multi-move chains, and finally the remaining old
        move/slotmove entries.
        """
        logmap = (
            base.LogMap("pkgcore.log.logger.warning", MovedPackageUpdate),
            base.LogMap("pkgcore.log.logger.error", BadPackageUpdate),
        )

        # warnings/errors logged while the updates are loaded become reports
        with base.LogReports(*logmap) as log_reports:
            repo_updates = self.repo.config.updates
        yield from log_reports

        chained_moves = {}
        stale_moves = {}
        stale_slotmoves = {}

        for orig_pkg, updates in repo_updates.items():
            moves = [entry for entry in updates if entry[0] == "move"]
            slotmoves = [entry for entry in updates if entry[0] == "slotmove"]

            if len(moves) > 1:
                # a -> b, b -> c, ...: the newest move overrides the older
                # ones, so the whole chain is keyed by its final target and
                # produces a single report
                targets = [entry[2] for entry in moves]
                chained_moves[targets[-1]] = (orig_pkg, targets)
            else:
                # single moves: look for targets that no longer exist
                for entry in moves:
                    _, old, new = entry
                    if not self.search_repo.match(new):
                        stale_moves[new] = entry
                    if old == new:
                        yield RedundantPackageUpdate(map(str, entry))

            # slot moves: look for packages that no longer exist
            for entry in slotmoves:
                _, atom, newslot = entry
                slot = atom.slot
                # rebuild the fields of the updates file line for result output
                orig_line = ("slotmove", str(atom)[: -(len(slot) + 1)], slot, newslot)
                if not self.search_repo.match(atom.unversioned_atom):
                    stale_slotmoves[atom.key] = orig_line
                if slot == newslot:
                    yield RedundantPackageUpdate(map(str, orig_line))

        for final_pkg, (orig_pkg, targets) in chained_moves.items():
            names = [str(orig_pkg), *map(str, targets)]
            if self.search_repo.match(final_pkg):
                yield MultiMovePackageUpdate(str(orig_pkg), names)
            else:
                # chain ends in a removed package
                yield OldMultiMovePackageUpdate(str(names[-1]), names)
                # don't generate duplicate old report
                stale_moves.pop(final_pkg, None)

        # report remaining old updates
        for key, update in chain(stale_moves.items(), stale_slotmoves.items()):
            yield OldPackageUpdate(str(key), map(str, update))
[docs]
class UnusedLicenses(results.Warning):
    """Unused license(s) detected."""

    def __init__(self, licenses):
        """Record the unused license names as a tuple."""
        super().__init__()
        self.licenses = tuple(licenses)

    @property
    def desc(self):
        """Summary line listing the unused licenses, pluralized as needed."""
        names = ", ".join(self.licenses)
        suffix = pluralism(self.licenses)
        return f"unused license{suffix}: {names}"
[docs]
class UnusedLicensesCheck(RepoCheck):
    """Check for unused license files."""

    _source = sources.RepositoryRepoSource
    known_results = frozenset((UnusedLicenses,))

    def __init__(self, *args):
        super().__init__(*args)
        self.unused_licenses = None

    def start(self):
        """Seed the candidate set with licenses not provided by any master repo."""
        target = self.options.target_repo
        inherited = {lic for repo in target.masters for lic in repo.licenses}
        self.unused_licenses = set(target.licenses) - inherited

    def feed(self, pkg):
        """Drop every license the package uses from the candidate set."""
        used = iflatten_instance(pkg.license)
        self.unused_licenses.difference_update(used)
        yield from ()

    def finish(self):
        """Report whatever licenses were never used by a fed package."""
        leftover = self.unused_licenses
        if leftover:
            yield UnusedLicenses(sorted(leftover))
[docs]
class UnusedMirrors(results.Warning):
    """Unused mirrors detected."""

    def __init__(self, mirrors):
        """Record the unused mirror names as a tuple."""
        super().__init__()
        self.mirrors = tuple(mirrors)

    @property
    def desc(self):
        """Summary line listing the unused mirrors, pluralized as needed."""
        names = ", ".join(self.mirrors)
        suffix = pluralism(self.mirrors)
        return f"unused mirror{suffix}: {names}"
[docs]
class UnusedMirrorsCheck(MirrorsCheck, RepoCheck):
    """Check for unused mirrors."""

    _source = sources.RepositoryRepoSource
    known_results = frozenset((UnusedMirrors,))

    def start(self):
        """Seed the candidate set with mirrors not defined by any master repo."""
        target = self.options.target_repo
        inherited = {name for repo in target.masters for name in repo.mirrors.keys()}
        self.unused_mirrors = set(target.mirrors.keys()) - inherited

    def feed(self, pkg):
        """Drop the mirrors used by the package from the candidate set."""
        remaining = self.unused_mirrors
        # nothing left to eliminate, so skip computing the package's mirrors
        if remaining:
            remaining.difference_update(self.get_mirrors(pkg))
        yield from ()

    def finish(self):
        """Report whatever mirrors were never used by a fed package."""
        leftover = self.unused_mirrors
        if leftover:
            yield UnusedMirrors(sorted(leftover))
[docs]
class UnusedEclasses(results.Warning):
    """Unused eclasses detected."""

    def __init__(self, eclasses):
        """Record the unused eclass names as a tuple."""
        super().__init__()
        self.eclasses = tuple(eclasses)

    @property
    def desc(self):
        """Summary line listing the unused eclasses, pluralized with "es"."""
        names = ", ".join(self.eclasses)
        suffix = pluralism(self.eclasses, plural="es")
        return f"unused eclass{suffix}: {names}"
[docs]
class UnusedEclassesCheck(RepoCheck):
    """Check for unused eclasses."""

    _source = sources.RepositoryRepoSource
    known_results = frozenset((UnusedEclasses,))

    def __init__(self, *args):
        super().__init__(*args)
        self.unused_eclasses = None

    def start(self):
        """Seed the candidate set with eclasses not provided by any master repo."""
        target = self.options.target_repo
        inherited = {
            name for repo in target.masters for name in repo.eclass_cache.eclasses.keys()
        }
        self.unused_eclasses = set(target.eclass_cache.eclasses.keys()) - inherited

    def feed(self, pkg):
        """Drop every eclass the package inherits from the candidate set."""
        self.unused_eclasses.difference_update(pkg.inherited)
        yield from ()

    def finish(self):
        """Report whatever eclasses were never inherited by a fed package."""
        leftover = self.unused_eclasses
        if leftover:
            yield UnusedEclasses(sorted(leftover))
[docs]
class UnknownLicenses(results.Warning):
    """License(s) listed in license group(s) that don't exist."""

    def __init__(self, group, licenses):
        """Record the license group and its unknown license names."""
        super().__init__()
        self.group = group
        self.licenses = tuple(licenses)

    @property
    def desc(self):
        """Summary line naming the group and its unknown licenses."""
        names = ", ".join(self.licenses)
        suffix = pluralism(self.licenses)
        return f"license group {self.group!r} has unknown license{suffix}: [ {names} ]"
[docs]
class LicenseGroupsCheck(RepoCheck):
    """Scan license groups for unknown licenses."""

    _source = (sources.EmptySource, (base.repo_scope,))
    known_results = frozenset((UnknownLicenses,))

    def __init__(self, *args):
        super().__init__(*args)
        self.repo = self.options.target_repo

    def finish(self):
        """Yield one result per license group containing unknown licenses."""
        repo_licenses = self.repo.licenses
        for group, members in repo_licenses.groups.items():
            unknown = set(members).difference(repo_licenses)
            if not unknown:
                continue
            yield UnknownLicenses(group, sorted(unknown))
[docs]
class PotentialLocalUse(results.Info):
    """Global USE flag is a potential local USE flag."""

    def __init__(self, flag, pkgs):
        """Record the flag and the packages using it."""
        super().__init__()
        self.flag = flag
        self.pkgs = tuple(pkgs)

    @property
    def desc(self):
        """Summary line with the flag, the package count and the package list."""
        count = len(self.pkgs)
        suffix = pluralism(self.pkgs)
        names = ", ".join(self.pkgs)
        head = f"global USE flag {self.flag!r} is a potential local, "
        tail = f"used by {count} package{suffix}: {names}"
        return head + tail
[docs]
class UnusedGlobalUse(results.Warning):
    """Unused use.desc flag(s)."""

    def __init__(self, flags):
        """Record the unused flag names as a tuple."""
        super().__init__()
        self.flags = tuple(flags)

    @property
    def desc(self):
        """Summary line listing the unused use.desc flags."""
        names = ", ".join(self.flags)
        suffix = pluralism(self.flags)
        return f"use.desc unused flag{suffix}: {names}"
[docs]
class UnusedGlobalUseExpand(results.Warning):
    """Unused global USE_EXPAND flag(s)."""

    def __init__(self, flags):
        """Record the unused flag names as a tuple."""
        super().__init__()
        self.flags = tuple(flags)

    @property
    def desc(self):
        """Summary line listing the unused USE_EXPAND flags."""
        names = ", ".join(self.flags)
        suffix = pluralism(self.flags)
        return f"unused flag{suffix}: {names}"
[docs]
class PotentialGlobalUse(results.Info):
    """Local USE flag is a potential global USE flag."""

    def __init__(self, flag, pkgs):
        """Record the flag and the packages defining it locally."""
        super().__init__()
        self.flag = flag
        self.pkgs = tuple(pkgs)

    @property
    def desc(self):
        """Summary line with the flag, the package count and the package list."""
        count = len(self.pkgs)
        names = ", ".join(self.pkgs)
        head = f"local USE flag {self.flag!r} is a potential global "
        tail = f"used by {count} packages: {names}"
        return head + tail
def _dfs(graph, start, visited=None):
    """Collect every node reachable from ``start`` in an adjacency-set graph.

    The traversal uses an explicit stack instead of recursion, so the size of
    a connected component is not limited by the interpreter recursion limit.

    :param graph: mapping of node -> set of neighbouring nodes
    :param start: node to start from; it is always expanded, even when it is
        already present in ``visited``
    :param visited: optional set of already-seen nodes; it is updated in place
        and nodes it already contains are not traversed through
    :return: the ``visited`` set, including ``start`` and all newly reached nodes
    """
    if visited is None:
        visited = set()
    visited.add(start)
    pending = [start]
    while pending:
        node = pending.pop()
        # mark neighbours when they are queued so each node is expanded once
        for neighbor in graph[node] - visited:
            visited.add(neighbor)
            pending.append(neighbor)
    return visited
[docs]
class GlobalUseCheck(RepoCheck):
    """Check global USE and USE_EXPAND flags for various issues."""

    _source = (sources.RepositoryRepoSource, (), (("source", sources.PackageRepoSource),))
    known_results = frozenset(
        (
            PotentialLocalUse,
            PotentialGlobalUse,
            UnusedGlobalUse,
            UnusedGlobalUseExpand,
        )
    )

    def __init__(self, *args):
        super().__init__(*args)
        # flag name -> set of unversioned atoms using it as a non-local flag
        self.global_flag_usage = defaultdict(set)
        self.repo = self.options.target_repo

    def feed(self, pkgs):
        """Record which packages use each flag that is not one of their local flags."""
        # ignore bad XML, it will be caught by metadata.xml checks
        local_flags = set(pkgs[0].local_use.keys())
        usage = self.global_flag_usage
        for pkg in pkgs:
            for flag in pkg.iuse_stripped.difference(local_flags):
                usage[flag].add(pkg.unversioned_atom)
        yield from ()

    @staticmethod
    def _similar_flags(pkgs):
        """Yield groups of packages with similar local USE flag descriptions.

        ``pkgs`` is a sequence of ``(pkg, description)`` pairs. Descriptions
        with a similarity ratio of at least 0.75 are linked, and every
        connected group of five or more entries is yielded as a list of its
        packages.
        """
        # adjacency sets linking every pair of closely matching descriptions
        similar = defaultdict(set)
        total = len(pkgs)
        for i in range(total):
            i_desc = pkgs[i][1]
            for j in range(i + 1, total):
                ratio = SequenceMatcher(None, i_desc, pkgs[j][1]).ratio()
                if ratio >= 0.75:
                    similar[i].add(j)
                    similar[j].add(i)

        # not enough close matches found
        if len(similar) < 5:
            return

        # Walk the connected components; groups of five or more pkgs with
        # similar local USE flags are potential globals -- note that this can
        # yield the same flag for multiple, distinct descriptions.
        remaining = set(similar.keys())
        while remaining:
            component = _dfs(similar, remaining.pop())
            remaining -= component
            if len(component) >= 5:
                yield [pkgs[idx][0] for idx in component]

    def finish(self):
        """Yield unused-flag, potential-local and potential-global results."""
        config = self.repo.config
        repo_global_use = {flag for _matcher, (flag, _desc) in config.use_desc}
        repo_global_use_expand = {
            flag for group in config.use_expand_desc.values() for flag, _desc in group
        }
        repo_local_use = config.use_local_desc
        usage = self.global_flag_usage

        unused_global_use = []
        potential_locals = []
        for flag in repo_global_use:
            users = usage[flag]
            if not users:
                unused_global_use.append(flag)
            elif len(users) < 5:
                potential_locals.append((flag, users))

        unused_global_use_expand = [flag for flag in repo_global_use_expand if not usage[flag]]

        if unused_global_use:
            yield UnusedGlobalUse(sorted(unused_global_use))
        if unused_global_use_expand:
            yield UnusedGlobalUseExpand(sorted(unused_global_use_expand))

        # least-used flags first
        potential_locals.sort(key=lambda item: len(item[1]))
        for flag, users in potential_locals:
            yield PotentialLocalUse(flag, sorted(map(str, users)))

        # local flags (not shadowing a global one) -> [(pkg, description), ...]
        local_use = defaultdict(list)
        for pkg, (flag, desc) in repo_local_use:
            if flag not in repo_global_use:
                local_use[flag].append((pkg, desc))

        candidates = sorted(item for item in local_use.items() if len(item[1]) >= 5)
        potential_globals = [
            (flag, group) for flag, entries in candidates for group in self._similar_flags(entries)
        ]

        # largest groups first
        potential_globals.sort(key=lambda item: len(item[1]), reverse=True)
        for flag, group in potential_globals:
            yield PotentialGlobalUse(flag, sorted(map(str, group)))
[docs]
class MissingChksum(results.VersionResult, results.Warning):
    """A file in the chksum data lacks required checksums."""

    def __init__(self, filename, missing, existing, **kwargs):
        """Record the file name plus its missing and existing checksum types."""
        super().__init__(**kwargs)
        self.filename = filename
        self.missing = tuple(missing)
        self.existing = tuple(existing)

    @property
    def desc(self):
        """Summary line listing the missing and the present checksum types."""
        lacking = ", ".join(self.missing)
        present = ", ".join(self.existing)
        head = f"{self.filename!r} missing required chksums: "
        tail = f"{lacking}; has chksums: {present}"
        return head + tail
[docs]
class DeprecatedChksum(results.VersionResult, results.Warning):
    """A file in the chksum data does not use modern checksum set."""

    def __init__(self, filename, deprecated, **kwargs):
        """Record the file name and its deprecated checksum types."""
        super().__init__(**kwargs)
        self.filename = filename
        self.deprecated = tuple(deprecated)

    @property
    def desc(self):
        """Summary line listing the deprecated checksum types of the file."""
        names = ", ".join(self.deprecated)
        suffix = pluralism(self.deprecated)
        return f"{self.filename!r} has deprecated checksum{suffix}: {names}"
[docs]
class MissingManifest(results.VersionResult, results.Error):
    """SRC_URI targets missing from Manifest file."""

    def __init__(self, files, **kwargs):
        """Record the distfile names absent from the Manifest."""
        super().__init__(**kwargs)
        self.files = tuple(files)

    @property
    def desc(self):
        """Summary line listing the distfiles missing from the Manifest."""
        names = ", ".join(self.files)
        suffix = pluralism(self.files)
        return f"distfile{suffix} missing from Manifest: [ {names} ]"
[docs]
class UnknownManifest(results.PackageResult, results.Warning):
    """Manifest entries not matching any SRC_URI targets."""

    def __init__(self, files, **kwargs):
        """Record the Manifest distfile names that no package references."""
        super().__init__(**kwargs)
        self.files = tuple(files)

    @property
    def desc(self):
        """Summary line listing the unknown Manifest distfiles."""
        names = ", ".join(self.files)
        suffix = pluralism(self.files)
        return f"unknown distfile{suffix} in Manifest: [ {names} ]"
[docs]
class UnnecessaryManifest(results.PackageResult, results.Warning):
    """Manifest entries for non-DIST targets on a repo with thin manifests enabled."""

    def __init__(self, files, **kwargs):
        """Record the unnecessary Manifest entry names."""
        super().__init__(**kwargs)
        self.files = tuple(files)

    @property
    def desc(self):
        """Summary line listing the unnecessary Manifest entries."""
        names = ", ".join(self.files)
        suffix = pluralism(self.files)
        return f"unnecessary file{suffix} in Manifest: [ {names} ]"
[docs]
class InvalidManifest(results.MetadataError, results.PackageResult):
    """Package's Manifest file is invalid."""

    # Name of the package attribute this error relates to.
    # NOTE(review): presumably consumed by results.MetadataError to route
    # errors raised while loading ``pkg.manifest`` -- confirm in results.py.
    attr = "manifest"
[docs]
class DeprecatedManifestHash(results.PackageResult, results.Warning):
    """Manifest uses deprecated hashes.

    The package uses deprecated hash types in its Manifest file.
    """

    def __init__(self, hashes, **kwargs):
        """Record the deprecated hash type names as a tuple."""
        super().__init__(**kwargs)
        self.hashes = tuple(hashes)

    @property
    def desc(self):
        """Summary line listing the deprecated hash types found in the Manifest."""
        s = pluralism(self.hashes)
        hashes = ", ".join(self.hashes)
        # the base word must be singular: the pluralism suffix supplies the
        # "s", otherwise several hashes would render as "typess"
        return f"defines deprecated manifest hash type{s}: [ {hashes} ]"
[docs]
class ManifestCheck(Check):
    """Manifest related checks.

    Verify that the Manifest file exists, doesn't have missing or
    extraneous entries, and that the required hashes are in use.
    """

    required_addons = (addons.UseAddon,)
    _source = sources.PackageRepoSource
    known_results = frozenset(
        (
            MissingChksum,
            MissingManifest,
            UnknownManifest,
            UnnecessaryManifest,
            DeprecatedChksum,
            InvalidManifest,
            DeprecatedManifestHash,
        )
    )

    def __init__(self, *args, use_addon: addons.UseAddon):
        super().__init__(*args)
        repo = self.options.target_repo
        # repos without a config attribute get empty hash sets
        if hasattr(repo, "config"):
            preferred = repo.config.manifests.hashes
            required = repo.config.manifests.required_hashes
        else:
            preferred = required = ()
        self.preferred_checksums = frozenset(preferred)
        self.required_checksums = frozenset(required)
        self.iuse_filter = use_addon.get_filter("fetchables")

    def feed(self, pkgset):
        """Compare the shared Manifest with the fetchables of every version.

        Per-version results (missing Manifest entries, missing or deprecated
        checksums) are yielded while iterating the versions; package-level
        results are attached to the first package of the set afterwards.
        """
        first_pkg = pkgset[0]
        pkg_manifest: Manifest = first_pkg.manifest
        pkg_manifest.allow_missing = True
        manifest_distfiles = set(pkg_manifest.distfiles.keys())
        seen = set()

        for pkg in pkgset:
            pkg.release_cached_data()
            fetchables, _ = self.iuse_filter(
                (fetch.fetchable,),
                pkg,
                pkg.generate_fetchables(allow_missing_checksums=True, ignore_unknown_mirrors=True),
            )
            fetchables = set(fetchables)
            pkg.release_cached_data()

            fetchable_files = {item.filename for item in fetchables}
            missing_manifests = fetchable_files.difference(manifest_distfiles)
            if missing_manifests:
                yield MissingManifest(sorted(missing_manifests), pkg=pkg)

            for item in fetchables:
                name = item.filename
                # each distfile is only examined for the first version using it
                if name in seen:
                    continue
                missing = self.required_checksums.difference(item.chksums)
                if name not in missing_manifests and missing:
                    yield MissingChksum(name, sorted(missing), sorted(item.chksums), pkg=pkg)
                elif item.chksums:
                    deprecated = frozenset(item.chksums).difference(self.preferred_checksums)
                    if deprecated:
                        yield DeprecatedChksum(name, sorted(deprecated), pkg=pkg)
                seen.add(name)

        if pkg_manifest.thin:
            # thin manifests should only carry distfile entries
            unnecessary_manifests = set()
            for attr in ("aux_files", "ebuilds", "misc"):
                unnecessary_manifests.update(getattr(pkg_manifest, attr, ()))
            if unnecessary_manifests:
                yield UnnecessaryManifest(sorted(unnecessary_manifests), pkg=first_pkg)

        unknown_manifests = manifest_distfiles.difference(seen)
        if unknown_manifests:
            yield UnknownManifest(sorted(unknown_manifests), pkg=first_pkg)

        used_hashes = frozenset().union(*pkg_manifest.distfiles.values())
        deprecated_hashes = DEPRECATED_HASHES.intersection(used_hashes)
        if deprecated_hashes:
            yield DeprecatedManifestHash(sorted(deprecated_hashes), pkg=first_pkg)
[docs]
class ConflictingChksums(results.VersionResult, results.Error):
    """Checksum conflict detected between two files."""

    def __init__(self, filename, chksums, pkgs, **kwargs):
        """Record the distfile, the conflicting checksum types and the other packages."""
        super().__init__(**kwargs)
        self.filename = filename
        self.chksums = tuple(chksums)
        self.pkgs = tuple(pkgs)

    @property
    def desc(self):
        """Summary line naming the distfile, the differing checksums and packages."""
        chksum_suffix = pluralism(self.chksums)
        chksum_names = ", ".join(self.chksums)
        pkg_suffix = pluralism(self.pkgs)
        pkg_names = ", ".join(self.pkgs)
        head = f"distfile {self.filename!r} has different checksum{chksum_suffix} "
        tail = f"({chksum_names}) for package{pkg_suffix}: {pkg_names}"
        return head + tail
[docs]
class MatchingChksums(results.VersionResult, results.Warning):
    """Two distfiles share the same checksums but use different names."""

    def __init__(self, filename, orig_file, orig_pkg, **kwargs):
        """Record the distfile and the earlier file/package it duplicates."""
        super().__init__(**kwargs)
        self.filename = filename
        self.orig_file = orig_file
        self.orig_pkg = orig_pkg

    @property
    def desc(self):
        """Summary line; the origin package is only named when it differs from ours."""
        own_pkg = f"{self.category}/{self.package}"
        origin = "" if own_pkg == self.orig_pkg else f" from {self.orig_pkg}"
        return f"distfile {self.filename!r} matches checksums for {self.orig_file!r}" + origin
[docs]
class ManifestCollisionCheck(Check):
    """Search Manifest entries for different types of distfile collisions.

    In particular, search for matching filenames with different checksums and
    different filenames with matching checksums.
    """

    _source = (sources.RepositoryRepoSource, (), (("source", sources.PackageRepoSource),))
    known_results = frozenset((ConflictingChksums, MatchingChksums))

    def __init__(self, *args):
        super().__init__(*args)
        # filename -> ([package keys], {checksum type: value})
        self.seen_files = {}
        # tuple of checksum values -> (package key, filename)
        self.seen_chksums = {}
        # ignore go.mod false positives (issue #228)
        self._ignored_files_re = re.compile(r"^.*%2F@v.*\.mod$")

    def _conflicts(self, pkg):
        """Check for similarly named distfiles with different checksums."""
        for filename, chksums in pkg.manifest.distfiles.items():
            if filename not in self.seen_files:
                self.seen_files[filename] = ([pkg.key], dict(chksums.items()))
                continue

            seen_pkgs, seen_chksums = self.seen_files[filename]
            # only checksum types present on both sides can conflict
            conflicting = sorted(
                chf_type
                for chf_type, value in seen_chksums.items()
                if (ours := chksums.get(chf_type)) is not None and ours != value
            )
            if conflicting:
                yield ConflictingChksums(
                    filename, conflicting, map(str, sorted(seen_pkgs)), pkg=pkg
                )
            else:
                seen_chksums.update(chksums)
                seen_pkgs.append(pkg.key)

    def _matching(self, pkg):
        """Check for distfiles with matching checksums and different names."""
        for filename, chksums in pkg.manifest.distfiles.items():
            key = tuple(chksums.values())
            if key not in self.seen_chksums:
                self.seen_chksums[key] = (pkg.key, filename)
                continue

            seen_pkg, seen_file = self.seen_chksums[key]
            if seen_file != filename and not self._ignored_files_re.match(filename):
                yield MatchingChksums(filename, seen_file, seen_pkg, pkg=pkg)

    def feed(self, pkgs):
        """Run both collision scans against the first package of the set."""
        pkg = pkgs[0]
        yield from chain(self._conflicts(pkg), self._matching(pkg))
[docs]
class EmptyProject(results.Warning):
    """A project has no developers."""

    def __init__(self, project):
        """Record the project, stored as its string form."""
        super().__init__()
        self.project = str(project)

    @property
    def desc(self):
        """Summary line naming the memberless project."""
        name = self.project
        return f"Project has no members: {name}"
[docs]
class DeprecatedRepoHash(results.Warning):
    """Repositories ``manifest-hashes`` defines deprecated hashes.

    The repository defines deprecated hashes in ``manifest-hashes``.
    """

    def __init__(self, hashes):
        """Record the deprecated hash names as a tuple."""
        super().__init__()
        self.hashes = tuple(hashes)

    @property
    def desc(self):
        """Summary line listing the deprecated hashes from ``manifest-hashes``."""
        # "hash" pluralizes with "es"; the default "s" suffix would render
        # "manifest-hashs" for several hashes
        es = pluralism(self.hashes, plural="es")
        hashes = ", ".join(self.hashes)
        return f"defines deprecated manifest-hash{es}: [ {hashes} ]"
[docs]
class RepoManifestHashCheck(RepoCheck):
    """Check ``manifest-hashes`` config for issues."""

    _source = (sources.EmptySource, (base.repo_scope,))
    known_results = frozenset((DeprecatedRepoHash,))

    def __init__(self, *args):
        super().__init__(*args)
        self.repo = self.options.target_repo

    def finish(self):
        """Yield a result when the configured manifest hashes include deprecated ones."""
        configured = self.repo.config.manifests.hashes
        deprecated_hashes = DEPRECATED_HASHES.intersection(configured)
        if deprecated_hashes:
            yield DeprecatedRepoHash(sorted(deprecated_hashes))