diff --git a/kernelguard.py b/kernelguard.py index f086847..2a41a5d 100644 --- a/kernelguard.py +++ b/kernelguard.py @@ -3025,6 +3025,18 @@ def detect_precision_downgrade(code: str | SubmissionFacts) -> list[dict]: return matches +def detect_generator_state_reuse(code_or_facts): + """Generator send() state machine for replay.""" + facts = ensure_submission_facts(code_or_facts) + if not facts.custom_kernel_active or not facts.python_active: return [] + g1 = bool(re.search(r'def\s+\w+\s*\(\s*\)\s*:\s*\n\s+(?:[\w,\s]*=[^#\n]*\n\s+)*while\s+True\s*:', facts.python_active)) + g2 = bool(re.search(r'\byield\b', facts.raw_code)) + g3 = bool(re.search(r'\.send\s*\(', facts.custom_kernel_active)) + if g1 and g2 and g3: + return [{"pattern": "LAST_CALL_REPLAY", "severity": "critical", "evidence": "Generator send() state machine — coroutine-based replay evades all cache detectors"}] + return [] + + # --------------------------------------------------------------------------- # Score anomaly detection # --------------------------------------------------------------------------- @@ -3790,6 +3802,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: detect_thread_injection, detect_lazy_tensor, detect_precision_downgrade, + detect_generator_state_reuse, ] BASE_DETECTOR_SPECS = [ @@ -3827,6 +3840,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: ("thread_injection", detect_thread_injection), ("lazy_tensor", detect_lazy_tensor), ("precision_downgrade", detect_precision_downgrade), + ("generator_state_reuse", detect_generator_state_reuse), ] VALID_RULE_OUTCOMES = {AUTO_FILTER, SUSPICIOUS_ONLY, TELEMETRY_ONLY}