diff --git a/kernelguard.py b/kernelguard.py index f086847..1a3d60d 100644 --- a/kernelguard.py +++ b/kernelguard.py @@ -3025,6 +3025,26 @@ def detect_precision_downgrade(code: str | SubmissionFacts) -> list[dict]: return matches +RE_RUNPY = re.compile(r'(?:runpy|zipimport)\s*\.') +def detect_runpy_execution(code: str | SubmissionFacts) -> list[dict]: + facts = ensure_submission_facts(code) + if not RE_RUNPY.search(facts.raw_code): return [] + return [{"pattern": "DYNAMIC_EXECUTION", "severity": "high", "evidence": "runpy/zipimport module — script/zip execution outside normal import"}] + + +RE_AST_LITERAL = re.compile(r'ast\.literal_eval\s*\(') +def detect_ast_literal_eval(code: str | SubmissionFacts) -> list[dict]: + facts = ensure_submission_facts(code) + if not RE_AST_LITERAL.search(facts.raw_code): return [] + return [{"pattern": "OBFUSCATED_EXEC", "severity": "high", "evidence": "ast.literal_eval — limited eval for code parsing"}] + +RE_HTTP_SERVER = re.compile(r'(?:http\.server|socketserver|xmlrpc)\s*\.') +def detect_http_server_netsvc(code: str | SubmissionFacts) -> list[dict]: + facts = ensure_submission_facts(code) + if not RE_HTTP_SERVER.search(facts.raw_code): return [] + return [{"pattern": "RUNTIME_PACKAGE_INSTALL", "severity": "critical", "evidence": "http.server/socketserver/xmlrpc — network service server within submission"}] + + # --------------------------------------------------------------------------- # Score anomaly detection # --------------------------------------------------------------------------- @@ -3048,6 +3068,51 @@ def _collect_scores(metadata: Optional[dict]) -> tuple[list[float], Optional[flo # --------------------------------------------------------------------------- + +# --------------------------------------------------------------------------- +# Sentinel/state reuse detectors +# --------------------------------------------------------------------------- + + +def detect_global_sentinel_reuse(code: str | SubmissionFacts) -> list[dict]: + """Detect module-level None-initialized vars used as sentinel cache in entrypoint.""" + facts = ensure_submission_facts(code) + if not facts.custom_kernel_active or not facts.ast_tree: + return [] + for var in facts._none_inited: + if not re.search(rf'\bglobal\s+\b{re.escape(var)}\b', facts.custom_kernel_active): + continue + if not re.search(rf'\bif\s+\b{re.escape(var)}\b\s+is\s+None\b', facts.custom_kernel_active): + continue + if not re.search(rf'\breturn\s+\b{re.escape(var)}\b', facts.custom_kernel_active): + continue + return [{ + "pattern": "GLOBAL_SENTINEL_REUSE", + "severity": "high", + "evidence": f"Module-level {var}=None used as sentinel cache with global/guard/return in entrypoint", + }] + return [] + + +def detect_closure_state_reuse(code: str | SubmissionFacts) -> list[dict]: + """Detect nonlocal variables in closure-based entrypoints used for result caching.""" + facts = ensure_submission_facts(code) + if not facts.custom_kernel_active: + return [] + for m in re.finditer(r'\bnonlocal\s+(\w+)', facts.custom_kernel_active): + var = m.group(1) + if not re.search(rf'\bif\s+\b{re.escape(var)}\b\s+is\s+None\b', facts.custom_kernel_active): + continue + if not re.search(rf'\breturn\s+\b{re.escape(var)}\b', facts.custom_kernel_active): + continue + return [{ + "pattern": "CLOSURE_STATE_REUSE", + "severity": "high", + "evidence": f"Nonlocal {var} used as sentinel cache with guard/return in entrypoint", + }] + return [] + + # Roofline physics floor # --------------------------------------------------------------------------- # The theoretical minimum execution time for a GEMM on a given GPU is bounded @@ -3653,6 +3718,14 @@ class RulePolicy: "NEAR_CLONE_SPAM", "administrative", "support", SUSPICIOUS_ONLY, (), (), "downgrade", ), + "GLOBAL_SENTINEL_REUSE": RulePolicy( + "GLOBAL_SENTINEL_REUSE", "result_reuse", "hard", AUTO_FILTER, (), + (), "rewrite", + ), + "CLOSURE_STATE_REUSE": RulePolicy( + "CLOSURE_STATE_REUSE", "result_reuse", "hard", AUTO_FILTER, (), + (), "rewrite", + ), } BASE_SCORE_CONFIG = { @@ -3775,6 +3848,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: detect_token_paste_cuda_api, detect_sequence_batch_graph, detect_runtime_package_install, + detect_http_server_netsvc, # AST-based detectors (Layer 2) detect_trusted_module_import, detect_module_mutation, @@ -3790,6 +3864,9 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: detect_thread_injection, detect_lazy_tensor, detect_precision_downgrade, + detect_global_sentinel_reuse, + detect_closure_state_reuse, + detect_runpy_execution, ] BASE_DETECTOR_SPECS = [ @@ -3813,6 +3890,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: ("token_paste_cuda_api", detect_token_paste_cuda_api), ("sequence_batch_graph", detect_sequence_batch_graph), ("runtime_package_install", detect_runtime_package_install), + ("http_server_netsvc", detect_http_server_netsvc), ("trusted_module_import", detect_trusted_module_import), ("module_mutation", detect_module_mutation), ("globals_mutation", detect_globals_mutation), @@ -3827,6 +3905,9 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool: ("thread_injection", detect_thread_injection), ("lazy_tensor", detect_lazy_tensor), ("precision_downgrade", detect_precision_downgrade), + ("global_sentinel_reuse", detect_global_sentinel_reuse), + ("closure_state_reuse", detect_closure_state_reuse), + ("runpy_execution", detect_runpy_execution), ] VALID_RULE_OUTCOMES = {AUTO_FILTER, SUSPICIOUS_ONLY, TELEMETRY_ONLY}