Source code for pkgcore.ebuild.ebd_ipc

import argparse
import grp
import itertools
import locale
import os
import pwd
import re
import shlex
import shutil
import stat
from operator import itemgetter

from snakeoil.cli import arghparse
from snakeoil.compression import ArComp, ArCompError
from snakeoil.contexts import chdir
from snakeoil.decorators import coroutine
from snakeoil.iterables import partition
from snakeoil.osutils import pjoin
from snakeoil.process import spawn

from .. import os_data
from ..exceptions import PkgcoreException, PkgcoreUserException
from . import atom as atom_mod
from . import filter_env, portageq
from .misc import get_relative_dosym_target


[docs] class IpcError(PkgcoreException): """Generic IPC errors.""" def __init__(self, msg="", code=1, name=None, **kwargs): super().__init__(msg, **kwargs) self.msg = msg self.code = code self.name = name self.ret = IpcCommand._encode_ret((code, msg)) def __str__(self): if self.name: return f"{self.name}: {self.msg}" return self.msg
[docs] class IpcInternalError(IpcError): """IPC errors related to internal bugs."""
[docs] class IpcCommandError(IpcError, PkgcoreUserException): """IPC errors related to parsing arguments or running the command."""
[docs] class UnknownOptions(IpcCommandError): """Unknown options passed to IPC command.""" def __init__(self, options): super().__init__(f"unknown options: {', '.join(map(repr, options))}")
[docs] class UnknownArguments(IpcCommandError): """Unknown arguments passed to IPC command.""" def __init__(self, args): super().__init__(f"unknown arguments: {', '.join(map(repr, args))}")
[docs] class IpcArgumentParser(arghparse.ArgumentParser): """Raise IPC exception for argparse errors. Otherwise standard argparse prints the parser usage then outputs the error message to stderr. """ def __init__(self, *args, **kwargs): super().__init__(*args, suppress=True, add_help=False, **kwargs)
[docs] def error(self, msg): raise IpcCommandError(msg)
[docs] class IpcCommand: """Commands sent from the bash side of the ebuild daemon to run.""" # argument parser for internal options parser = None # argument parser for command options/arguments arg_parser = None # override IPC name for error messages name = None def __init__(self, op): self.op = op self.pkg = op.pkg self.eapi = op.pkg.eapi self.observer = op.observer if self.name is None: self.name = self.__class__.__name__.lower() def __call__(self, ebd): self.opts = arghparse.Namespace() self.ebd = ebd ret = 0 # read info from bash side nonfatal = self.read() == "true" self.cwd = self.read() self.phase = self.read() options = shlex.split(self.read()) args = self.read().strip("\0") args = args.split("\0") if args else [] # parse args and run command with chdir(self.cwd): try: args = self.parse_args(options, args) ret = self.run(args) except IpcCommandError as e: if nonfatal: ret = (e.code, e.msg) else: raise IpcCommandError(msg=e.msg, code=e.code, name=self.name) except KeyboardInterrupt: raise except Exception as e: raise IpcInternalError("internal failure") from e # return completion status to the bash side self.write(self._encode_ret(ret)) @staticmethod def _encode_ret(ret): """Encode exit status and any returned value to be sent back to the bash side.""" if ret is None: return 0 elif isinstance(ret, tuple): code, response = ret return f"{code}\x07{response}" elif isinstance(ret, (int, str)): return f"0\x07{ret}" raise TypeError(f"unsupported return status type: {type(ret)}")
[docs] def parse_args(self, options, args): """Parse internal args passed from the bash side.""" if self.parser is not None: _, unknown = self.parser.parse_known_args(options, namespace=self.opts) if unknown: raise UnknownOptions(unknown) if self.arg_parser is not None: # pull user options off the start of the argument list _, args = self.arg_parser.parse_known_optionals(args, namespace=self.opts) # parse remaining command arguments args, unknown = self.arg_parser.parse_known_args(args, namespace=self.opts) if unknown: raise UnknownArguments(unknown) return args
[docs] def run(self, args): """Run the requested IPC command.""" raise NotImplementedError
[docs] def read(self): """Read a line from the ebuild daemon.""" return self.ebd.read().strip()
[docs] def write(self, data): """Write data to the ebuild daemon. Args: data: data to be sent to the bash side """ self.ebd.write(data)
[docs] def warn(self, msg): """Output warning message. Args: msg: message to be output """ self.observer.warn(f"{self.name}: {msg}") self.observer.flush()
def _parse_group(group): try: return grp.getgrnam(group).gr_gid except KeyError: pass return int(group) def _parse_user(user): try: return pwd.getpwnam(user).pw_uid except KeyError: pass return int(user) def _parse_mode(mode): try: return int(mode, 8) except ValueError: return None
[docs] def command_options(s): """Split string of command line options into list.""" return shlex.split(s)
[docs] def existing_path(path): """Check if a given path exists (allows broken symlinks).""" if not os.path.lexists(path): raise argparse.ArgumentTypeError(f"nonexistent path: {path!r}") return path
class _InstallWrapper(IpcCommand): """Python wrapper for commands using `install`.""" parser = IpcArgumentParser() parser.add_argument("--dest", default="/") parser.add_argument("--insoptions", type=command_options) parser.add_argument("--diroptions", type=command_options) # defaults options for file and dir install actions insoptions_default = "" diroptions_default = "" # supported install command options install_parser = IpcArgumentParser() install_parser.add_argument("-g", "--group", default=-1, type=_parse_group) install_parser.add_argument("-o", "--owner", default=-1, type=_parse_user) install_parser.add_argument("-m", "--mode", default=0o755, type=_parse_mode) install_parser.add_argument("-p", "--preserve-timestamps", action="store_true") arg_parser = IpcArgumentParser() arg_parser.add_argument("targets", nargs="+", type=existing_path) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.parser.set_defaults( insoptions=self.insoptions_default, diroptions=self.diroptions_default ) # initialize file/dir creation coroutines self.install = self._install().send self.install_dirs = self._install_dirs().send self.install_symlinks = self._install_symlinks().send self.install_from_dirs = self._install_from_dirs().send def parse_args(self, *args, **kwargs): args = super().parse_args(*args, **kwargs) self.parse_install_options() return args def parse_install_options(self): """Parse install command options.""" self.insoptions = arghparse.Namespace() self.diroptions = arghparse.Namespace() if self.opts.insoptions: if not self._parse_install_options(self.opts.insoptions, self.insoptions): self.install = self._install_cmd().send if self.opts.diroptions: if not self._parse_install_options(self.opts.diroptions, self.diroptions): self.install_dirs = self._install_dirs_cmd().send def _parse_install_options(self, options, namespace): """Internal install command option parser. Args: options: list of options to parse namespace: argparse namespace to populate Returns: True when all options are handled, otherwise False if unknown/unhandled options exist. """ opts, unknown = self.install_parser.parse_known_args( options, namespace=namespace ) if unknown or opts.mode is None: msg = "falling back to 'install'" if unknown: msg += f": unhandled options: {' '.join(map(repr, unknown))}" self.warn(msg) return False return True def run(self, args): try: dest_dir = pjoin(self.op.ED, self.opts.dest.lstrip(os.path.sep)) os.makedirs(dest_dir, exist_ok=True) except OSError as e: raise IpcCommandError(f"failed creating dir: {dest_dir!r}: {e.strerror}") self._install_targets(args.targets) def _prefix_targets(self, targets, files=True): """Prepend targets being installed with the destination path.""" dest_dir = self.opts.dest.lstrip(os.path.sep) if files: return ( (s, pjoin(self.op.ED, dest_dir, d.lstrip(os.path.sep))) for s, d in targets ) return (pjoin(self.op.ED, dest_dir, d.lstrip(os.path.sep)) for d in targets) def _install_targets(self, targets): """Install targets. Args: targets: files/symlinks/dirs/etc to install """ self.install((f, os.path.basename(f)) for f in targets) @coroutine def _install_from_dirs(self): """Install all targets under given directories. Args: iterable of directories to install from """ while True: dirs = yield for d in dirs: base_dir = os.path.basename(d.rstrip(os.path.sep)) for dirpath, dirnames, filenames in os.walk(d): dest_dir = os.path.normpath( pjoin(base_dir, os.path.relpath(dirpath, d)) ) self.install_dirs([dest_dir]) for dirname in dirnames: source = pjoin(dirpath, dirname) if os.path.islink(source): dest = pjoin(dest_dir, dirname) self.install_symlinks([(source, dest)]) if filenames: self.install( (pjoin(dirpath, f), pjoin(dest_dir, f)) for f in filenames ) @staticmethod def _set_attributes(opts, path): """Set file attributes on a given path. Args: path: file/directory path """ try: if opts.owner != -1 or opts.group != -1: os.lchown(path, opts.owner, opts.group) if opts.mode is not None and not os.path.islink(path): os.chmod(path, opts.mode) except OSError as e: raise IpcCommandError( f"failed setting file attributes: {path!r}: {e.strerror}" ) @staticmethod def _set_timestamps(source_stat, dest): """Apply timestamps from source_stat to dest. Args: source_stat: stat result for the source file dest: path to the dest file """ os.utime(dest, ns=(source_stat.st_atime_ns, source_stat.st_mtime_ns)) def _is_install_allowed(self, source, source_stat, dest): """Determine if installing source into dest should work. This aims to aid compatibility with the `install` command. Args: source: path to the source file source_stat: stat result for the source file, using stat() rather than lstat(), in order to match the `install` command dest: path to the dest file Raises: IpcCommandError on failure Returns: True if the install should succeed """ # matching `install` command, use stat() for source and lstat() for dest try: dest_lstat = os.lstat(dest) except FileNotFoundError: # installing file to a new path return True except OSError as e: raise IpcCommandError(f"cannot stat {dest!r}: {e.strerror}") # installing symlink if stat.S_ISLNK(dest_lstat.st_mode): return True # source file and dest file are different if not os.path.samestat(source_stat, dest_lstat): return True # installing hardlink if source and dest are different if dest_lstat.st_nlink > 1 and os.path.realpath(source) != os.path.realpath( dest ): return True raise IpcCommandError(f"{source!r} and {dest!r} are identical") @coroutine def _install(self): """Install files. Args: iterable of (source, dest) tuples of files to install Raises: IpcCommandError on failure """ while True: files = yield # TODO: skip/warn installing empty files for source, dest in self._prefix_targets(files): try: sstat = os.stat(source) except OSError as e: raise IpcCommandError(f"cannot stat {source!r}: {e.strerror}") self._is_install_allowed(source, sstat, dest) # matching `install` command, remove dest before file install try: os.unlink(dest) except FileNotFoundError: pass except OSError as e: raise IpcCommandError( f"failed removing file: {dest!r}: {e.strerror}" ) try: shutil.copyfile(source, dest, follow_symlinks=False) if self.insoptions: self._set_attributes(self.insoptions, dest) if self.insoptions.preserve_timestamps: self._set_timestamps(sstat, dest) except OSError as e: raise IpcCommandError( f"failed copying file: {source!r} to {dest!r}: {e.strerror}" ) @coroutine def _install_cmd(self): """Install files using `install` command. Args: iterable of (source, dest) tuples of files to install Raises: IpcCommandError on failure """ while True: files = yield # `install` forcibly resolves symlinks so split them out files, symlinks = partition(files, predicate=lambda x: os.path.islink(x[0])) self.install_symlinks(symlinks) # group and install sets of files by destination to decrease `install` calls files = sorted(self._prefix_targets(files), key=itemgetter(1)) for dest, files_group in itertools.groupby(files, itemgetter(1)): sources = list(path for path, _ in files_group) command = ["install"] + self.opts.insoptions + sources + [dest] ret, output = spawn.spawn_get_output(command, collect_fds=(2,)) if not ret: raise IpcCommandError("\n".join(output), code=ret) @coroutine def _install_dirs(self): """Create directories. Args: iterable of paths where directories should be created Raises: IpcCommandError on failure """ while True: dirs = yield try: for d in self._prefix_targets(dirs, files=False): os.makedirs(d, exist_ok=True) if self.diroptions: self._set_attributes(self.diroptions, d) except OSError as e: raise IpcCommandError(f"failed creating dir: {d!r}: {e.strerror}") @coroutine def _install_dirs_cmd(self): """Create directories using `install` command. Args: iterable of paths where directories should be created Raises: IpcCommandError on failure """ while True: dirs = yield dirs = self._prefix_targets(dirs, files=False) command = ["install", "-d"] + self.opts.diroptions + list(dirs) ret, output = spawn.spawn_get_output(command, collect_fds=(2,)) if not ret: raise IpcCommandError("\n".join(output), code=ret) @coroutine def _install_symlinks(self): """Install iterable of symlinks. Args: iterable of (path, target dir) tuples of symlinks to install Raises: IpcCommandError on failure """ while True: symlinks = yield try: for symlink, dest in self._prefix_targets(symlinks): os.symlink(os.readlink(symlink), dest) except OSError as e: raise IpcCommandError( f"failed creating symlink: {symlink!r} -> {dest!r}: {e.strerror}" )
[docs] class Doins(_InstallWrapper): """Python wrapper for doins.""" arg_parser = IpcArgumentParser(parents=(_InstallWrapper.arg_parser,)) arg_parser.add_argument("-r", dest="recursive", action="store_true") def _install_targets(self, targets): files, dirs = partition(targets, predicate=os.path.isdir) if self.opts.recursive: self.install_from_dirs(dirs) self.install((f, os.path.basename(f)) for f in files)
[docs] class Dodoc(_InstallWrapper): """Python wrapper for dodoc.""" insoptions_default = "-m0644" arg_parser = IpcArgumentParser(parents=(_InstallWrapper.arg_parser,)) arg_parser.add_argument("-r", dest="recursive", action="store_true") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.allow_recursive = self.eapi.options.dodoc_allow_recursive def _install_targets(self, targets): files, dirs = partition(targets, predicate=os.path.isdir) # TODO: add peekable class for iterables to avoid list conversion dirs = list(dirs) if dirs: if self.opts.recursive and self.allow_recursive: self.install_from_dirs(dirs) else: missing_option = ", missing -r option?" if self.allow_recursive else "" raise IpcCommandError(f"{dirs[0]!r} is a directory{missing_option}") self.install((f, os.path.basename(f)) for f in files)
[docs] class Doinfo(_InstallWrapper): """Python wrapper for doinfo.""" insoptions_default = "-m0644"
[docs] class Dodir(_InstallWrapper): """Python wrapper for dodir.""" diroptions_default = "-m0755" arg_parser = IpcArgumentParser() arg_parser.add_argument("targets", nargs="+")
[docs] def run(self, args): self.install_dirs(args.targets)
[docs] class Keepdir(Dodir): """Python wrapper for keepdir."""
[docs] def run(self, args): # create dirs super().run(args) # create stub files filename = f".keep_{self.pkg.category}_{self.pkg.PN}-{self.pkg.slot}" for x in args.targets: path = pjoin(self.op.ED, x.lstrip(os.path.sep), filename) open(path, "w").close()
[docs] class Doexe(_InstallWrapper): """Python wrapper for doexe."""
[docs] class Dobin(_InstallWrapper): """Python wrapper for dobin."""
[docs] def parse_install_options(self, *args, **kwargs): # TODO: fix this to be prefix aware at some point self.opts.insoptions = [ "-m0755", f"-g{os_data.root_gid}", f"-o{os_data.root_uid}", ] return super().parse_install_options(*args, **kwargs)
[docs] class Dosbin(Dobin): """Python wrapper for dosbin."""
[docs] class Dolib(_InstallWrapper): """Python wrapper for dolib."""
[docs] class Dolib_so(Dolib): """Python wrapper for dolib.so.""" name = "dolib.so"
[docs] class Dolib_a(Dolib): """Python wrapper for dolib.a.""" name = "dolib.a"
class _Symlink(_InstallWrapper): arg_parser = IpcArgumentParser() arg_parser.add_argument("source") arg_parser.add_argument("target") def run(self, args): dest_dir = args.target.rsplit(os.path.sep, 1)[0] if dest_dir != args.target: self.install_dirs([dest_dir]) target = pjoin(self.op.ED, args.target.lstrip(os.path.sep)) with chdir(self.op.ED): try: try: self._link(args.source, target) except FileExistsError: # overwrite target if it exists os.unlink(target) self._link(args.source, target) except OSError as e: raise IpcCommandError( f"failed creating link: {args.source!r} -> {args.target!r}: {e.strerror}" )
[docs] class Dosym(_Symlink): """Python wrapper for dosym.""" _link = os.symlink arg_parser = IpcArgumentParser(parents=(_Symlink.arg_parser,)) arg_parser.add_argument("-r", dest="relative", action="store_true") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.dosym_relative = self.eapi.options.dosym_relative
[docs] def run(self, args): target = args.target if target.endswith(os.path.sep) or ( os.path.isdir(target) and not os.path.islink(target) ): # bug 379899 raise IpcCommandError(f"missing filename target: {target!r}") if self.opts.relative: if not self.dosym_relative: raise IpcCommandError(f"-r not permitted in EAPI {self.eapi}") if not os.path.isabs(args.source): raise IpcCommandError("-r is only meaningful with absolute paths") args.source = get_relative_dosym_target(args.source, target) super().run(args)
[docs] class Dohard(_Symlink): """Python wrapper for dosym.""" _link = os.link
[docs] class Doman(_InstallWrapper): """Python wrapper for doman.""" insoptions_default = "-m0644" arg_parser = IpcArgumentParser(parents=(_InstallWrapper.arg_parser,)) arg_parser.add_argument("-i18n", action="store_true", default="") detect_lang_re = re.compile(r"^(\w+)\.([a-z]{2}([A-Z]{2})?)\.(\w+)$") valid_mandir_re = re.compile(r"man[0-9n](f|p|pm)?$") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.language_detect = self.eapi.options.doman_language_detect self.language_override = self.eapi.options.doman_language_override def _install_targets(self, targets): dirs = set() for x in targets: basename = os.path.basename(x) ext = os.path.splitext(basename)[1] if self.eapi.archive_exts_regex.match(ext): # TODO: uncompress/warn? ext = os.path.splitext(basename.rsplit(".", 1)[0])[1] name = basename mandir = f"man{ext[1:]}" if self.language_override and self.opts.i18n: mandir = pjoin(self.opts.i18n, mandir) elif self.language_detect: match = self.detect_lang_re.match(basename) if match: name = f"{match.group(1)}.{match.group(4)}" mandir = pjoin(match.group(2), mandir) if self.valid_mandir_re.match(os.path.basename(mandir)): if mandir not in dirs: self.install_dirs([mandir]) dirs.add(mandir) self.install([(x, pjoin(mandir, name))]) else: raise IpcCommandError(f"invalid man page: {x}")
[docs] class Domo(_InstallWrapper): """Python wrapper for domo.""" insoptions_default = "-m0644" def _install_targets(self, targets): dirs = set() for x in targets: d = pjoin(os.path.splitext(os.path.basename(x))[0], "LC_MESSAGES") if d not in dirs: self.install_dirs([d]) dirs.add(d) self.install([(x, pjoin(d, f"{self.pkg.PN}.mo"))])
[docs] class Dohtml(_InstallWrapper): """Python wrapper for dohtml.""" insoptions_default = "-m0644" arg_parser = IpcArgumentParser(parents=(_InstallWrapper.arg_parser,)) arg_parser.add_argument("-r", dest="recursive", action="store_true") arg_parser.add_argument("-V", dest="verbose", action="store_true") arg_parser.add_argument( "-A", dest="extra_allowed_file_exts", action="csv", default=[] ) arg_parser.add_argument("-a", dest="allowed_file_exts", action="csv", default=[]) arg_parser.add_argument("-f", dest="allowed_files", action="csv", default=[]) arg_parser.add_argument("-x", dest="excluded_dirs", action="csv", default=[]) arg_parser.add_argument("-p", dest="doc_prefix", default="") # default allowed file extensions default_allowed_file_exts = ( "css", "gif", "htm", "html", "jpeg", "jpg", "js", "png", )
[docs] def parse_args(self, *args, **kwargs): args = super().parse_args(*args, **kwargs) self.opts.dest = pjoin(self.opts.dest, self.opts.doc_prefix.lstrip(os.path.sep)) if not self.opts.allowed_file_exts: self.opts.allowed_file_exts = list(self.default_allowed_file_exts) self.opts.allowed_file_exts.extend(self.opts.extra_allowed_file_exts) self.opts.allowed_file_exts = set(self.opts.allowed_file_exts) self.opts.excluded_dirs = set(self.opts.excluded_dirs) self.opts.allowed_files = set(self.opts.allowed_files) if self.opts.verbose: self.observer.write(str(self), autoline=True) self.observer.flush() return args
def __str__(self): msg = ["dohtml:", f" Installing to: {self.opts.dest}"] if self.opts.allowed_file_exts: msg.append( f" Allowed extensions: {', '.join(sorted(self.opts.allowed_file_exts))}" ) if self.opts.excluded_dirs: msg.append( f" Allowed extensions: {', '.join(sorted(self.opts.allowed_file_exts))}" ) if self.opts.allowed_files: msg.append(f" Allowed files: {', '.join(sorted(self.opts.allowed_files))}") if self.opts.doc_prefix: msg.append(f" Document prefix: {self.opts.doc_prefix!r}") return "\n".join(msg) def _allowed_file(self, path): """Determine if a file is allowed to be installed.""" basename = os.path.basename(path) ext = os.path.splitext(basename)[1][1:] return ext in self.opts.allowed_file_exts or basename in self.opts.allowed_files def _install_targets(self, targets): files, dirs = partition(targets, predicate=os.path.isdir) # TODO: add peekable class for iterables to avoid list conversion dirs = list(dirs) if dirs: if self.opts.recursive: dirs = (d for d in dirs if d not in self.opts.excluded_dirs) self.install_from_dirs(dirs) else: raise IpcCommandError(f"{dirs[0]!r} is a directory, missing -r option?") self.install((f, os.path.basename(f)) for f in files if self._allowed_file(f))
class _AlterFiles(IpcCommand): arg_parser = IpcArgumentParser() arg_parser.add_argument("-x", dest="excludes", action="store_true") arg_parser.add_argument("targets", nargs="+") default_includes = () default_excludes = () def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.includes = set(self.default_includes) self.excludes = set(self.default_excludes) def run(self, args): if self.opts.excludes: self.excludes.update(args.targets) else: self.includes.update(args.targets)
[docs] class Docompress(_AlterFiles): """Python wrapper for docompress.""" default_includes = ("/usr/share/doc", "/usr/share/info", "/usr/share/man") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.excludes = {f"/usr/share/doc/{self.pkg.PF}/html"}
[docs] class Dostrip(_AlterFiles): """Python wrapper for dostrip.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if "strip" not in self.pkg.restrict: self.includes = {"/"}
class _QueryCmd(IpcCommand): arg_parser = IpcArgumentParser() arg_parser.add_argument("atom", type=atom_mod.atom) # >= EAPI 5 host_root_parser = IpcArgumentParser() host_root_parser.add_argument("--host-root", action="store_true") # >= EAPI 7 query_deps_parser = IpcArgumentParser() dep_opts = query_deps_parser.add_mutually_exclusive_group() dep_opts.add_argument("-b", dest="bdepend", action="store_true") dep_opts.add_argument("-d", dest="depend", action="store_true") dep_opts.add_argument("-r", dest="rdepend", action="store_true") def parse_args(self, options, args): # parse EAPI specific optionals then remaining args if self.eapi.options.query_host_root: _, args = self.host_root_parser.parse_known_optionals( args, namespace=self.opts ) elif self.eapi.options.query_deps: _, args = self.query_deps_parser.parse_known_optionals( args, namespace=self.opts ) args = super().parse_args(options, args) root = None self.opts.domain = self.op.domain if self.eapi.options.query_host_root and self.opts.host_root: root = "/" elif self.eapi.options.query_deps: if self.opts.bdepend: if self.pkg.eapi.options.prefix_capable: # not using BROOT as that's only defined in src_* phases root = pjoin("/", self.op.env["EPREFIX"]) else: root = "/" elif self.opts.depend: if self.pkg.eapi.options.prefix_capable: root = self.op.env["ESYSROOT"] else: root = self.op.env["SYSROOT"] else: if self.pkg.eapi.options.prefix_capable: root = self.op.env["EROOT"] else: root = self.op.env["ROOT"] # TODO: find domain from given path, pointless until full prefix support works if root and root != self.opts.domain.root: raise IpcCommandError("prefix support not implemented yet") return args
[docs] class Has_Version(_QueryCmd): """Python wrapper for has_version."""
[docs] def run(self, args): if args.atom in self.opts.domain.all_installed_repos: return 0 return 1
[docs] class Best_Version(_QueryCmd): """Python wrapper for best_version."""
[docs] def run(self, args): return portageq._best_version(self.opts.domain, args.atom)
[docs] class Eapply(IpcCommand): """Python wrapper for eapply.""" arg_parser = IpcArgumentParser() arg_parser.add_argument("targets", nargs="+", type=existing_path) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.patch_cmd = ["patch", "-p1", "-f", "-s", "-g0", "--no-backup-if-mismatch"] self.patch_opts = [] def _parse_patch_opts(self, args): patch_opts = [] files = [] for i, arg in enumerate(args): if arg == "--": if files: raise IpcCommandError( "options must be specified before file arguments" ) files = args[i + 1 :] break elif arg.startswith("-"): if files: raise IpcCommandError( "options must be specified before file arguments" ) patch_opts.append(arg) else: files.append(arg) return files, patch_opts def _find_patches(self, args): for path in args: if os.path.isdir(path): for root, _dirs, files in os.walk(path): patches = [ pjoin(root, f) for f in sorted(files, key=locale.strxfrm) if f.endswith((".diff", ".patch")) ] if not patches: raise IpcCommandError(f"no patches in directory: {path!r}") yield path, patches else: yield None, [path]
[docs] def parse_args(self, options, args): args, self.patch_opts = self._parse_patch_opts(args) args = super().parse_args(options, ["--"] + args) return self._find_patches(args.targets)
[docs] def run(self, args, user=False): if user: patch_type = "user patches" output_func = self.observer.warn else: patch_type = "patches" output_func = self.observer.info spawn_kwargs = {"collect_fds": (1, 2)} if self.op.userpriv: spawn_kwargs["uid"] = os_data.portage_uid spawn_kwargs["gid"] = os_data.portage_gid for path, patches in args: prefix = "" if path is not None: output_func(f"Applying {patch_type} from {path!r}:") prefix = " " for patch in patches: if path is None: output_func(f"{prefix}Applying {os.path.basename(patch)}...") else: output_func(f"{prefix}{os.path.basename(patch)}...") self.observer.flush() try: with open(patch) as f: ret, output = spawn.spawn_get_output( self.patch_cmd + self.patch_opts, fd_pipes={0: f.fileno()}, **spawn_kwargs, ) if ret: filename = os.path.basename(patch) msg = f"applying {filename!r} failed: {output[0]}" raise IpcCommandError(msg, code=ret) except OSError as e: raise IpcCommandError( f"failed reading patch file: {patch!r}: {e.strerror}" )
[docs] class Eapply_User(IpcCommand): """Python wrapper for eapply_user.""" # stub parser so any arguments are flagged as errors arg_parser = IpcArgumentParser()
[docs] def run(self, args): if self.pkg.user_patches: self.op._ipc_helpers["eapply"].run(self.pkg.user_patches, user=True) # create marker to skip additionals calls patches = itertools.chain.from_iterable( files for _, files in self.pkg.user_patches ) with open(pjoin(self.op.env["T"], ".user_patches_applied"), "w") as f: f.write("\n".join(patches))
[docs] class Unpack(IpcCommand): arg_parser = IpcArgumentParser() arg_parser.add_argument("targets", nargs="+") _file_mode = ( stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH ) _dir_mode = _file_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
[docs] def parse_args(self, *args, **kwargs): args = super().parse_args(*args, **kwargs) self.opts.distdir = self.op.env["DISTDIR"] return args
def _filter_targets(self, targets): for archive in targets: if os.path.sep not in archive: # regular filename get prefixed with ${DISTDIR} srcdir = self.opts.distdir elif archive.startswith("./"): # relative paths get passed through srcdir = "" else: srcdir = self.opts.distdir # >= EAPI 6 allows absolute paths if self.eapi.options.unpack_absolute_paths: srcdir = "" if archive.startswith(self.opts.distdir): self.warn( f"argument contains redundant ${{DISTDIR}}: {archive!r}" ) elif archive.startswith(self.opts.distdir): raise IpcCommandError( f"arguments must not begin with ${{DISTDIR}}: {archive!r}" ) elif archive.startswith(os.path.sep): raise IpcCommandError( f"arguments must not be absolute paths: {archive!r}" ) else: raise IpcCommandError( "relative paths must be prefixed with " f"'./' in EAPI {self.eapi}" ) path = pjoin(srcdir, archive) if not os.path.exists(path): raise IpcCommandError(f"nonexistent file: {archive!r}") elif os.stat(path).st_size == 0: raise IpcCommandError(f"empty file: {archive!r}") match = self.eapi.archive_exts_regex.search(archive) if not match: self.warn(f"skipping unrecognized file format: {archive!r}") continue ext = match.group(1) yield archive, ext, path
[docs] def run(self, args): spawn_kwargs = {} if self.op.userpriv and self.phase == "unpack": spawn_kwargs["uid"] = os_data.portage_uid spawn_kwargs["gid"] = os_data.portage_gid for filename, ext, source in self._filter_targets(args.targets): self.observer.write( f">>> Unpacking {filename} to {self.cwd}", autoline=True ) self.observer.flush() dest = pjoin(self.cwd, os.path.basename(filename[: -len(ext)])) try: target = ArComp(source, ext=ext) target.unpack(dest=dest, **spawn_kwargs) except ArCompError as e: raise IpcCommandError(str(e), code=e.code) for dirpath, dirnames, filenames in os.walk(self.cwd): dirs = ((self._dir_mode, x) for x in dirnames) files = ((self._file_mode, x) for x in filenames) for mode, f in itertools.chain.from_iterable((dirs, files)): path = pjoin(dirpath, f) current_mode = os.lstat(path).st_mode if not stat.S_ISLNK(current_mode): os.chmod(path, current_mode | mode)
[docs] class FilterEnv(IpcCommand): arg_parser = IpcArgumentParser() filtering = arg_parser.add_argument_group("Environment filtering options") filtering.add_argument( "-V", "--var-match", action="store_true", default=False, help="invert the filtering- instead of removing a var if it matches " "remove all vars that do not match", ) filtering.add_argument( "-F", "--func-match", action="store_true", default=False, help="invert the filtering- instead of removing a function if it matches " "remove all functions that do not match", ) filtering.add_argument( "-f", "--funcs", action="csv", help="comma separated list of regexes to match function names against for filtering", ) filtering.add_argument( "-v", "--vars", action="csv", help="comma separated list of regexes to match variable names against for filtering", ) arg_parser.add_argument("files", nargs=2)
[docs] def run(self, args): src_path, dest_path = args.files with open(src_path) as src, open(dest_path, "wb") as dest: filter_env.main_run( dest, src.read(), args.vars, args.funcs, args.var_match, args.func_match )