# Source code for pkgcore.ebuild.filter_env

"""Filter a bash environment dump."""

# Names exported by ``from <this module> import *``.
__all__ = ("run",)

import io
import re

from ..log import logger

# Interpretation levels handed to walk_command_complex() as ``interpret_level``.
# COMMAND_PARSING: a statement ends at ";" or a newline.
COMMAND_PARSING = 0
# SPACE_PARSING: a word ends at any whitespace character.
SPACE_PARSING = 1


def run(
    out,
    file_buff,
    var_match,
    func_match,
    global_envvar_callback=None,
    func_callback=None,
):
    """Print a filtered environment.

    Thin entry point: processes ``file_buff`` from offset 0 as the global
    scope via :func:`process_scope`, using the NUL character as the scope
    terminator.

    :param out: file-like object to write to; it receives UTF-8 encoded
        ``bytes``.
    :param file_buff: string containing the environment to filter.
        Should end in '\\0'.
    :param var_match: callable invoked with a variable name; a true result
        means the assignment is filtered out (for example the ``match``
        method of a pattern from ``build_regex_string``), or ``None`` to
        filter no variables.
    :param func_match: same as ``var_match``, but invoked with function
        names, or ``None`` to filter no functions.
    :param global_envvar_callback: optional callable invoked with the name of
        each variable assignment found at the global scope.
    :param func_callback: optional callable invoked as
        ``func_callback(level, name, text)`` for each function definition
        found.
    """
    process_scope(
        out,
        file_buff,
        0,
        var_match,
        func_match,
        "\0",
        global_envvar_callback,
        func_callback=func_callback,
    )
def build_regex_string(tokens, invert=False):
    """Combine regex fragments into one compiled, anchored pattern.

    Empty tokens are dropped.  The remaining ones are OR-ed together and
    wrapped in ``^...$``; with ``invert`` the anchored pattern is placed
    inside a negative lookahead.

    :param tokens: iterable of regex source strings.
    :param invert: if true, wrap the anchored pattern in ``(?!...)``.
    :return: the compiled pattern, or ``None`` if no non-empty token remains.
    :raises Exception: if the assembled pattern does not compile; the
        original ``re.error`` is attached as the cause.
    """
    tokens = [token for token in tokens if token]
    if not tokens:
        return None
    if len(tokens) == 1:
        pattern = tokens[0]
    else:
        pattern = f"(?:{'|'.join(tokens)})"
    pattern = f"^{pattern}$"
    if invert:
        pattern = f"(?!{pattern})"
    try:
        return re.compile(pattern)
    except re.error as e:
        raise Exception(f"failed compiling {pattern!r}:\n\nerror: {e}") from e


# Length of the optional "function" keyword that may precede a definition.
FUNC_LEN = len("function")


def is_function(buff, pos):
    """Check whether a function definition starts at ``pos``.

    Recognises ``name () {`` optionally preceded by the ``function`` keyword.

    :param buff: text being parsed.
    :param pos: offset to start looking at.
    :return: ``(start, end, pos)`` where ``buff[start:end]`` is the function
        name and ``pos`` is the offset just past the opening ``{``, or
        ``(None, None, None)`` if no function definition starts here.
    """
    isspace = str.isspace
    try:
        while buff[pos] in " \t":
            pos += 1
        if buff[pos : pos + FUNC_LEN] == "function":
            try:
                if isspace(buff[pos + FUNC_LEN]):
                    pos += FUNC_LEN + 1
            except IndexError:
                # insane, but it could still be a function-
                # len('f(){:;}') <= FUNC_LEN.
                pass
        while isspace(buff[pos]):
            pos += 1
        start = pos
        while buff[pos] not in "\0 \t\n=\"'()":
            pos += 1
        end = pos
        if end == start:
            return None, None, None
        while buff[pos] in " \t":
            pos += 1
        if buff[pos] != "(":
            return None, None, None
        pos += 1
        while buff[pos] in " \t":
            pos += 1
        if buff[pos] != ")":
            return None, None, None
        pos += 1
        while isspace(buff[pos]):
            pos += 1
        if buff[pos] != "{":
            return None, None, None
        return start, end, pos + 1
    except IndexError:
        # can't be a function, ran off the end
        return None, None, None


def is_envvar(buff, pos):
    """Check whether a variable assignment starts at ``pos``.

    :param buff: text being parsed.
    :param pos: offset to start looking at.
    :return: ``(start, end, pos)`` where ``buff[start:end]`` is the variable
        name and ``pos`` is the offset just past the ``=``, or
        ``(None, None, None)`` if this is not an assignment.
    """
    try:
        while buff[pos] in " \t":
            pos += 1
        start = pos
        while True:
            # Any of these before an "=" means this is not a plain assignment.
            if buff[pos] in "\0\"'()- \t\n":
                return None, None, None
            if buff[pos] == "=":
                if pos == start:
                    # "=" with no name in front of it.
                    return None, None, None
                return start, pos, pos + 1
            pos += 1
    except IndexError:
        return None, None, None


def process_scope(
    out,
    buff,
    pos,
    var_match,
    func_match,
    endchar,
    envvar_callback=None,
    func_callback=None,
    func_level=0,
):
    """Walk one scope of ``buff``, writing everything not filtered to ``out``.

    Parsing runs from ``pos`` until ``endchar`` or the end of ``buff``.
    Function bodies are walked recursively with ``out=None`` and ``"}"`` as
    the terminator.

    :param out: object with a ``write`` method accepting ``bytes``, or
        ``None`` to only parse without emitting anything.
    :param buff: text being parsed.
    :param pos: offset to start at.
    :param var_match: callable given a variable name; a true result filters
        the assignment out.  ``None`` disables variable filtering.
    :param func_match: callable given a function name; a true result filters
        the function out.  ``None`` disables function filtering.
    :param endchar: character terminating this scope.
    :param envvar_callback: optional callable invoked with each variable name
        assigned in this scope.
    :param func_callback: optional callable invoked as
        ``func_callback(func_level, name, text)`` for each function found.
    :param func_level: nesting depth reported to ``func_callback``.
    :return: offset where parsing stopped.
    """
    # buff[window_start:window_end] is the pending span of text to emit.
    # window_end is set (to the start of the offending statement) when
    # something is filtered; the span is flushed on the next iteration.
    window_start = pos
    window_end = None
    isspace = str.isspace
    end = len(buff)
    while pos < end and buff[pos] != endchar:
        # Wander forward to the next non space.
        if window_end is not None:
            if out is not None:
                out.write(buff[window_start:window_end].encode("utf-8"))
            # Resume emitting right after the filtered statement.
            window_start = pos
            window_end = None
        com_start = pos
        ch = buff[pos]
        if isspace(ch):
            pos += 1
            continue

        # Ignore comments.
        if ch == "#":
            pos = walk_statement_pound(buff, pos, endchar)
            continue

        new_start, new_end, new_p = is_function(buff, pos)
        if new_p is not None:
            func_name = buff[new_start:new_end]
            logger.debug(f"matched func name {func_name!r}")
            new_p = process_scope(
                None,
                buff,
                new_p,
                None,
                None,
                "}",
                func_callback=func_callback,
                func_level=func_level + 1,
            )
            logger.debug(f"ended processing {func_name!r}")
            if func_callback is not None:
                func_callback(func_level, func_name, buff[new_start:new_p])
            if func_match is not None and func_match(func_name):
                logger.debug(f"filtering func {func_name!r}")
                window_end = com_start
            # Step over the closing "}" of the function body.
            pos = new_p
            pos += 1
            continue

        # Check for env assignment.
        new_start, new_end, new_p = is_envvar(buff, pos)
        if new_p is None:
            # Non env assignment.
            pos = walk_command_complex(buff, pos, endchar, COMMAND_PARSING)
            # icky icky icky icky
            if pos < end and buff[pos] != endchar:
                pos += 1
        else:
            # Env assignment.
            var_name = buff[new_start:new_end]
            pos = new_p
            if envvar_callback:
                envvar_callback(var_name)
            logger.debug(f"matched env assign {var_name!r}")

            if var_match is not None and var_match(var_name):
                # This would be filtered.
                logger.info(f"filtering var {var_name!r}")
                window_end = com_start

            if pos >= end:
                return pos

            # Consume the assigned value up to whitespace or ";".
            while pos < end and not isspace(buff[pos]) and buff[pos] != ";":
                if buff[pos] == "'":
                    pos = walk_statement_no_parsing(buff, pos + 1, "'") + 1
                elif buff[pos] in '"`':
                    pos = walk_command_escaped_parsing(buff, pos + 1, buff[pos]) + 1
                elif buff[pos] == "(":
                    pos = walk_command_escaped_parsing(buff, pos + 1, ")") + 1
                elif buff[pos] == "$":
                    pos += 1
                    if pos >= end:
                        continue
                    pos = walk_dollar_expansion(buff, pos, end, endchar)
                    continue
                else:
                    # blah=cah ; single word
                    pos = walk_command_complex(buff, pos, " ", SPACE_PARSING)

    if out is not None:
        if window_end is None:
            window_end = pos
        if window_end > end:
            window_end = end
        out.write(buff[window_start:window_end].encode("utf-8"))

    return pos


def walk_statement_no_parsing(buff, pos, endchar):
    """Find the next ``endchar`` without interpreting anything in between.

    :return: index of ``endchar`` at or after ``pos``, or ``len(buff) - 1``
        if it does not occur.
    """
    pos = buff.find(endchar, pos)
    if pos == -1:
        pos = len(buff) - 1
    return pos


def walk_statement_dollared_quote_parsing(buff, pos, endchar):
    """Find the next ``endchar``, skipping backslash-escaped characters.

    :return: index of the first unescaped ``endchar``, or an offset at or
        past the end of ``buff`` if there is none.
    """
    end = len(buff)
    while pos < end:
        if buff[pos] == endchar:
            return pos
        elif buff[pos] == "\\":
            # Skip the escaped character as well.
            pos += 1
        pos += 1
    return pos


def walk_here_statement(buff, pos):
    """Walk over a here document.

    :param buff: text being parsed.
    :param pos: offset of the second ``<`` of the ``<<`` operator.
    :return: offset just past the terminating here word, or ``len(buff)`` if
        no terminator is found.  For ``<<<`` the offset just past the third
        ``<`` is returned and parsing is left to the caller.
    """
    pos += 1
    logger.debug("starting here processing for COMMAND for level 2 at p == %.10s", pos)
    if buff[pos] == "<":
        logger.debug(
            "correction, it's a third level here. Handing back to command parsing"
        )
        return pos + 1
    isspace = str.isspace
    end = len(buff)
    while pos < end and (isspace(buff[pos]) or buff[pos] == "-"):
        pos += 1
    if buff[pos] in "'\"":
        end_here = walk_statement_no_parsing(buff, pos + 1, buff[pos])
        pos += 1
    else:
        end_here = walk_command_complex(buff, pos, " ", SPACE_PARSING)
    here_word = buff[pos:end_here]
    logger.debug(f"matched len({len(here_word)})/{here_word!r} for a here word")
    # XXX watch this. Potential for horkage. Need to do the quote
    # removal thing. This sucks.
    end_here += 1
    if end_here >= end:
        return end_here

    here_len = len(here_word)
    # Always advance the search by at least one character: with an empty
    # here word (``<<""``) str.find() keeps returning its start offset, which
    # would otherwise loop forever.
    step = max(here_len, 1)
    end_here = buff.find(here_word, end_here)
    while end_here != -1:
        i = here_len + end_here
        # The candidate must be followed by a terminator character (bounds
        # checked so a match at the very end of buff cannot raise) ...
        if i < end and buff[i] in ";\n\r})":
            # ... and be preceded on its line only by tabs/spaces.
            i = end_here - 1
            while i >= 0 and buff[i] in "\t ":
                i -= 1
            if i >= 0 and buff[i] == "\n":
                break
        end_here = buff.find(here_word, end_here + step)

    if end_here == -1:
        return end
    return end_here + len(here_word)


def walk_statement_pound(buff, pos, endchar=None):
    """Walk over a ``#`` comment starting at ``pos``.

    A ``#`` directly preceded by a non-whitespace character is not treated
    as a comment; only the ``#`` itself is skipped.

    :param endchar: when this is a backtick, the comment also ends at the
        next backtick.
    :return: ``pos + 1`` for a non-comment ``#``; otherwise the index of the
        character ending the comment, or ``len(buff) - 1`` if there is none.
    """
    if pos and not buff[pos - 1].isspace():
        return pos + 1
    if endchar == "`":
        i = buff.find("\n", pos)
        i2 = buff.find(endchar, pos)
        if i == -1:
            if i2 != -1:
                return i2
        else:
            if i2 != -1:
                return min(i, i2)
            return i
        return len(buff) - 1

    pos = buff.find("\n", pos)
    if pos == -1:
        pos = len(buff) - 1
    return pos


def walk_command_complex(buff, pos, endchar, interpret_level):
    """Walk over one command (or one word) starting at ``pos``.

    :param buff: text being parsed.
    :param pos: offset to start at.
    :param endchar: character that stops the walk.  A ``"}"`` only counts
        when it is the first character or directly follows ``;`` or a
        newline.
    :param interpret_level: ``COMMAND_PARSING`` to stop at ``;`` or newline,
        ``SPACE_PARSING`` to stop at any whitespace.
    :return: index of the character that stopped the walk, or an offset at
        or past the end of ``buff``.
    """
    start = pos
    isspace = str.isspace
    end = len(buff)
    while pos < end:
        ch = buff[pos]
        if ch == endchar:
            if endchar != "}":
                return pos
            if start == pos:
                return pos
            if buff[pos - 1] in ";\n":
                return pos
        elif (interpret_level == COMMAND_PARSING and ch in ";\n") or (
            interpret_level == SPACE_PARSING and isspace(ch)
        ):
            return pos
        elif ch == "\\":
            # Skip the escaped character.
            pos += 1
        elif ch == "<":
            if (
                pos < end - 1
                and buff[pos + 1] == "<"
                and interpret_level == COMMAND_PARSING
            ):
                pos = walk_here_statement(buff, pos + 1)
                # we continue immediately; walk_here deposits us at the end
                # of the here op, not consuming the final delimiting char
                # since it may be an endchar
                continue
            else:
                logger.debug(f"noticed <, interpret_level={interpret_level}")
        elif ch == "#":
            if start == pos or isspace(buff[pos - 1]) or buff[pos - 1] == ";":
                pos = walk_statement_pound(buff, pos)
                continue
        elif ch == "$":
            pos = walk_dollar_expansion(buff, pos + 1, end, endchar)
            continue
        elif ch == "{":
            pos = walk_command_escaped_parsing(buff, pos + 1, "}")
        elif ch == "(" and interpret_level == COMMAND_PARSING:
            pos = walk_command_escaped_parsing(buff, pos + 1, ")")
        elif ch in '`"':
            pos = walk_command_escaped_parsing(buff, pos + 1, ch)
        elif ch == "'" and endchar != '"':
            pos = walk_statement_no_parsing(buff, pos + 1, "'")
        pos += 1
    return pos


def raw_walk_command_escaped_parsing(buff, pos, endchar):
    """Walk to the matching ``endchar``, honouring nesting and escapes.

    Nested braces, parentheses, quotes, backticks, ``$`` expansions and
    comments are stepped over.  Inside double quotes (``endchar == '"'``)
    braces, parentheses, single quotes and ``#`` are treated as plain text.

    :return: index of the matching ``endchar``, or an offset at or past the
        end of ``buff`` if it is not found.
    """
    end = len(buff)
    while pos < end:
        ch = buff[pos]
        if ch == endchar:
            return pos
        elif ch == "\\":
            # Skip the escaped character.
            pos += 1
        elif ch == "{":
            if endchar != '"':
                pos = raw_walk_command_escaped_parsing(buff, pos + 1, "}")
        elif ch == "(":
            if endchar != '"':
                pos = raw_walk_command_escaped_parsing(buff, pos + 1, ")")
        elif ch in '`"':
            pos = raw_walk_command_escaped_parsing(buff, pos + 1, ch)
        elif ch == "'" and endchar != '"':
            pos = walk_statement_no_parsing(buff, pos + 1, "'")
        elif ch == "$":
            pos = walk_dollar_expansion(
                buff, pos + 1, end, endchar, disable_quote=endchar == '"'
            )
            continue
        elif ch == "#" and endchar != '"':
            pos = walk_statement_pound(buff, pos, endchar)
            continue
        pos += 1
    return pos


# Name used by the other walkers in this module.
walk_command_escaped_parsing = raw_walk_command_escaped_parsing


def walk_dollar_expansion(buff, pos, end, endchar, disable_quote=False):
    """Walk over a ``$`` expansion.

    Handles ``$(...)``, ``$'...'``, ``${...}``, ``$$`` and plain ``$name``.

    :param buff: text being parsed.
    :param pos: offset of the character following the ``$``.
    :param end: offset at which to stop; the callers in this module pass
        ``len(buff)``.
    :param endchar: terminator of the enclosing construct; a plain
        ``$name`` stops there.
    :param disable_quote: if true, ``$'`` is not treated as a quoted string.
    :return: offset of the first character after the expansion.
    """
    if pos >= end:
        # The "$" was the last character; there is nothing to expand and
        # indexing buff[pos] below would raise IndexError.
        return end
    if buff[pos] == "(":
        return process_scope(None, buff, pos + 1, None, None, ")") + 1
    if buff[pos] == "'" and not disable_quote:
        return walk_statement_dollared_quote_parsing(buff, pos + 1, "'") + 1
    if buff[pos] != "{":
        if buff[pos] == "$":
            # short circuit it.
            return pos + 1
        while pos < end and buff[pos] != endchar:
            if buff[pos].isspace():
                return pos
            if buff[pos] == "$":
                # shouldn't this be passing disable_quote ?
                return walk_dollar_expansion(buff, pos + 1, end, endchar)
            if not buff[pos].isalnum():
                if buff[pos] != "_":
                    return pos
            pos += 1

        if pos >= end:
            return end
        return pos

    pos += 1
    # ${$} needs no special casing: the nested call below stops on the
    # closing brace, which then ends this loop.  (A former shortcut here
    # compared the integer offset with "$" and so could never trigger.)
    while pos < end and buff[pos] != "}":
        if buff[pos] == "$":
            # disable_quote?
            pos = walk_dollar_expansion(buff, pos + 1, end, endchar)
        else:
            pos += 1
    return pos + 1


def main_run(
    out_handle,
    data,
    vars_to_filter=(),
    funcs_to_filter=(),
    vars_is_whitelist=False,
    funcs_is_whitelist=False,
    global_envvar_callback=None,
    func_callback=None,
):
    """Filter an environment dump using lists of regex fragments.

    Builds the variable and function matchers with
    :func:`build_regex_string`, appends the NUL terminator to ``data`` and
    hands everything to :func:`run`.

    :param out_handle: object with a ``write`` method accepting ``bytes``.
        If ``None``, output goes to a throwaway ``io.BytesIO``.
    :param data: environment text to filter (without the trailing NUL).
    :param vars_to_filter: sequence of regex fragments for variable names.
    :param funcs_to_filter: sequence of regex fragments for function names;
        must not be a plain string.
    :param vars_is_whitelist: invert the variable pattern.
    :param funcs_is_whitelist: invert the function pattern.
    :param global_envvar_callback: passed through to :func:`run`.
    :param func_callback: passed through to :func:`run` when true.
    :raises ValueError: if ``funcs_to_filter`` is a string.
    """
    var_matcher = func_matcher = None
    if vars_to_filter:
        var_regex = build_regex_string(vars_to_filter, invert=vars_is_whitelist)
        # build_regex_string() returns None when every token is empty.
        if var_regex is not None:
            var_matcher = var_regex.match
    if funcs_to_filter:
        if isinstance(funcs_to_filter, str):
            raise ValueError("funcs_str should not be a string; should be a sequence.")
        func_regex = build_regex_string(funcs_to_filter, invert=funcs_is_whitelist)
        if func_regex is not None:
            func_matcher = func_regex.match

    data = data + "\0"
    kwds = {"global_envvar_callback": global_envvar_callback}

    if func_callback:
        kwds["func_callback"] = func_callback

    if out_handle is None:
        out_handle = io.BytesIO()

    run(out_handle, data, var_matcher, func_matcher, **kwds)