Source code for pkgcore.ebuild.portageq

import os
from functools import partial

from snakeoil import osutils
from snakeoil.cli import arghparse

from ..restrictions.boolean import AndRestriction
from ..util import commandline, packages
from . import atom, conditionals
from .eapi import get_eapi


[docs] def str_pkg(pkg): pkg = packages.get_raw_pkg(pkg) # special casing; old style virtuals come through as the original pkg. if pkg.package_is_real: return pkg.cpvstr if hasattr(pkg, "actual_pkg"): return pkg.actual_pkg.cpvstr # icky, but works. return str(pkg.rdepend).lstrip("=")
[docs] def get_atom_kls(value): eapi = get_eapi(value) if eapi is None: raise ValueError(f"EAPI {value} isn't known/supported") return eapi.atom_kls
[docs] def default_portageq_args(parser): parser.add_argument( "--eapi", dest="atom_kls", type=get_atom_kls, default=atom.atom, help="limit all operations to just what the given EAPI supports.", ) parser.add_argument( "--use", default=None, help="override the use flags used for transititive USE deps- " "dev-lang/python[threads=] for example", )
[docs] def make_atom(value): return arghparse.DelayedValue(partial(_render_atom, value), 100)
def _render_atom(value, namespace, attr): a = namespace.atom_kls(value) if isinstance(a, atom.transitive_use_atom): a.restrictions # XXX bit of a hack. a = conditionals.DepSet(a.restrictions, atom.atom, True) a = a.evaluate_depset(getattr(namespace, "use", ())) a = AndRestriction(*a.restrictions) setattr(namespace, attr, a)
[docs] class BaseCommand(arghparse.ArgparseCommand): required_arg_count = 0 has_optional_args = False arg_spec = ()
[docs] def bind_to_parser(self, parser, compat=False): arghparse.ArgparseCommand.bind_to_parser(self, parser) default_portageq_args(parser) if self.requires_root: if compat: kwds = {} if self._compat_root_default: kwds["nargs"] = "?" kwds["default"] = self._compat_root_default parser.add_argument( dest="domain", metavar="root", action=commandline.DomainFromPath, help="the domain that lives at root will be used", **kwds, ) else: mux = parser.add_mutually_exclusive_group() commandline._mk_domain(mux) mux.add_argument( "--domain-at-root", action=commandline.DomainFromPath, dest="domain", help="specify the domain to use via its root path", ) for token in self.arg_spec: kwds = {} if token[-1] in "+?*": kwds["nargs"] = token[-1] token = token[:-1] if token == "atom": parser.add_argument( "atom", help="atom to inspect", type=make_atom, **kwds ) else: parser.add_argument(token, help=f"{token} to inspect", **kwds)
[docs] @classmethod def make_command( cls, arg_spec="", requires_root=True, bind=None, root_default=None, name=None, **kwds, ): kwds = dict( arg_spec=tuple(arg_spec.split()), requires_root=requires_root, _compat_root_default=root_default, **kwds, ) def internal_function(functor, name=name): class mycommand(BaseCommand): function = __call__ = staticmethod(functor) __doc__ = getattr(functor, "__doc__", None) locals().update(kwds) if name is None: name = functor.__name__ mycommand.__name__ = name if bind is not None: bind.append(mycommand) return mycommand return internal_function
common_commands = [] query_commands = [] portageq_commands = []
[docs] @BaseCommand.make_command("variable+", bind=query_commands) def env_var(options, out, err): """ return configuration defined variables. """ for x in options.variable: val = options.domain.settings.get(x, "") if not isinstance(val, str): val = " ".join(val) out.write(str(val)) return 0
[docs] @BaseCommand.make_command( "variable+", bind=portageq_commands, name="envvar", root_default="/" ) def portageq_envvar(options, out, err): """ return configuration defined variables. Use envvar2 instead, this will be removed. """ return env_var.function(options, out, err)
[docs] @BaseCommand.make_command("variable+", bind=portageq_commands, name="envvar2") def portageq_envvar2(options, out, err): """ return configuration defined variables. """ return env_var.function(options, out, err)
[docs] @BaseCommand.make_command("atom", bind=common_commands) def has_version(options, out, err): """ Return 0 if an atom is merged, 1 if not. """ if options.atom in options.domain.all_installed_repos: return 0 return 1
def _best_version(domain, restrict): try: p = max(domain.all_installed_repos.itermatch(restrict)) except ValueError: # empty sequence. return "" return str_pkg(p)
[docs] @BaseCommand.make_command("atom+", bind=common_commands) def mass_best_version(options, out, err): """ multiple best_version calls. """ for x in options.atom: out.write("%s:%s" % (x, _best_version(options.domain, x).rstrip())) return 0
[docs] @BaseCommand.make_command("atom", bind=common_commands) def best_version(options, out, err): """ Return the maximum visible version for a given atom. """ out.write(_best_version(options.domain, options.atom)) return 0
[docs] @BaseCommand.make_command("atom", bind=portageq_commands) def match(options, out, err): """shorthand for `pquery --installed`""" i = options.domain.all_installed_repos.itermatch(options.atom, sorter=sorted) for pkg in i: out.write(str_pkg(pkg)) return 0
[docs] @BaseCommand.make_command(bind=common_commands, root_default="/") def get_repos(options, out, err): l = [] for repo in options.domain.ebuild_repos_raw: repo_id = getattr(repo, "repo_id", getattr(repo, "location", None)) l.append(repo_id) for x in sorted(set(l)): out.write(x) return 0
[docs] def find_profile_paths_by_repo_id(config, repo_id, fullpath=False): repo = config.repo.get(repo_id, None) if repo is not None and getattr(repo, "location", None) is not None: profiles = repo.config.profiles.arch_profiles for arch in profiles.keys(): for path, stability in profiles[arch]: if fullpath: path = os.path.join(repo.location, "profiles", path) yield path
[docs] @BaseCommand.make_command("repo_id", bind=query_commands) def get_profiles(options, out, err): if options.repo_id == "all": profiles = ( profile for repo in options.domain.ebuild_repos_raw for profile in find_profile_paths_by_repo_id( options.config, repo.repo_id, fullpath=True ) ) else: profiles = find_profile_paths_by_repo_id(options.config, options.repo_id) for x in sorted(set(profiles)): out.write(x) return 0
[docs] @BaseCommand.make_command("repo_id", bind=portageq_commands) def get_repo_path(options, out, err): repo = options.config.repo.get(options.repo_id, None) if repo is not None and getattr(repo, "location", None) is not None: out.write(repo.location) return 0 return 1
get_repo_path = BaseCommand.make_command( "repo_id", bind=query_commands, name="get_repo_path" )(get_repo_path.function)
[docs] @BaseCommand.make_command("repo_id", bind=portageq_commands) def get_repo_news_path(options, out, err): repo = options.config.repo.get(options.repo_id, None) if repo is not None and getattr(repo, "location", None) is not None: out.write(osutils.normpath(osutils.pjoin(repo.location, "metadata", "news"))) return 0 return 1
[docs] def bind_parser(parser, compat=False, name="portageq"): subparsers = parser.add_subparsers(description=f"{name} commands") l = common_commands[:] if compat: l += portageq_commands else: l += query_commands for command in sorted(l, key=lambda x: x.__name__): subparser = subparsers.add_parser( command.__name__, help=command.__doc__, description=command.__doc__ ) command().bind_to_parser(subparser, compat=compat)