Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions kernelguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,61 @@ def _has_ver(expr: ast.AST | None) -> bool:
for n in ast.walk(expr)
)

for stmt in getattr(tree, "body", []):
if not isinstance(stmt, ast.Assign):
continue
if not any(
isinstance(target, ast.Name) and is_entrypoint_name(target.id)
for target in stmt.targets
):
continue
if not isinstance(stmt.value, ast.Lambda):
continue
lambda_node = stmt.value
if not isinstance(lambda_node.body, ast.IfExp):
continue

params = {arg.arg for arg in lambda_node.args.args}
if lambda_node.args.vararg is not None:
params.add(lambda_node.args.vararg.arg)
if lambda_node.args.kwarg is not None:
params.add(lambda_node.args.kwarg.arg)
if _expr_names(lambda_node.body.test) & params:
continue

captured = _expr_names(lambda_node.body) - params
returned_root = (
_ast_root_name(lambda_node.body.body)
if isinstance(lambda_node.body.body, ast.Subscript)
else None
)
if returned_root not in captured:
continue

mutates_returned = any(
isinstance(child, ast.Call)
and isinstance(child.func, ast.Attribute)
and child.func.attr == "__setitem__"
and _ast_root_name(child.func.value) == returned_root
for child in ast.walk(lambda_node.body.orelse)
)
returns_after_mutation = any(
isinstance(child, ast.Subscript)
and _ast_root_name(child) == returned_root
for child in ast.walk(lambda_node.body.orelse)
)
if not (mutates_returned and returns_after_mutation):
continue

return [{
"pattern": "LAST_CALL_REPLAY",
"severity": "critical",
"evidence": (
f"{entrypoint_name} lambda replays from captured state after "
"__setitem__ mutation"
),
}]

for node in ast.walk(tree):
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
Expand Down