Source code for pkgcore.ebuild.pkg_updates

from collections import defaultdict, deque
from operator import itemgetter

from snakeoil.demandload import demand_compile_regexp
from snakeoil.osutils import listdir_files, pjoin
from snakeoil.sequences import iflatten_instance

from ..log import logger
from .atom import atom


def _scan_directory(path, eapi):
    files = []
    for filename in listdir_files(path):
        match = eapi.options.update_regex.match(filename)
        if match is not None:
            files.append(filename)
        else:
            logger.error(f"incorrectly named update file: {filename!r}")
    return sorted(files)


[docs] def read_updates(path, eapi): def f(): d = deque() return [d, d] # mods tracks the start point [0], and the tail, [1]. # via this, pkg moves into a specific pkg can pick up # changes past that point, while ignoring changes prior # to that point. # Afterwards, we flatten it to get a per cp chain of commands. # no need to do lookups basically, although we do need to # watch for cycles. mods = defaultdict(f) moved = {} try: for fp in _scan_directory(path, eapi): with open(pjoin(path, fp)) as f: data = (line.rstrip("\n") for line in f) _process_updates(data, fp, mods, moved) except FileNotFoundError: pass # force a walk of the tree, flattening it commands = {k: list(iflatten_instance(v[0], tuple)) for k, v in mods.items()} # filter out empty nodes. commands = {k: v for k, v in commands.items() if v} return commands
def _process_updates(sequence, filename, mods, moved): for lineno, raw_line in enumerate(sequence, 1): line = raw_line.strip() if not line: logger.error(f"file {filename!r}: empty line {lineno}") continue elif line != raw_line: logger.error( f"file {filename!r}: extra whitespace in {raw_line!r} on line {lineno}" ) line = line.split() if line[0] == "move": if len(line) != 3: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: bad move form" ) continue src, trg = atom(line[1]), atom(line[2]) if src.fullver is not None: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: " f"atom {src} must be versionless" ) continue elif trg.fullver is not None: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: " f"atom {trg} must be versionless" ) continue if src.key in moved: logger.warning( f"file {filename!r}: {raw_line!r} on line {lineno}: " f"{src} was already moved to {moved[src.key]}, " "this line is redundant" ) continue d = deque() mods[src.key][1].extend([("move", src, trg), d]) # start essentially a new checkpoint in the trg mods[trg.key][1].append(d) mods[trg.key][1] = d moved[src.key] = trg elif line[0] == "slotmove": if len(line) != 4: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: " "bad slotmove form" ) continue src = atom(line[1]) if src.key in moved: logger.warning( f"file {filename!r}: {raw_line!r} on line {lineno}: " f"{src} was already moved to {moved[src.key]}, " "this line is redundant" ) continue elif src.slot is not None: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: " "slotted atom makes no sense for slotmoves" ) continue src_slot = atom(f"{src}:{line[2]}") trg_slot = atom(f"{src.key}:{line[3]}") mods[src.key][1].append(("slotmove", src_slot, line[3])) else: logger.error( f"file {filename!r}: {raw_line!r} on line {lineno}: unknown command" )