Source code for snakeoil.tools.find_unused_exports

"If you're having to work on this, python -m ast <path-to-a-file> is your best friend"

__all__ = ("main",)


import argparse
import ast
import logging
import sys
from collections import defaultdict
from importlib import import_module
from pathlib import Path
from textwrap import dedent
from typing import NamedTuple, Optional, Self, cast

from snakeoil.python_namespaces import get_submodules_of

# Generally hard requirement- avoid relying on snakeoil here.  At somepoint this
# should be able to be pointed right back at snakeoil for finding components internally
# that are unused.

logger = logging.getLogger(__name__)


class CtxAccess(NamedTuple):
    attr: str
    module: "ModuleImport"


# This classes are effectively a tree that can be walked backwards as
# we recurse into the import pathways where they reference back down the pathways.
# It is cyclic as all hell.
class ModuleImport(dict[str, "ModuleImport"]):
    def __init__(self, root: Self | None, parent: Self | None, name: str) -> None:
        self.root = self if root is None else root  # oh yeah, cyclic baby.
        self.parent = self.root if parent is None else parent
        self.name = name
        # this is recordings of other modules accessing us.
        self.accessed_by: dict[str, set["ModuleImport"]] = defaultdict(set)
        # This is a mapping of the local name to the target namespace
        self.ctx_imports = dict[str, CtxAccess]()
        self.unscoped_accessers: set[str] = set()
        self.requires_reprocessing = False
        self.alls = None

    def __hash__(self) -> int:  # type: ignore
        return hash(self.qualname)

    def __eq__(self, other):
        return self is other

    @property
    def qualname(self):
        l = []
        current = self
        while current is not self.root:
            l.append(current.name)
            current = current.parent
        return ".".join(reversed(l))

    def create(self, chunks: list[str]) -> "ModuleImport":
        assert len(chunks)
        name, chunks = chunks[0], chunks[1:]
        obj = self.setdefault(name, self.__class__(self.root, parent=self, name=name))
        if chunks:
            return obj.create(chunks)
        return obj

    def resolve_import(
        self,
        name: str,
        requester: Optional["ModuleImport"],
    ) -> tuple[list[str], "ModuleImport"]:
        parts = name.split(".")
        assert all(parts)
        current = self

        while parts:
            if requester is not None:
                current.accessed_by[parts[0]].add(requester)
            if parts[0] not in current:
                break
            current = current[parts[0]]
            parts = parts[1:]

        try:
            assert parts or self.root is not current
        except AssertionError as _e:
            # structured this way to make debugging easier
            raise

        return (parts, current)

    def __str__(self) -> str:
        return f"{self.qualname}: access={self.accessed_by!r} unscoped={self.unscoped_accessers!r} known ctx={list(sorted(self.ctx_imports.keys()))!r}"

    def __repr__(self):
        return str(self)


class ImportCollector(ast.NodeVisitor):
    __slotting_intentionally_disabled__ = True

    def __init__(
        self, root: ModuleImport, current: ModuleImport, name: str, path: Path
    ) -> None:
        self.root = root
        self.current = current
        self.path = path
        # from semantics are directory traversals, despite how they look.  __init__ is special.
        self.level_adjustment = 1 if path.name.startswith("__init__.") else 0
        self.requires_reprocessing = True

    def visit(self, node):
        # reset our status
        self.current.requires_reprocessing = False
        super().visit(node)

    def get_asname(self, alias) -> str:
        if alias.asname:
            return alias.asname
        return alias.name.split(
            ".",
        )[0]

    def update_must_reprocess(self, asname: str):
        assert "." not in asname, asname
        for must_reprocess in self.current.accessed_by.pop(asname, []):
            must_reprocess.requires_reprocessing = True

    def visit_Import(self, node):
        for alias in node.names:
            asname = self.get_asname(alias)
            self.update_must_reprocess(asname)

            attrs, result = self.root.resolve_import(alias.name, requester=self.current)

            if attrs:
                # failed to fully import.  Don't inject the result into ctx;
                # the traversal to get there will notify of us of the rebuild
                # if necessary.  It's possible we're importing through a module
                # that assembles an API via doing it's own internal imports.
                continue
            self.current.ctx_imports[asname] = CtxAccess(
                alias.name,
                result,
            )
            result.unscoped_accessers.add(self.current.qualname)

    def visit_ImportFrom(self, node):
        # just rewrite into absolute pathing
        base: list[str]
        if node.level:
            base = self.current.qualname.split(".")
            level = node.level - self.level_adjustment
            if level:
                base = base[:-level]
            if node.module:
                base.extend(node.module.split("."))
        else:
            base = node.module.split(".")
        for alias in node.names:
            asname = self.get_asname(alias)
            self.update_must_reprocess(asname)
            l = base[:]
            l.append(alias.name)

            attrs, result = self.root.resolve_import(
                ".".join(l), requester=self.current
            )
            if attrs:
                if len(attrs) == 1:
                    # `from module import some_func`
                    result.accessed_by[attrs[0]].add(self.current)
                # lacking that, we couldn't import it fully.
                continue

            self.current.ctx_imports[asname] = CtxAccess(
                alias.name,
                result,
            )


class AttributeCollector(ast.NodeVisitor):
    def __init__(self, root: ModuleImport, current: ModuleImport) -> None:
        self.root = root
        self.current = current

    def visit_Attribute(self, node):
        if not isinstance(node.ctx, ast.Load):
            return

        lookup = [node.attr]
        value = node.value
        try:
            while isinstance(value, ast.Name):
                if (last := getattr(value, "id", None)) is not None:
                    # terminus.  This node won't have attr.
                    lookup.append(last)
                    break
                lookup.append(value.attr)
                node = node.value

        except Exception as e:
            print(
                f"ast traversal bug in {self.current.qualname} for original {type(node)}={node} sub-value {type(value)}={value}"
            )
            import pdb

            pdb.set_trace()
            raise e

        lookup.reverse()

        # this isn't confirming there isn't shadowing-
        # import os
        # def foon(os): ... # just got shadowed, 'os' in that ctx is not globals()['os']
        # it takes effort, and it's not worth it; this tool is already known loose.

        if (target := self.current.ctx_imports.get(lookup[0], None)) is None:
            # it's an attribute, or an import we don't care about.
            return
        # build an absolute path, use resolve machinery to sort this.
        parts = target.module.qualname.split(".") + lookup[1:]
        parts, mod = self.root.resolve_import(".".join(parts), requester=self.current)
        assert mod is not self.root

        if parts:
            # attribute access into that module.
            mod.accessed_by[parts[0]].add(self.current)


parser = argparse.ArgumentParser(
    __name__.rsplit(".", 1)[-1],
    description=dedent(
        """\
        Tool for finding potentially dead code

        This imports all modules of the source namespace, then scans the target
        namespaces actual imports to find identify if a member of the sources __all__ is
        actually used somewhere in the targets.  It specifically knows how to 'see' through
        snakeoil mechanisms to thunk an import- a lazy import.

        It is not authorative; code doing imports within a function it isn't written to 'see'.
        Consider this tooling as a way to get suggestions of what is dead code from the
        standpoint of nothing in the target namespaces holds a reference to the object, thus
        either they do dynamic imports or getattrs- which we can't see- during code execution-
        or it's not in use.
        """
    ),
)
parser.add_argument(
    "source",
    action="store",
    type=str,
    help="the python module to import and scan recursively, using __all__ to find things only used within that codebase.",
)
parser.add_argument(
    "targets", type=str, nargs="+", help="python namespaces to scan for usage."
)
parser.add_argument(
    "-v", action="store_true", default=False, dest="verbose", help="Increase verbosity"
)


[docs] def main(options, out, err) -> int: root = ModuleImport(None, None, "") source_modules: list[ModuleImport] = [] ast_sources = {} # pre-initialize the module tree of what we care about. for target in tuple(options.targets) + (options.source,): for module in get_submodules_of(import_module(target), include_root=True): obj = root.create(module.__name__.split(".")) obj.alls = getattr(module, "__all__", None) p = Path(cast(str, module.__file__)) with p.open("r") as f: ast_sources[obj] = (p, ast.parse(f.read(), str(p))) if target == options.source: source_modules.append(obj) # collect and finalize imports, then run analysis based on attribute access. # Note: the import collection may need to run multiple times. Consider: # klass.py: # __all__ = ('blah', 'foon') # from .other import blah, foon # # If some other module tries to travers klass.py before those from imports have been placed, the # other module will think it stopped at an attribute for 'blah'. Which isn't correct. # They internally detect this conflict and mark a boolean to indicate if a reprocessing is needed. must_be_processed = list(ast_sources) for run in range(0, 10): for mod in must_be_processed: p, tree = ast_sources[mod] ImportCollector(root, mod, mod.qualname, p).visit(tree) if new_reprocess := [mod for mod in ast_sources if mod.requires_reprocessing]: if len(new_reprocess) == len(must_be_processed): raise Exception("cycle encountered") must_be_processed = new_reprocess else: break for mod, (p, tree) in ast_sources.items(): AttributeCollector(root, mod).visit(tree) results = [] for mod in source_modules: results.append(result := [mod.qualname]) if mod.alls is None: result.append(f"{mod.qualname} has no __all__. Not analyzing") continue if options.verbose: result.append("__all__ = (" + ", ".join(sorted(mod.alls)) + ")") missing = list(sorted(set(mod.alls).difference(mod.accessed_by))) if not missing: continue # result.append(f"all is {list(sorted(mod.alls))}") if mod.unscoped_accessers: result.append( f"unscoped access exists from {mod.unscoped_accessers!r}. Results may be inaccurate" ) result.append(f"possibly unused {missing}") first = "" for block in sorted(results, key=lambda l: l[0]): if len(block) == 1 and not options.verbose: continue out.write(f"{first}{block[0]}\n") first = "\n" if len(block) == 1: out.write(" __all__ is fully used\n") continue for lines in block[1:]: out.write(f" {lines}\n") return 0
if __name__ == "__main__": options = parser.parse_args() sys.exit(main(options, sys.stdout, sys.stderr))