From 6e2289f2174df572724aa073896112de8d7c450c Mon Sep 17 00:00:00 2001 From: Prasanna Date: Sat, 2 May 2026 20:14:30 +0530 Subject: [PATCH] Detect lambda setitem replay --- kernelguard.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/kernelguard.py b/kernelguard.py index f086847..505db6b 100644 --- a/kernelguard.py +++ b/kernelguard.py @@ -2197,6 +2197,61 @@ def _has_ver(expr: ast.AST | None) -> bool: for n in ast.walk(expr) ) + for stmt in getattr(tree, "body", []): + if not isinstance(stmt, ast.Assign): + continue + if not any( + isinstance(target, ast.Name) and is_entrypoint_name(target.id) + for target in stmt.targets + ): + continue + if not isinstance(stmt.value, ast.Lambda): + continue + lambda_node = stmt.value + if not isinstance(lambda_node.body, ast.IfExp): + continue + + params = {arg.arg for arg in lambda_node.args.args} + if lambda_node.args.vararg is not None: + params.add(lambda_node.args.vararg.arg) + if lambda_node.args.kwarg is not None: + params.add(lambda_node.args.kwarg.arg) + if _expr_names(lambda_node.body.test) & params: + continue + + captured = _expr_names(lambda_node.body) - params + returned_root = ( + _ast_root_name(lambda_node.body.body) + if isinstance(lambda_node.body.body, ast.Subscript) + else None + ) + if returned_root not in captured: + continue + + mutates_returned = any( + isinstance(child, ast.Call) + and isinstance(child.func, ast.Attribute) + and child.func.attr == "__setitem__" + and _ast_root_name(child.func.value) == returned_root + for child in ast.walk(lambda_node.body.orelse) + ) + returns_after_mutation = any( + isinstance(child, ast.Subscript) + and _ast_root_name(child) == returned_root + for child in ast.walk(lambda_node.body.orelse) + ) + if not (mutates_returned and returns_after_mutation): + continue + + return [{ + "pattern": "LAST_CALL_REPLAY", + "severity": "critical", + "evidence": ( + f"{entrypoint_name} lambda replays from captured state after " + "__setitem__ mutation" + ), + }] + for node in ast.walk(tree): if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): continue