Source code for pkgcore.ebuild.digest

"""
ebuild tree manifest/digest support
"""

__all__ = ("parse_manifest", "Manifest")

import errno
import operator
import os

from snakeoil.chksum import get_handler
from snakeoil.mappings import ImmutableDict

from .. import gpg
from ..fs.livefs import iter_scan
from ..package import errors
from . import cpv


def _write_manifest(handle, chf, filename, chksums):
    """Convenient, internal method for writing manifests"""
    size = chksums.pop("size")
    handle.write("%s %s %i" % (chf.upper(), filename, size))
    for chf in sorted(chksums):
        handle.write(" %s %s" % (chf.upper(), get_handler(chf).long2str(chksums[chf])))
    handle.write("\n")


def convert_chksums(iterable):
    for chf, sum in iterable:
        chf = chf.lower()
        if chf == "size":
            # explicit size entries are stupid, format has implicit size
            continue
        else:
            yield chf, int(sum, 16)


[docs] def parse_manifest(source, ignore_gpg=True): types = {"DIST": {}, "AUX": {}, "EBUILD": {}, "MISC": {}} # manifest v2 format: (see glep 44 for exact rules) # TYPE filename size (CHF sum)+ # example 'type' entry, all one line # MISC metadata.xml 219 RMD160 613195ece366b33606e71ff1753be048f2507841 SHA1 d162fb909241ef50b95a3539bdfcde95429bdf81 SHA256 cbd3a20e5c89a48a842f7132fe705bf39959f02c1025052efce8aad8a8baa8dc # manifest v1 format is # CHF sum filename size # note that we do _not_ support manifest1 chf_types = {"size"} f = None try: if isinstance(source, str): i = f = open(source, "r", 32768) else: i = f = source.text_fileobj() if ignore_gpg: i = gpg.skip_signatures(f) for data in i: line = data.split() if not line: continue d = types.get(line[0]) if d is None: raise errors.ParseChksumError( source, f"unknown manifest type: {line[0]}: {line!r}" ) if len(line) % 2 != 1: raise errors.ParseChksumError( source, "manifest 2 entry doesn't have right " f"number of tokens, {len(line)}: {line!r}", ) chf_types.update(line[3::2]) # this is a trick to do pairwise collapsing; # [size, 1] becomes [(size, 1)] i = iter(line[3:]) d[line[1]] = [("size", int(line[2]))] + list(convert_chksums(zip(i, i))) except (IndexError, ValueError): raise errors.ParseChksumError(source, "invalid data format") finally: if f is not None and f.close: f.close() for t, d in types.items(): types[t] = ImmutableDict((k, dict(v)) for k, v in d.items()) # ordering annoyingly matters. bad api. return [types[x] for x in ("DIST", "AUX", "EBUILD", "MISC")]
[docs] class Manifest: def __init__(self, path, enforce_gpg=False, thin=False, allow_missing=False): self.path = path self.thin = thin self.allow_missing = allow_missing self._gpg = enforce_gpg self._sourced = False def _pull_manifest(self): if self._sourced: return try: data = parse_manifest(self.path, ignore_gpg=self._gpg) except EnvironmentError as e: if not (self.thin or self.allow_missing) or e.errno != errno.ENOENT: raise errors.ParseChksumError(self.path, e) from e data = {}, {}, {}, {} except errors.ChksumError as e: # recreate cpv from manifest path catpn = os.sep.join(self.path.split(os.sep)[-3:-1]) pkg = cpv.UnversionedCPV(catpn) raise errors.MetadataException(pkg, "manifest", str(e)) self._dist, self._aux, self._ebuild, self._misc = data self._sourced = True
[docs] def update(self, fetchables, chfs=None): """Update the related Manifest file. :param fetchables: fetchables of the package """ if self.thin and not fetchables: # Manifest files aren't necessary with thin manifests and no distfiles return _key_sort = operator.itemgetter(0) excludes = frozenset(["CVS", ".svn", "Manifest"]) aux, ebuild, misc = {}, {}, {} if not self.thin: filesdir = "/files/" for obj in iter_scan( "/", offset=os.path.dirname(self.path), chksum_types=chfs ): if not obj.is_reg: continue pathname = obj.location if excludes.intersection(pathname.split("/")): continue if pathname.startswith(filesdir): d = aux pathname = pathname[len(filesdir) :] elif obj.dirname == "/": pathname = pathname[1:] if obj.location[-7:] == ".ebuild": d = ebuild else: d = misc else: raise Exception( "Unexpected directory found in %r; %r" % (self.path, obj.dirname) ) d[pathname] = dict(obj.chksums) handle = open(self.path, "w") # write it in alphabetical order; aux gets flushed now. for path, chksums in sorted(aux.items(), key=_key_sort): _write_manifest(handle, "AUX", path, chksums) # next dist... for fetchable in sorted(fetchables, key=operator.attrgetter("filename")): _write_manifest( handle, "DIST", os.path.basename(fetchable.filename), dict(fetchable.chksums), ) # then ebuild and misc for mtype, inst in (("EBUILD", ebuild), ("MISC", misc)): for path, chksum in sorted(inst.items(), key=_key_sort): _write_manifest(handle, mtype, path, chksum)
@property def aux_files(self): self._pull_manifest() return self._aux @property def distfiles(self): self._pull_manifest() return self._dist @property def ebuilds(self): self._pull_manifest() return self._ebuild @property def misc(self): self._pull_manifest() return self._misc