# Source code for pkgcore.util.commandline

"""Utilities for writing commandline utilities.

pkgcore scripts should use the :obj:`ArgumentParser` subclass here for a
consistent commandline "look and feel" (and it tries to make life a
bit easier too). They will probably want to use :obj:`main` from an ``if
__name__ == '__main__'`` block too: it will take care of things like
consistent exception handling.

See dev-notes/commandline.rst for more complete documentation.
"""

import argparse
import os
import sys
from functools import partial
from importlib import import_module

from snakeoil import modules
from snakeoil.cli import arghparse, tool
from snakeoil.log import suppress_logging
from snakeoil.osutils import abspath, normpath, pjoin
from snakeoil.sequences import iflatten_instance, unstable_unique
from snakeoil.strings import pluralism

from .. import const
from ..config import load_config
from ..repository import errors as repo_errors
from ..restrictions import packages, restriction
from . import parserestrict


class StoreTarget(argparse._AppendAction):
    """Parse extended package atom syntax and optionally set arguments.

    Every parsed target is appended to the destination attribute as a
    ``(token, restriction)`` tuple.

    Various target arguments are supported including the following:

    atom
        An extended atom syntax is supported, see the related section
        in pkgcore(5).

    package set
        Used to define lists of packages, the syntax used for these is
        @pkgset. For example, the @system and @world package sets are
        supported.

    extended globbing
        Globbing package names or atoms allows for use cases such as
        ``'far*'`` (merge every package starting with 'far'),
        ``'dev-python/*::gentoo'`` (merge every package in the dev-python
        category from the gentoo repo), or even '*' (merge everything).

    Also, the target '-' allows targets to be read from standard input.

    Keyword arguments consumed by this action (all others are passed on to
    the argparse append action):

    :param use_sets: name of the namespace attribute collecting ``@set``
        targets (stored without the leading ``@``); a false value disables
        package set handling
    :param allow_ebuild_paths: treat targets ending in ``.ebuild`` as paths
        to ebuild files and convert them via the repo's ``path_restrict``
    :param allow_external_repos: when an ebuild path isn't part of the known
        repo, try adding the repo located three directories above it
    :param separator: if not None, the raw value is split on this string
    """

    def __init__(self, *args, **kwargs):
        self.use_sets = kwargs.pop("use_sets", False)
        self.allow_ebuild_paths = kwargs.pop("allow_ebuild_paths", False)
        self.allow_external_repos = kwargs.pop("allow_external_repos", False)
        self.separator = kwargs.pop("separator", None)
        kwargs.setdefault("default", ())
        super().__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Convert the given target(s) and append them to the namespace.

        :raises argparse.ArgumentError: for '-' without piped input or for
            unusable ebuild paths
        :raises argparse.ArgumentTypeError: if an ebuild path is given but
            neither ``repo`` nor ``domain`` exists in the namespace
        """
        if self.separator is not None:
            values = values.split(self.separator)
        if self.use_sets:
            setattr(namespace, self.use_sets, [])

        if isinstance(values, str):
            values = [values]
        elif values is not None and len(values) == 1 and values[0] == "-":
            if sys.stdin.isatty():
                raise argparse.ArgumentError(
                    self, "'-' is only valid when piping data in"
                )
            stripped = (line.strip() for line in sys.stdin.readlines())
            values = [line for line in stripped if line]
            # reassign stdin to allow interactivity (currently only works for unix)
            sys.stdin = open("/dev/tty")

        # override default empty tuple value to appendable list
        if values:
            setattr(namespace, self.dest, [])

        # `values or ()` keeps a missing (None) value from breaking the loop
        for token in values or ():
            if self.use_sets and token.startswith("@"):
                # append to the attribute created above rather than to a
                # hard-coded ``sets`` attribute that may not exist
                getattr(namespace, self.use_sets).append(token[1:])
                continue

            if self.allow_ebuild_paths and token.endswith(".ebuild"):
                restrict = self._ebuild_path_restriction(namespace, token)
            else:
                try:
                    restrict = parserestrict.parse_match(token)
                except parserestrict.ParseError as e:
                    parser.error(e)
            super().__call__(
                parser, namespace, (token, restrict), option_string=option_string
            )

    @staticmethod
    def _default_repo(namespace):
        """Return ``namespace.repo``, else the domain's raw ebuild repos."""
        # look the attributes up lazily: a getattr() default would be
        # evaluated even when ``repo`` exists, wrongly requiring a domain
        try:
            return namespace.repo
        except AttributeError:
            pass
        try:
            return namespace.domain.ebuild_repos_raw
        except AttributeError:
            raise argparse.ArgumentTypeError(
                "repo or domain must be defined in the namespace"
            ) from None

    def _ebuild_path_restriction(self, namespace, token):
        """Build the restriction matching the ebuild file at path ``token``."""
        repo = self._default_repo(namespace)

        if not os.path.exists(token):
            raise argparse.ArgumentError(self, f"nonexistent ebuild: {token!r}")
        if not os.path.isfile(token):
            raise argparse.ArgumentError(self, f"invalid ebuild: {token!r}")

        if self.allow_external_repos and token not in repo:
            # repo root is taken to be three levels above the ebuild file
            repo_root_dir = os.path.abspath(
                pjoin(token, os.pardir, os.pardir, os.pardir)
            )
            try:
                with suppress_logging():
                    repo = namespace.domain.add_repo(
                        repo_root_dir, config=namespace.config
                    )
            except repo_errors.RepoError as e:
                raise argparse.ArgumentError(self, f"{token!r} -- {e}") from e

        try:
            return repo.path_restrict(token)
        except ValueError as e:
            raise argparse.ArgumentError(self, str(e)) from e
# Sentinel compared by identity in StoreConfigObject._real_call: when it is
# passed as the option value, every object in the config sections is loaded.
CONFIG_ALL_DEFAULT = object()
class NoDefaultConfigError(argparse.ArgumentError):
    """Argument error raised when no default config object is available."""
class StoreConfigObject(argparse._StoreAction):
    """Argparse action resolving option values to named config objects.

    Parsing only stores an ``arghparse.DelayedParse`` on the namespace; the
    actual lookup happens in :meth:`_real_call` once ``namespace.config`` is
    available.

    Keyword arguments consumed by this action:

    :param config_type: required string naming the attribute of
        ``config.objects`` that holds the candidate objects
    :param priority: priority handed to the delayed wrappers
        (defaults to :attr:`default_priority`)
    :param get_default: if true, install a delayed default that is resolved
        through :meth:`store_default`
    :param store_name: if true, store ``(name, object)`` tuples instead of
        bare objects
    :param writable: if true, reject objects whose ``frozen`` attribute is
        true
    """

    default_priority = 20

    def __init__(self, *args, **kwargs):
        self.priority = int(kwargs.pop("priority", self.default_priority))
        self.config_type = kwargs.pop("config_type", None)
        if not isinstance(self.config_type, str):
            raise ValueError("config_type must specified, and be a string")

        if kwargs.pop("get_default", False):
            # argparse registers positionals with an empty option_strings
            # list; fall back to None instead of indexing into it
            option_strings = kwargs.get("option_strings") or [None]
            kwargs["default"] = arghparse.DelayedValue(
                partial(
                    self.store_default,
                    self.config_type,
                    option_string=option_strings[0],
                ),
                self.priority,
            )

        self.store_name = kwargs.pop("store_name", False)
        self.writable = kwargs.pop("writable", None)
        self.target = argparse._StoreAction(*args, **kwargs)

        super().__init__(*args, **kwargs)

    @staticmethod
    def _choices(sections):
        """Yield available values for a given option."""
        for name, _ in sections.items():
            yield name

    def _load_obj(self, sections, name):
        """Return the object called ``name`` from ``sections``.

        :return: the object, or ``(name, object)`` when ``store_name`` is set
        :raises argparse.ArgumentError: if the name is unknown, or the object
            is frozen while a writable one was requested
        """
        obj_type = self.metavar if self.metavar is not None else self.config_type
        obj_type = obj_type.lower() + " " if obj_type is not None else ""

        try:
            val = sections[name]
        except KeyError:
            choices = ", ".join(self._choices(sections))
            if choices:
                choices = f" (available: {choices})"
            raise argparse.ArgumentError(
                self, f"couldn't find {obj_type}{name!r}{choices}"
            ) from None

        if self.writable and getattr(val, "frozen", False):
            raise argparse.ArgumentError(self, f"{obj_type}{name!r} is readonly")

        if self.store_name:
            return name, val
        return val

    def __call__(self, parser, namespace, values, option_string=None):
        """Store a delayed parse; the lookup itself runs in ``_real_call``."""
        setattr(
            namespace,
            self.dest,
            arghparse.DelayedParse(
                partial(self._real_call, parser, namespace, values, option_string),
                self.priority,
            ),
        )

    def _get_sections(self, config, namespace):
        """Return the mapping of candidate objects for this config type."""
        return getattr(config.objects, self.config_type)

    def _real_call(self, parser, namespace, values, option_string=None):
        """Resolve ``values`` against the config and store the result.

        :raises ValueError: if the namespace carries no config
        """
        config = getattr(namespace, "config", None)
        if config is None:
            raise ValueError("no config found, internal bug")

        sections = self._get_sections(config, namespace)

        # an empty ``*`` argument selects every known section
        if self.nargs == argparse.ZERO_OR_MORE and values == []:
            values = list(sections.keys())

        if values is CONFIG_ALL_DEFAULT:
            value = [self._load_obj(sections, x) for x in sections]
        elif isinstance(values, str):
            value = self._load_obj(sections, values)
        else:
            value = [self._load_obj(sections, x) for x in values]
        setattr(namespace, self.dest, value)

    @staticmethod
    def store_default(config_type, namespace, attr, option_string=None):
        """Store the config's default object of ``config_type`` on ``attr``.

        :raises argparse.ArgumentTypeError: if the namespace has no config
        :raises NoDefaultConfigError: if the config defines no default
        """
        config = getattr(namespace, "config", None)
        if config is None:
            raise argparse.ArgumentTypeError(
                "no config found -- internal bug, or broken on disk configuration"
            )
        obj = config.get_default(config_type)
        if obj is None:
            known_objs = sorted(getattr(config.objects, config_type).keys())
            msg = f"config error: no default object of type {config_type!r} found. "
            if not option_string:
                msg += "Please fix your configuration."
            else:
                msg += (
                    "Please either fix your configuration, or set the "
                    f"{config_type} via the {option_string} option."
                )
            if known_objs:
                # leading space separates this from the preceding sentence
                msg += f" Known {config_type}s: {', '.join(map(repr, known_objs))}"
            raise NoDefaultConfigError(None, msg)
        setattr(namespace, attr, obj)

    @staticmethod
    def store_all_default(config_type, namespace, attr):
        """Store all ``(name, object)`` pairs of ``config_type`` on ``attr``.

        :raises ValueError: if the namespace carries no config
        """
        config = getattr(namespace, "config", None)
        if config is None:
            raise ValueError("no config found -- internal bug")
        setattr(namespace, attr, list(getattr(config, config_type).items()))

    @classmethod
    def lazy_load_object(cls, config_type, key, priority=None):
        """Return a delayed value that loads ``key`` of ``config_type``."""
        if priority is None:
            priority = cls.default_priority
        return arghparse.DelayedValue(
            partial(cls._lazy_load_object, config_type, key), priority
        )

    @staticmethod
    def _lazy_load_object(config_type, key, namespace, attr):
        """Look ``key`` up in the config and store the object on ``attr``.

        :raises argparse.ArgumentError: if ``key`` is unknown
        """
        try:
            obj = getattr(namespace.config, config_type)[key]
        except KeyError:
            # report the key that was looked up, not the destination attribute
            raise argparse.ArgumentError(
                None, f"couldn't find {config_type} {key!r}"
            ) from None
        setattr(namespace, attr, obj)
class StoreRepoObject(StoreConfigObject):
    """Load a repo object from the config.

    Keyword arguments consumed by this action:

    :param repo_type: key of :attr:`valid_repo_types` selecting which repos
        of the domain are searched (defaults to ``"all"``)
    :param allow_aliases: iterable of repo type names that may be given on
        the command line to select that whole group of repos
    :param allow_name_lookup: if false, skip alias and external repo lookups
    :param allow_external_repos: if true, an existing path that matches no
        known repo is added to the domain as a new repo
    """

    # mapping between supported repo type requests and the related attr on
    # domain objects to pull the requested repos from
    valid_repo_types = {
        "config": "repo_configs",
        "all": "repos",
        "all-raw": "repos_raw",
        "source": "source_repos",
        "source-raw": "source_repos_raw",
        "installed": "installed_repos",
        "installed-raw": "installed_repos_raw",
        "unfiltered": "unfiltered_repos",
        "ebuild": "ebuild_repos",
        "ebuild-unfiltered": "ebuild_repos_unfiltered",
        "ebuild-raw": "ebuild_repos_raw",
        "binary": "binary_repos",
        "binary-unfiltered": "binary_repos_unfiltered",
        "binary-raw": "binary_repos_raw",
    }

    def __init__(self, *args, **kwargs):
        if "config_type" in kwargs:
            raise ValueError(
                f"StoreRepoObject: config_type keyword is redundant: got {kwargs['config_type']}"
            )

        self.repo_type = kwargs.pop("repo_type", "all")
        if self.repo_type not in self.valid_repo_types:
            raise argparse.ArgumentTypeError(f"unknown repo type: {self.repo_type!r}")
        self.repo_key = self.valid_repo_types[self.repo_type]

        self.allow_aliases = set(kwargs.pop("allow_aliases", ()))
        # an empty alias set simply yields no unknown entries
        unknown_aliases = self.allow_aliases.difference(self.valid_repo_types)
        if unknown_aliases:
            es = pluralism(unknown_aliases, plural="es")
            raise argparse.ArgumentTypeError(
                f"unknown repo alias{es}: {', '.join(unknown_aliases)}"
            )

        # repo configs live under their own config type
        kwargs["config_type"] = "repo_config" if self.repo_type == "config" else "repo"

        self.allow_name_lookup = kwargs.pop("allow_name_lookup", True)
        self.allow_external_repos = kwargs.pop("allow_external_repos", False)
        super().__init__(*args, **kwargs)

    def _get_sections(self, config, namespace):
        """Return repo config sections or the domain's repos of the wanted type."""
        domain = getattr(namespace, "domain", None)

        # return repo config objects
        if domain is None or self.repo_type == "config":
            return StoreConfigObject._get_sections(self, config, namespace)

        self.config = config
        self.domain = config.get_default("domain")

        # return the type of repos requested
        return getattr(self.domain, self.repo_key)

    @staticmethod
    def _choices(sections):
        """Return an iterable of name: location mappings for available repos.

        If a repo doesn't have a proper location just the name is returned.
        """
        for name, repo in sorted(unstable_unique(sections.items())):
            label = getattr(repo, "repo_id", name)
            yield f"{label}:{repo.location}" if hasattr(repo, "location") else label

    def _load_obj(self, sections, name):
        """Resolve ``name`` as a repo name, type alias, repo alias, or path."""
        # requested repo exists in the config, or lookups are disabled
        if not self.allow_name_lookup or name in sections:
            return StoreConfigObject._load_obj(self, sections, name)

        # pull repos related to given alias
        if name in self.allow_aliases and self.valid_repo_types[name]:
            return getattr(self.domain, self.valid_repo_types[name])

        # name wasn't found, check repo aliases for it
        for repo_name, repo_obj in sections.items():
            if name in repo_obj.aliases:
                return StoreConfigObject._load_obj(self, sections, repo_name)

        # try to add it as an external repo
        target = name
        if self.allow_external_repos and os.path.exists(name):
            try:
                configure = not self.repo_type.endswith("-raw")
                with suppress_logging():
                    added = self.domain.add_repo(
                        name, config=self.config, configure=configure
                    )
                target = added.location
            except repo_errors.RepoError as e:
                raise argparse.ArgumentError(self, e)

            cache_attr = "_" + self.repo_key
            if hasattr(self.domain, cache_attr):
                # force JIT-ed attr refresh to include newly added repo
                setattr(self.domain, cache_attr, None)
            sections = getattr(self.domain, self.repo_key)

        return StoreConfigObject._load_obj(self, sections, target)
class DomainFromPath(StoreConfigObject):
    """Config action selecting the single domain rooted at a given path."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, config_type="domain", **kwargs)

    def _load_obj(self, sections, requested_path):
        """Return the one domain whose root equals ``requested_path``.

        :raises ValueError: if no domain, or more than one, claims the path
        """
        matches = list(find_domains_from_path(sections, requested_path))
        if not matches:
            raise ValueError(f"couldn't find domain at path {requested_path!r}")
        if len(matches) > 1:
            domains = ", ".join(repr(name) for name, _ in matches)
            raise ValueError(
                f"multiple domains claim root {requested_path!r}: domains {domains}"
            )
        _, domain = matches[0]
        return domain
def find_domains_from_path(sections, path):
    """Yield ``(name, domain)`` for each domain whose root matches ``path``.

    Both the requested path and every domain root are made absolute and
    normalized before comparison; domains without a ``root`` attribute (or
    with a root of None) are skipped.
    """
    wanted = normpath(abspath(path))
    for name, domain in sections.items():
        root = getattr(domain, "root", None)
        if root is not None and normpath(abspath(root)) == wanted:
            yield name, domain
class BooleanQuery(arghparse.DelayedValue):
    """Delayed value combining several namespace attrs into one restriction.

    :param attrs: names of the namespace attributes to collect from
    :param klass_type: ``"and"``, ``"or"``, or a callable used to join
        multiple restrictions
    :param priority: priority of the delayed value, coerced to int
    :param converter: optional callable invoked as
        ``converter(restrictions, namespace)`` before joining
    :raises ValueError: for an unsupported ``klass_type`` or a non-callable
        ``converter``
    """

    def __init__(self, attrs, klass_type=None, priority=100, converter=None):
        if klass_type == "and":
            self.klass = packages.AndRestriction
        elif klass_type == "or":
            self.klass = packages.OrRestriction
        elif callable(klass_type):
            self.klass = klass_type
        else:
            raise ValueError(
                "klass_type either needs to be 'or', 'and', "
                f"or a callable.  Got {klass_type!r}"
            )

        if converter is not None and not callable(converter):
            raise ValueError(
                "converter either needs to be None, or a callable;"
                f" got {converter!r}"
            )

        self.converter = converter
        self.priority = int(priority)
        self.attrs = tuple(attrs)

    def invokable(self, namespace, attr):
        """Collect the restrictions and store the combined one on ``attr``.

        Attributes that are None are skipped. A single restriction is stored
        as is, several are joined via ``self.klass``, and None is stored when
        nothing was collected.
        """
        collected = []
        # work on a local so one invocation never alters the shared instance
        converter = self.converter
        for name in self.attrs:
            val = getattr(namespace, name, None)
            if val is None:
                continue
            if isinstance(val, bool):
                # Skip converter call for disabled boolean actions
                if not val:
                    converter = None
            elif isinstance(val, restriction.base):
                collected.append(val)
            else:
                collected.extend(val)

        if converter:
            collected = converter(collected, namespace)

        collected = list(iflatten_instance(collected, (restriction.base,)))

        if len(collected) > 1:
            val = self.klass(*collected)
        elif collected:
            val = collected[0]
        else:
            val = None
        setattr(namespace, attr, val)
def make_query(parser, *args, **kwargs):
    """Add a query argument whose values are merged by a BooleanQuery.

    The argument itself is stored under ``_<dest>``; ``dest`` receives a
    :class:`BooleanQuery` default that combines ``attrs`` and ``_<dest>``.

    Keyword arguments handled here (everything else goes to
    ``parser.add_argument``):

    :param dest: required name of the final namespace attribute
    :param klass_type: how restrictions are joined (defaults to ``"or"``)
    :param attrs: further namespace attributes feeding into the query
    :param type: an explicit None removes the key; if absent, values are
        parsed with ``parserestrict.parse_match``
    :param metavar: an explicit None removes the key; if absent it defaults
        to ``dest``
    :param final_priority: priority for the BooleanQuery, if not None
    :param final_converter: converter for the BooleanQuery
    :raises TypeError: if ``dest`` is missing
    """

    def query(value):
        return parserestrict.parse_match(value)

    klass_type = kwargs.pop("klass_type", "or")
    dest = kwargs.pop("dest", None)
    if dest is None:
        raise TypeError("dest must be specified via kwargs")
    attrs = kwargs.pop("attrs", [])
    subattr = f"_{dest}"
    kwargs["dest"] = subattr

    # an explicit None drops the option, a missing one gets the fallback
    for option, fallback in (("type", query), ("metavar", dest)):
        if option in kwargs and kwargs[option] is None:
            del kwargs[option]
        else:
            kwargs.setdefault(option, fallback)

    final_priority = kwargs.pop("final_priority", None)
    final_converter = kwargs.pop("final_converter", None)

    parser.add_argument(*args, **kwargs)

    bool_kwargs = {"converter": final_converter}
    if final_priority is not None:
        bool_kwargs["priority"] = final_priority
    combined = BooleanQuery([*attrs, subattr], klass_type=klass_type, **bool_kwargs)

    # note that dict expansion has to be used here; dest=obj would just set a
    # default named 'dest'
    parser.set_defaults(**{dest: combined})
def python_namespace_type(value, module=False, attribute=False):
    """Return the object that the python namespace ``value`` specifies.

    :param value: python namespace, snakeoil.modules for example
    :param module: if true, ``value`` is imported as a module
    :param attribute: if true (and ``module`` is false), ``value`` is loaded
        via ``modules.load_attribute``; otherwise ``modules.load_any`` is used
    :raises argparse.ArgumentTypeError: if the import fails
    """
    if module:
        loader = import_module
    elif attribute:
        loader = modules.load_attribute
    else:
        loader = modules.load_any

    try:
        return loader(value)
    except (ImportError, modules.FailedImport) as err:
        raise argparse.ArgumentTypeError(str(err)) from err
def register_command(commands, real_type=type):
    """Return a class factory that records every object it creates.

    The returned callable takes ``(name, bases, scope)``, builds the object
    with ``real_type``, appends it to ``commands``, and returns it.
    """

    def register(name, bases, scope, real_type=real_type, commands=commands):
        created = real_type(name, bases, scope)
        commands.append(created)
        return created

    return register
def store_config(namespace, attr, global_config=()):
    """Load the config and store it on ``namespace`` under ``attr``.

    ``global_config`` is passed as the sources to prepend,
    ``namespace.config_path`` as the location, and every namespace attribute
    as an additional keyword argument to ``load_config``.
    """
    setattr(
        namespace,
        attr,
        load_config(
            prepend_sources=tuple(global_config),
            location=namespace.config_path,
            **vars(namespace),
        ),
    )
def _mk_domain(parser, help=True):
    """Add the ``--domain`` option to ``parser``.

    :param help: if false, the option's help text is suppressed
    """
    help_text = (
        "custom pkgcore domain to use for this operation" if help else argparse.SUPPRESS
    )
    parser.add_argument(
        "--domain",
        action=StoreConfigObject,
        config_type="domain",
        get_default=True,
        help=help_text,
    )


class _SubParser(arghparse._SubParser):
    def add_parser(self, name, config=False, domain=False, **kwds):
        """Suppress config and domain options in subparsers by default.

        They are rarely used so only allow them as options to the base command.
        """
        return super().add_parser(name, config=config, domain=domain, **kwds)


class _ConfigArg(argparse._StoreAction):
    """Store given config path location or use the stub config when disabled."""

    def __call__(self, parser, namespace, value, option_string=None):
        # "false", "no", and "n" (in any letter case) select the stub config
        disabled = value.lower() in ("false", "no", "n")
        if disabled:
            location = pjoin(const.DATA_PATH, "stubconfig")
        else:
            location = arghparse.existent_path(value)
        setattr(namespace, self.dest, location)
class ArgumentParser(arghparse.ArgumentParser):
    """Argument parser adding the shared ``--config``/``--domain`` options.

    :param suppress: forwarded to the parent parser; if true, no config
        options are added
    :param help: if false, help for the config options is suppressed
    :param config: if true, add ``--config`` and a delayed ``config`` default
    :param domain: if true, add ``--domain``
    :param global_config: config sources bound into the delayed config load
    """

    def __init__(
        self,
        suppress=False,
        help=True,
        config=True,
        domain=True,
        global_config=(),
        **kwds,
    ):
        super().__init__(suppress=suppress, **kwds)
        self.register("action", "parsers", _SubParser)

        if suppress:
            return

        config_opts = self.add_argument_group("config options")
        if config:
            config_help = (
                "use custom config or skip loading system config"
                if help
                else argparse.SUPPRESS
            )
            config_opts.add_argument(
                "--config",
                action=_ConfigArg,
                dest="config_path",
                help=config_help,
                docs=""" The path to a custom pkgcore config file or portage config directory can be given to override loading the default system config. Alternatively, an argument of 'false' or 'no' will skip loading the system config entirely if one exists. """,
            )

            # the config is loaded lazily through a delayed default
            loader = partial(store_config, global_config=global_config)
            self.set_defaults(config=arghparse.DelayedValue(loader))

        if domain:
            _mk_domain(config_opts, help)
def convert_to_restrict(sequence, default=packages.AlwaysTrue):
    """Convert an iterable to a list of atoms, or return the default.

    :param sequence: iterable of strings parsed via ``parserestrict.parse_match``
    :param default: sole element of the returned list when ``sequence``
        yields nothing
    :return: list of parsed restrictions, or ``[default]``
    :raises argparse.ArgumentError: if an element can't be parsed
    """
    restricts = []
    for token in sequence:
        try:
            restricts.append(parserestrict.parse_match(token))
        except parserestrict.ParseError as e:
            # ArgumentError takes (argument, message); with no related
            # action the argument is None
            raise argparse.ArgumentError(None, f"invalid atom: {token!r}: {e}") from e
    return restricts or [default]
class Tool(tool.Tool):
    """pkgcore-specific commandline utility functionality."""

    def pre_parse(self, *args, **kwargs):
        """Pass down pkgcore-specific settings to the bash side.

        When the parser's ``debug`` flag is set, its verbosity is exported
        through the ``PKGCORE_DEBUG`` environment variable.
        """
        parser = self.parser
        # pass down verbosity level to affect debug output
        if parser.debug:
            os.environ["PKGCORE_DEBUG"] = str(parser.verbosity)