Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions kernelguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,103 @@ def _has_ver(expr: ast.AST | None) -> bool:
),
}]

# Quaternary pass: captured-state identity replay via subscript or
# attribute. Pattern:
# state = {"last": None, "out": None} # or any captured object
# def custom_kernel(data):
# if state["last"] is data:
# return state["out"]
# state["last"] = data
# state["out"] = data.clone()
# return state["out"]
# The first three passes only key off module-level `Name` storage slots
# whose name matches a stateful heuristic (last/prev/cache/...) or
# whose initial value is None. They miss the same exploit when the
# storage slot is one level removed (dict key, instance attribute) on a
# captured non-parameter name.
param_names = {a.arg for a in node.args.args}
param_names.update(a.arg for a in getattr(node.args, "posonlyargs", []) or [])
param_names.update(a.arg for a in getattr(node.args, "kwonlyargs", []) or [])

captured_subscript_or_attr_targets: set[str] = set()
for child in ast.walk(node):
if not isinstance(child, ast.Assign):
continue
for target in child.targets:
if not isinstance(target, (ast.Subscript, ast.Attribute)):
continue
root = _ast_root_name(target)
if root and root not in param_names:
captured_subscript_or_attr_targets.add(root)

if captured_subscript_or_attr_targets:
for child in ast.walk(node):
if not isinstance(child, ast.If):
continue
if _body_has_calls(child.body):
continue

triggered = False
for cmp in ast.walk(child.test):
if not isinstance(cmp, ast.Compare):
continue
operands = [cmp.left] + list(cmp.comparators)
for i, op in enumerate(cmp.ops):
if not isinstance(op, ast.Is):
continue
left = operands[i]
right = operands[i + 1]

def _is_captured_slot(expr: ast.AST) -> bool:
if not isinstance(expr, (ast.Subscript, ast.Attribute)):
return False
root = _ast_root_name(expr)
return (
root is not None
and root not in param_names
and root in captured_subscript_or_attr_targets
)

def _is_param_ref(expr: ast.AST) -> bool:
if isinstance(expr, ast.Name):
return expr.id in param_names
return _ast_root_name(expr) in param_names

if (
(_is_captured_slot(left) and _is_param_ref(right))
or (_is_captured_slot(right) and _is_param_ref(left))
):
triggered = True
break
if triggered:
break

if not triggered:
continue

# Confirm the if-body returns something rooted on a captured
# non-parameter name (the saved output slot).
returns_captured = False
for stmt in child.body:
if not isinstance(stmt, ast.Return) or stmt.value is None:
continue
rroot = _ast_root_name(stmt.value)
if rroot and rroot not in param_names:
returns_captured = True
break

if not returns_captured:
continue

return [{
"pattern": "LAST_CALL_REPLAY",
"severity": "critical",
"evidence": (
f"{entrypoint_name} returns saved output on object-identity "
f"match between captured-state slot and call input"
),
}]

return []


Expand Down