diff --git a/kernelguard.py b/kernelguard.py index f086847..7549329 100644 --- a/kernelguard.py +++ b/kernelguard.py @@ -2459,6 +2459,122 @@ def _has_ver(expr: ast.AST | None) -> bool: ), }] + # Quaternary pass: cursor-indexed reference replay. + # Pattern: + # _REFS = [] + # _idx = 0 + # def custom_kernel(data): + # global _idx + # if _idx < len(_REFS): + # out = _REFS[_idx] + # _idx += 1 + # return out + # out = data.clone() + # _REFS.append(out) + # _idx += 1 + # return out + # The replay decision is a counter/length test that does not look at + # the input; the returned value is a Subscript on a captured collection + # (read from the enclosing module scope). The earlier passes don't + # fire because there is no ``is``-comparison and the returned value is + # a local name assigned from a Subscript rather than the global itself. + param_names = {a.arg for a in node.args.args} + param_names.update(a.arg for a in getattr(node.args, "posonlyargs", []) or []) + param_names.update(a.arg for a in getattr(node.args, "kwonlyargs", []) or []) + + # Names that are *captured* from an enclosing scope: referenced by the + # function body but never assigned as a top-level Name target inside + # the body (an AugAssign on a captured global is a mutation, not a + # local definition). Excludes parameters and locally-bound helpers. + local_only_assigned: set[str] = set() + for child in ast.walk(node): + if isinstance(child, ast.Assign): + for target in child.targets: + if isinstance(target, ast.Name): + local_only_assigned.add(target.id) + # Globals declared with `global X` are explicitly captured even when + # also locally assigned, so we exempt them from the local-only set. + global_decls: set[str] = set() + for n in ast.walk(node): + if isinstance(n, ast.Global): + global_decls.update(n.names) + local_only_assigned -= global_decls + + captured_names = ( + _expr_names(node) - param_names - local_only_assigned + ) + + # Captured names that the function *mutates* in some way: AugAssign on + # the name (counter increment), or attribute method call on the name + # (e.g. ``_REFS.append(...)``). Plain reads — read-only config dicts + # — are excluded so legitimate config lookups are not flagged. + mutated_captures: set[str] = set() + for child in ast.walk(node): + if isinstance(child, ast.AugAssign): + root = _ast_root_name(child.target) + if root and root in captured_names: + mutated_captures.add(root) + elif isinstance(child, ast.Assign): + for target in child.targets: + root = _ast_root_name(target) + if root and root in captured_names and root in global_decls: + mutated_captures.add(root) + elif isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute): + root = _ast_root_name(child.func.value) + if root and root in captured_names: + mutated_captures.add(root) + + if mutated_captures: + def _returns_subscripted_capture(if_body: list[ast.stmt]) -> bool: + """True iff the if-body returns a Subscript rooted on a + mutated capture, either directly (return _G[i]) or via a + local that was assigned from such a Subscript.""" + for stmt in if_body: + if not isinstance(stmt, ast.Return) or stmt.value is None: + continue + rv = stmt.value + if isinstance(rv, ast.Subscript): + sroot = _ast_root_name(rv.value) + if sroot and sroot in mutated_captures: + return True + elif isinstance(rv, ast.Name): + for prior in if_body: + if not isinstance(prior, ast.Assign): + continue + if not any( + isinstance(t, ast.Name) and t.id == rv.id + for t in prior.targets + ): + continue + if isinstance(prior.value, ast.Subscript): + sroot = _ast_root_name(prior.value.value) + if sroot and sroot in mutated_captures: + return True + return False + + for child in ast.walk(node): + if not isinstance(child, ast.If): + continue + if _body_has_calls(child.body): + continue + test_names = _expr_names(child.test) + if test_names & param_names: + continue + if not (test_names & mutated_captures): + continue + if not _returns_subscripted_capture(child.body): + continue + + return [{ + "pattern": "LAST_CALL_REPLAY", + "severity": "critical", + "evidence": ( + f"{entrypoint_name} returns a precomputed value " + f"indexed out of a captured collection — gating " + f"decision ignores the input" + ), + }] + return []