Source code for pkgcore.ebuild.formatter

"""pmerge formatting module"""

__all__ = (
    "Formatter",
    "use_expand_filter",
    "BasicFormatter",
    "PkgcoreFormatter",
    "CountingFormatter",
    "PortageFormatter",
    "PortageVerboseFormatter",
)

import operator
import os

from snakeoil.cli.input import userquery
from snakeoil.mappings import defaultdictkey
from snakeoil.osutils import pjoin, sizeof_fmt
from snakeoil.sequences import iflatten_instance
from snakeoil.strings import pluralism

from ..config.hint import ConfigHint
from ..log import logger


[docs] class use_expand_filter: def __init__(self, use_expand, use_expand_hidden): """ :type use_expand: iterable of strings :param use_expand: names of use-expanded variables. :type use_expand_hidden: set of strings :param use_expand_hidden: names of use-expanded vars that should not be added to the dict. """ self.expand_filters = { x.lower(): (x not in use_expand_hidden, x) for x in use_expand } self.use_expand = use_expand self.use_expand_hidden = use_expand_hidden self.known_flags = {} def __call__(self, use): """Split USE flags up into "normal" flags and use-expanded ones. :type use: iterable of strings :param use: flags that are set. :rtype: sequence of strings, dict mapping a string to a list of strings :return: set of normal flags and a mapping from use_expand name to value (with the use-expanded bit stripped off, so C{"video_cards_alsa"} becomes C{"{'video_cards': ['alsa']}"}). """ # XXX: note this is fairly slow- actually takes up more time then # chunks of the resolver ue_dict = {} usel = [] ef = self.expand_filters kf = self.known_flags for flag in use: data = kf.get(flag) if data is None: split_flag = flag.rsplit("_", 1) while len(split_flag) == 2: if split_flag[0] not in ef: split_flag = split_flag[0].rsplit("_", 1) continue expand_state = ef[split_flag[0]] if expand_state[0]: # not hidden kf[flag] = data = ( expand_state[1], flag[len(split_flag[0]) + 1 :], ) else: kf[flag] = data = False break else: kf[flag] = data = True if data is True: # straight use flag. usel.append(flag) elif data: # non hidden flag. if not data[0] in ue_dict: ue_dict[data[0]] = set([data[1]]) else: ue_dict[data[0]].add(data[1]) return frozenset(usel), ue_dict
[docs] class Formatter: """Base Formatter class: All formatters should be subclasses of this.""" pkgcore_config_type = ConfigHint(typename="pmerge_formatter", raw_class=True) def __init__(self, **kwargs): self.__dict__.update(kwargs)
[docs] def format(self, op): """Formats an op. Subclasses must define this method""" raise NotImplementedError(self.format)
[docs] def ask(self, question, responses=None, default_answer=None, limit=3): return userquery(question, self.out, self.err, responses, default_answer, limit)
[docs] def end(self): """Called at the end, normally for summary information"""
[docs] class BasicFormatter(Formatter): """A basic formatter, intended for scripts"""
[docs] def format(self, op): self.out.write(op.pkg.key)
class VerboseFormatter(Formatter): """Formatter with output forced into verbose mode.""" def __init__(self, **kwargs): kwargs["verbosity"] = 1 super().__init__(**kwargs)
[docs] class PkgcoreFormatter(Formatter): """The original pkgcore output"""
[docs] def format(self, op): repo = getattr(op.pkg.repo, "repo_id", None) if not repo: p = str(op.pkg.cpvstr) else: p = f"{op.pkg.cpvstr}::{repo}" if op.desc == "replace": self.out.write(f"replace {op.old_pkg.cpvstr}, {p}") else: self.out.write(f"{op.desc.ljust(7)} {p}")
[docs] class CountingFormatter(Formatter): """Subclass for formatters that count packages""" def __init__(self, **kwargs): kwargs.setdefault("verbosity", 0) super().__init__(**kwargs) self.package_data = defaultdictkey(lambda x: 0) # total download size for all pkgs to be merged self.download_size = 0
[docs] def visit_op(self, op_type): """Track the number of package operations of each type.""" self.package_data[op_type] += 1
[docs] def end(self): """Output total package, operation, and download size counts.""" self.out.write() if self.verbosity > 0: total = sum(self.package_data.values()) self.out.write(f"Total: {total} package{pluralism(total)}", autoline=False) d = dict(self.package_data.items()) op_types = ( ("add", "new"), ("upgrade", "upgrade"), ("downgrade", "downgrade"), ("slotted_add", "in new slot"), ("replace", "reinstall"), ) op_list = [] for op_type, op_str in op_types: num_ops = d.pop(op_type, 0) if num_ops: if op_str == "new": op_list.append(f"{num_ops} {op_str}") else: op_list.append(f"{num_ops} {op_str}{pluralism(num_ops)}") if d: op_list.append(f"{len(d)} other op{pluralism(d)}") if op_list: self.out.write(f" ({', '.join(op_list)})", autoline=False) if self.download_size: self.out.write( ", Size of downloads: ", sizeof_fmt(self.download_size), autoline=False, ) self.out.write()
[docs] class PortageFormatter(CountingFormatter): """Formatter designed to resemble portage output.""" def __init__(self, **kwargs): kwargs.setdefault("use_expand", set()) kwargs.setdefault("use_expand_hidden", set()) super().__init__(**kwargs) self.use_splitter = use_expand_filter(self.use_expand, self.use_expand_hidden) # Map repo location to an index. self.repos = {} # set of files to be downloaded self.downloads = set()
[docs] def format(self, op): # <type> - ebuild, block or nomerge (for --tree) # N - new package # R - rebuild package # F - fetch restricted # f - fetch restricted already downloaded # D - downgrade # U - updating to another version # # - masked # * - missing keyword # ~ - unstable keyword # Caveats: # - U and D are both displayed to show a downgrade - this is kept # in order to be consistent with existing portage behaviour out = self.out origautoline = out.autoline out.autoline = False self.pkg_disabled_use = self.pkg_forced_use = set() if hasattr(self, "pkg_get_use"): self.pkg_forced_use, _, self.pkg_disabled_use = self.pkg_get_use(op.pkg) # This is for the summary at the end if self.quiet_repo_display: self.repos.setdefault(op.pkg.repo, len(self.repos) + 1) pkg_is_bold = any(x.match(op.pkg) for x in getattr(self, "world_list", ())) # We don't do blockers or --tree stuff yet data = ["["] pkg_coloring = [] if pkg_is_bold: pkg_coloring.append(out.bold) if op.desc == "remove": pkg_coloring.insert(0, out.fg("red")) data += pkg_coloring + ["uninstall"] elif getattr(op.pkg, "built", False): pkg_coloring.insert(0, out.fg("magenta")) data += pkg_coloring + ["binary"] else: pkg_coloring.insert(0, out.fg("green")) data += pkg_coloring + ["ebuild"] data += [out.reset, " "] out.write(*data) # Order is important here - look at the above diagram op_type = op.desc op_chars = [[" "] for x in range(7)] if "fetch" in op.pkg.restrict: if all(os.path.isfile(pjoin(self.distdir, f)) for f in op.pkg.distfiles): fetched = [out.fg("green"), out.bold, "f", out.reset] else: fetched = [out.fg("red"), out.bold, "F", out.reset] op_chars[3] = fetched if op.desc == "add": op_chars[1] = [out.fg("green"), out.bold, "N", out.reset] if op.pkg.slot != "0" and self.installed_repos.match( op.pkg.unversioned_atom ): op_chars[2] = [out.fg("green"), out.bold, "S", out.reset] op_type = "slotted_add" elif op.desc == "replace": if op.pkg == op.old_pkg: op_chars[2] = [out.fg("yellow"), out.bold, "R", out.reset] else: op_chars[4] = [out.fg("cyan"), out.bold, "U", out.reset] if op.pkg > op.old_pkg: op_type = "upgrade" else: op_chars[5] = [out.fg("blue"), out.bold, "D", out.reset] op_type = "downgrade" elif op.desc == "remove": pass else: logger.warning("unformattable op type: desc(%r), %r", op.desc, op) if self.verbosity > 0: if ( self.unstable_arch in op.pkg.keywords and self.unstable_arch not in op.pkg.repo.domain_settings["ACCEPT_KEYWORDS"] ): op_chars[6] = [out.fg("yellow"), out.bold, "~", out.reset] elif not op.pkg.keywords: op_chars[6] = [out.fg("red"), out.bold, "*", out.reset] else: if op.pkg.repo.masked.match(op.pkg.versioned_atom): op_chars[6] = [out.fg("red"), out.bold, "#", out.reset] out.write(*(iflatten_instance(op_chars))) out.write("] ") self.visit_op(op_type) pkg = [op.pkg.cpvstr] if self.verbosity > 0: if op.pkg.subslot != op.pkg.slot: pkg.append(f":{op.pkg.slot}/{op.pkg.subslot}") elif op.pkg.slot != "0": pkg.append(f":{op.pkg.slot}") if not self.quiet_repo_display and op.pkg.source_repository: pkg.append(f"::{op.pkg.source_repository}") out.write(*(pkg_coloring + pkg + [out.reset])) installed = [] if op.desc == "replace": old_pkg = [op.old_pkg.fullver] if self.verbosity > 0: if op.old_pkg.subslot != op.old_pkg.slot: old_pkg.append(f":{op.old_pkg.slot}/{op.old_pkg.subslot}") elif op.old_pkg.slot != "0": old_pkg.append(f":{op.old_pkg.slot}") if not self.quiet_repo_display and op.old_pkg.source_repository: old_pkg.append(f"::{op.old_pkg.source_repository}") if ( op_type != "replace" or op.pkg.source_repository != op.old_pkg.source_repository ): installed = "".join(old_pkg) elif op_type == "slotted_add": if self.verbosity > 0: pkgs = sorted( f"{x.fullver}:{x.slot}" for x in self.installed_repos.match(op.pkg.unversioned_atom) ) else: pkgs = sorted( x.fullver for x in self.installed_repos.match(op.pkg.unversioned_atom) ) installed = ", ".join(pkgs) # output currently installed versions if installed: out.write(" ", out.fg("blue"), out.bold, f"[{installed}]", out.reset) # Build a list of (useflags, use_expand_dicts) tuples. # HACK: if we are in "replace" mode we build a list of length # 4, else this is a list of length 2. We then pass this to # format_use which can take either 2 or 4 arguments. uses = ((), ()) if op.desc == "replace": uses = ( op.pkg.iuse_stripped, op.pkg.use, op.old_pkg.iuse_stripped, op.old_pkg.use, ) elif op.desc == "add": uses = (op.pkg.iuse_stripped, op.pkg.use) stuff = list(map(self.use_splitter, uses)) # Convert the list of tuples to a list of lists and a list of # dicts (both length 2 or 4). uselists, usedicts = list(zip(*stuff)) # output USE flags self.format_use("use", *uselists) # output USE_EXPAND flags for expand in sorted(self.use_expand - self.use_expand_hidden): try: sorter = op.pkg.repo.use_expand_sorter(expand) except AttributeError: # TODO: hack for unit tests using fake repo objs sorter = lambda k: k flaglists = [sorted(d.get(expand, ()), key=sorter) for d in usedicts] self.format_use(expand, *flaglists, sorter=sorter) # output download size if self.verbosity > 0: if not op.pkg.built: downloads = set( f for f in op.pkg.distfiles if not os.path.isfile(pjoin(self.distdir, f)) ) if downloads.difference(self.downloads): self.downloads.update(downloads) size = sum( v["size"] for dist, v in op.pkg.manifest.distfiles.items() if dist in downloads ) if size: self.download_size += size out.write(" ", sizeof_fmt(size)) if self.quiet_repo_display: out.write(out.fg("cyan"), f" [{self.repos[op.pkg.repo]}]") out.write("\n") out.autoline = origautoline
[docs] def format_use( self, attr, pkg_iuse, pkg_use, old_pkg_iuse=None, old_pkg_use=None, sorter=lambda k: k, ): """Write the current selection from a set of flags to a formatter. :type attr: string :param attr: name of the setting :type pkg_iuse: set of strings :param pkg_iuse: all available use flags for the package :type pkg_use: set of strings :param pkg_use: enabled use flags for the package :type old_pkg_iuse: set of strings :param old_pkg_iuse: all available use flags in the previous version :type old_pkg_use: set of strings :param old_pkg_use: enabled use flags in the previous version """ out = self.out red = out.fg("red") green = out.fg("green") blue = out.fg("blue") yellow = out.fg("yellow") bold = out.bold reset = out.reset flags = [] enabled = set(pkg_iuse) & set(pkg_use) disabled = set(pkg_iuse) - set(pkg_use) # updating or rebuilding pkg if old_pkg_iuse is not None and old_pkg_use is not None: old_enabled = set(old_pkg_iuse) & set(old_pkg_use) old_disabled = set(old_pkg_iuse) - set(old_pkg_use) removed = set(old_pkg_iuse) - set(pkg_iuse) for flag in sorted(enabled, key=sorter): expanded_flag = ( "_".join((attr.lower(), flag)) if attr != "use" else flag ) if flag in old_enabled: # unchanged if self.verbosity > 0: if expanded_flag in self.pkg_forced_use: flags.extend(("(", red, bold, flag, reset, ")", " ")) else: flags.extend((red, bold, flag, reset, " ")) elif flag in old_disabled: # toggled if expanded_flag in self.pkg_forced_use: flags.extend(("(", green, bold, flag, reset, "*)", " ")) else: flags.extend((green, bold, flag, reset, "*", " ")) else: # new if expanded_flag in self.pkg_forced_use: flags.extend(("(", yellow, bold, flag, reset, "%*)", " ")) else: flags.extend((yellow, bold, flag, reset, "%*", " ")) for flag in sorted(disabled, key=sorter): expanded_flag = ( "_".join((attr.lower(), flag)) if attr != "use" else flag ) if flag in old_disabled: # unchanged if self.verbosity > 0: if expanded_flag in self.pkg_disabled_use: flags.extend(("(", blue, bold, "-", flag, reset, ")", " ")) else: flags.extend((blue, bold, "-", flag, reset, " ")) elif flag in old_enabled: # toggled if expanded_flag in self.pkg_disabled_use: flags.extend(("(", green, bold, "-", flag, reset, "*)", " ")) else: flags.extend((green, bold, "-", flag, reset, "*", " ")) else: # new if expanded_flag in self.pkg_disabled_use: flags.extend(("(", yellow, bold, "-", flag, reset, "%)", " ")) else: flags.extend((yellow, bold, "-", flag, reset, "%", " ")) if self.verbosity > 0: for flag in sorted(removed, key=sorter): if flag in old_enabled: flags.extend(("(", yellow, bold, "-", flag, reset, "%*)", " ")) else: flags.extend(("(", yellow, bold, "-", flag, reset, "%)", " ")) # new pkg install else: for flag in sorted(enabled, key=sorter): expanded_flag = ( "_".join((attr.lower(), flag)) if attr != "use" else flag ) if expanded_flag in self.pkg_forced_use: flags.extend(("(", red, bold, flag, reset, ")", " ")) else: flags.extend((red, bold, flag, reset, " ")) for flag in sorted(disabled, key=sorter): expanded_flag = ( "_".join((attr.lower(), flag)) if attr != "use" else flag ) if expanded_flag in self.pkg_disabled_use: flags.extend(("(", blue, bold, "-", flag, reset, ")", " ")) else: flags.extend((blue, bold, "-", flag, reset, " ")) # Only write this if we have something to write if flags: out.write(" ", attr.upper(), '="') # Omit the final space. out.write(*flags[:-1]) out.write('"')
[docs] def end(self): """Output package repository list.""" out = self.out if self.verbosity > 0: super().end() out.write() if self.quiet_repo_display: repos = list(self.repos.items()) repos.sort(key=operator.itemgetter(1)) for k, v in repos: reponame = getattr(k, "repo_id", "unknown repo id") location = getattr(k, "location", "unspecified location") if reponame != location: self.out.write( " ", self.out.fg("cyan"), f"[{v}]", self.out.reset, f" {reponame} ({location})", ) else: self.out.write( " ", self.out.fg("cyan"), f"[{v}]", self.out.reset, f" {location}", )
[docs] class PortageVerboseFormatter(VerboseFormatter, PortageFormatter): """Formatter designed to resemble portage output in verbose mode."""