diff --git a/kernelguard.py b/kernelguard.py index f086847..16c8125 100644 --- a/kernelguard.py +++ b/kernelguard.py @@ -2459,6 +2459,103 @@ def _has_ver(expr: ast.AST | None) -> bool: ), }] + # Quaternary pass: captured-state identity replay via subscript or + # attribute. Pattern: + # state = {"last": None, "out": None} # or any captured object + # def custom_kernel(data): + # if state["last"] is data: + # return state["out"] + # state["last"] = data + # state["out"] = data.clone() + # return state["out"] + # The first three passes only key off module-level `Name` storage slots + # whose name matches a stateful heuristic (last/prev/cache/...) or + # whose initial value is None. They miss the same exploit when the + # storage slot is one level removed (dict key, instance attribute) on a + # captured non-parameter name. + param_names = {a.arg for a in node.args.args} + param_names.update(a.arg for a in getattr(node.args, "posonlyargs", []) or []) + param_names.update(a.arg for a in getattr(node.args, "kwonlyargs", []) or []) + + captured_subscript_or_attr_targets: set[str] = set() + for child in ast.walk(node): + if not isinstance(child, ast.Assign): + continue + for target in child.targets: + if not isinstance(target, (ast.Subscript, ast.Attribute)): + continue + root = _ast_root_name(target) + if root and root not in param_names: + captured_subscript_or_attr_targets.add(root) + + if captured_subscript_or_attr_targets: + for child in ast.walk(node): + if not isinstance(child, ast.If): + continue + if _body_has_calls(child.body): + continue + + triggered = False + for cmp in ast.walk(child.test): + if not isinstance(cmp, ast.Compare): + continue + operands = [cmp.left] + list(cmp.comparators) + for i, op in enumerate(cmp.ops): + if not isinstance(op, ast.Is): + continue + left = operands[i] + right = operands[i + 1] + + def _is_captured_slot(expr: ast.AST) -> bool: + if not isinstance(expr, (ast.Subscript, ast.Attribute)): + return False + root = _ast_root_name(expr) + return ( + root is not None + and root not in param_names + and root in captured_subscript_or_attr_targets + ) + + def _is_param_ref(expr: ast.AST) -> bool: + if isinstance(expr, ast.Name): + return expr.id in param_names + return _ast_root_name(expr) in param_names + + if ( + (_is_captured_slot(left) and _is_param_ref(right)) + or (_is_captured_slot(right) and _is_param_ref(left)) + ): + triggered = True + break + if triggered: + break + + if not triggered: + continue + + # Confirm the if-body returns something rooted on a captured + # non-parameter name (the saved output slot). + returns_captured = False + for stmt in child.body: + if not isinstance(stmt, ast.Return) or stmt.value is None: + continue + rroot = _ast_root_name(stmt.value) + if rroot and rroot not in param_names: + returns_captured = True + break + + if not returns_captured: + continue + + return [{ + "pattern": "LAST_CALL_REPLAY", + "severity": "critical", + "evidence": ( + f"{entrypoint_name} returns saved output on object-identity " + f"match between captured-state slot and call input" + ), + }] + return []