Source code for pkgcore.pkgsets.glsa

"""
Gentoo Linux Security Advisories (GLSA) support
"""

__all__ = ("GlsaDirSet", "SecurityUpgrades")

import os

from lxml import etree
from snakeoil.compatibility import IGNORED_EXCEPTIONS
from snakeoil.iterables import caching_iter
from snakeoil.klass import generic_equality
from snakeoil.osutils import listdir_files, pjoin

from ..config.hint import ConfigHint
from ..ebuild import atom, cpv
from ..ebuild import restricts as atom_restricts
from ..log import logger
from ..package import mutated
from ..repository.util import get_virtual_repos
from ..restrictions import packages, restriction, values


[docs] class GlsaDirSet(metaclass=generic_equality): """generate a pkgset based on GLSA's distributed via a directory. (rsync tree is the usual source.) """ pkgcore_config_type = ConfigHint(types={"src": "ref:repo"}, typename="pkgset") op_translate = {"ge": ">=", "gt": ">", "lt": "<", "le": "<=", "eq": "="} __attr_comparison__ = ("paths",) METADATA_FILES = frozenset( {"Manifest", "Manifest.files.gz", "timestamp.chk", "timestamp.commit"} ) def __init__(self, src): """ :param src: where to get the glsa from :type src: must be either full path to glsa dir, or a repo object to pull it from """ if not isinstance(src, str): src = sorted( path for repo in get_virtual_repos(src, False) if hasattr(repo, "base") if os.path.isdir(path := pjoin(repo.base, "metadata", "glsa")) ) else: src = (src,) self.paths = tuple(src) def __iter__(self): for _glsa, catpkg, pkgatom, vuln in self.iter_vulnerabilities(): yield packages.KeyedAndRestriction( pkgatom, vuln, key=catpkg, tag="GLSA vulnerable:" )
[docs] def pkg_grouped_iter(self, sorter=None): """yield GLSA restrictions grouped by package key :param sorter: must be either None, or a comparison function """ if sorter is None: sorter = iter pkgs = {} pkgatoms = {} for _glsa, pkg, pkgatom, vuln in self.iter_vulnerabilities(): pkgatoms[pkg] = pkgatom pkgs.setdefault(pkg, []).append(vuln) for pkgname in sorter(pkgs): yield packages.KeyedAndRestriction( pkgatoms[pkgname], packages.OrRestriction(*pkgs[pkgname]), key=pkgname )
[docs] def iter_vulnerabilities(self): """generator yielding each GLSA restriction""" for path in self.paths: for fn in listdir_files(path): # glsa-1234-12.xml if not (fn.startswith("glsa-") and fn.endswith(".xml")): if fn not in self.METADATA_FILES: logger.warning("invalid glsa file name: %r", fn) continue # This verifies the filename is of the correct syntax. try: [int(x) for x in fn[5:-4].split("-")] except ValueError: logger.warning("invalid glsa file name: %r", fn) continue root = etree.parse(pjoin(path, fn)) glsa_node = root.getroot() if glsa_node.tag != "glsa": logger.warning("glsa file without glsa root node: %r", fn) continue for affected in root.findall("affected"): for pkg in affected.findall("package"): try: pkgname = str(pkg.get("name")).strip() pkg_vuln_restrict = self.generate_intersects_from_pkg_node( pkg, tag=f"glsa({fn[5:-4]})" ) if pkg_vuln_restrict is None: continue pkgatom = atom.atom(pkgname) yield fn[5:-4], pkgname, pkgatom, pkg_vuln_restrict except (TypeError, ValueError) as exc: # thrown from cpv. logger.warning( "invalid glsa file %r, package %s: %s", fn, pkgname, exc ) except IGNORED_EXCEPTIONS: raise except Exception as exc: logger.warning("invalid glsa file %r: %s", fn, exc)
[docs] def generate_intersects_from_pkg_node(self, pkg_node, tag=None): arch = pkg_node.get("arch") if arch is not None: arch = tuple(str(arch.strip()).split()) if not arch or "*" in arch: arch = None vuln = list(pkg_node.findall("vulnerable")) if not vuln: return None elif len(vuln) > 1: vuln_list = [self.generate_restrict_from_range(x) for x in vuln] vuln = packages.OrRestriction(*vuln_list) else: vuln_list = [self.generate_restrict_from_range(vuln[0])] vuln = vuln_list[0] if arch is not None: vuln = packages.AndRestriction( vuln, packages.PackageRestriction( "keywords", values.ContainmentMatch(arch, match_all=False) ), ) invuln = pkg_node.findall("unaffected") if not invuln: # wrap it. return packages.KeyedAndRestriction(vuln, tag=tag) invuln_list = ( self.generate_restrict_from_range(x, negate=True) for x in invuln ) invuln = [x for x in invuln_list if x not in vuln_list] if not invuln: if tag is None: return packages.KeyedAndRestriction(vuln, tag=tag) return packages.KeyedAndRestriction(vuln, tag=tag) return packages.KeyedAndRestriction(vuln, tag=tag, *invuln)
[docs] def generate_restrict_from_range(self, node, negate=False): op = str(node.get("range").strip()) slot = str(node.get("slot", "").strip()) try: restrict = self.op_translate[op.lstrip("r")] except KeyError: raise ValueError(f"unknown operator: {op!r}") if node.text is None: raise ValueError(f"{op!r} node missing version") base = str(node.text.strip()) glob = base.endswith("*") if glob: base = base[:-1] base = cpv.VersionedCPV(f"cat/pkg-{base}") if glob: if op != "eq": raise ValueError(f"glob cannot be used with {op} ops") return packages.PackageRestriction( "fullver", values.StrGlobMatch(base.fullver) ) restrictions = [] if op.startswith("r"): if not base.revision: if op == "rlt": # rlt -r0 can never match # this is a non-range. raise ValueError( f"range {op} version {node.text.strip()} is a guaranteed empty set" ) elif op == "rle": # rle -r0 -> = -r0 return atom_restricts.VersionMatch("=", base.version, negate=negate) elif op == "rge": # rge -r0 -> ~ return atom_restricts.VersionMatch("~", base.version, negate=negate) # rgt -r0 passes through to regular ~ + > restrictions.append(atom_restricts.VersionMatch("~", base.version)) restrictions.append( atom_restricts.VersionMatch(restrict, base.version, rev=base.revision), ) if slot: restrictions.append(atom_restricts.SlotDep(slot)) return packages.AndRestriction(*restrictions, negate=negate)
def find_vulnerable_repo_pkgs(glsa_src, repo, grouped=False, arch=None): """generator yielding GLSA restrictions, and vulnerable pkgs from a repo. :param glsa_src: GLSA pkgset to pull vulnerabilities from :param repo: repo to scan for vulnerable packages :param grouped: if grouped, combine glsa restrictions into one restriction (thus yielding a pkg only once) :param arch: arch to scan for, x86 for example """ i = glsa_src.pkg_grouped_iter() if grouped else iter(glsa_src) if arch is None: wrapper = lambda p: p else: if isinstance(arch, str): arch = (arch,) else: arch = tuple(arch) wrapper = lambda p: mutated.MutatedPkg(p, {"keywords": arch}) for restrict in i: matches = caching_iter( wrapper(x) for x in repo.itermatch(restrict, sorter=sorted) ) if matches: yield restrict, matches
[docs] class SecurityUpgrades(metaclass=generic_equality): """Set of packages for available security upgrades.""" pkgcore_config_type = ConfigHint( types={"ebuild_repo": "ref:repo", "vdb": "ref:vdb"}, typename="pkgset" ) __attr_comparison__ = ("arch", "glsa_src", "vdb") def __init__(self, ebuild_repo, vdb, arch): self.glsa_src = GlsaDirSet(ebuild_repo) self.vdb = vdb self.arch = arch def __iter__(self): for glsa, _matches in find_vulnerable_repo_pkgs( self.glsa_src, self.vdb, grouped=True, arch=self.arch ): yield packages.KeyedAndRestriction(glsa[0], restriction.Negate(glsa[1]))