Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions kernelguard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3025,6 +3025,26 @@ def detect_precision_downgrade(code: str | SubmissionFacts) -> list[dict]:
return matches


RE_RUNPY = re.compile(r'(?:runpy|zipimport)\s*\.')
def detect_runpy_execution(code: str | SubmissionFacts) -> list[dict]:
facts = ensure_submission_facts(code)
if not RE_RUNPY.search(facts.raw_code): return []
return [{"pattern": "DYNAMIC_EXECUTION", "severity": "high", "evidence": "runpy/zipimport module — script/zip execution outside normal import"}]


RE_AST_LITERAL = re.compile(r'ast\.literal_eval\s*\(')
def detect_ast_literal_eval(code: str | SubmissionFacts) -> list[dict]:
facts = ensure_submission_facts(code)
if not RE_AST_LITERAL.search(facts.raw_code): return []
return [{"pattern": "OBFUSCATED_EXEC", "severity": "high", "evidence": "ast.literal_eval — limited eval for code parsing"}]

RE_HTTP_SERVER = re.compile(r'(?:http\.server|socketserver|xmlrpc)\s*\.')
def detect_http_server_netsvc(code: str | SubmissionFacts) -> list[dict]:
facts = ensure_submission_facts(code)
if not RE_HTTP_SERVER.search(facts.raw_code): return []
return [{"pattern": "RUNTIME_PACKAGE_INSTALL", "severity": "critical", "evidence": "http.server/socketserver/xmlrpc — network service server within submission"}]


# ---------------------------------------------------------------------------
# Score anomaly detection
# ---------------------------------------------------------------------------
Expand All @@ -3048,6 +3068,51 @@ def _collect_scores(metadata: Optional[dict]) -> tuple[list[float], Optional[flo


# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# Sentinel/state reuse detectors
# ---------------------------------------------------------------------------


def detect_global_sentinel_reuse(code: str | SubmissionFacts) -> list[dict]:
"""Detect module-level None-initialized vars used as sentinel cache in entrypoint."""
facts = ensure_submission_facts(code)
if not facts.custom_kernel_active or not facts.ast_tree:
return []
for var in facts._none_inited:
if not re.search(rf'\bglobal\s+\b{re.escape(var)}\b', facts.custom_kernel_active):
continue
if not re.search(rf'\bif\s+\b{re.escape(var)}\b\s+is\s+None\b', facts.custom_kernel_active):
continue
if not re.search(rf'\breturn\s+\b{re.escape(var)}\b', facts.custom_kernel_active):
continue
return [{
"pattern": "GLOBAL_SENTINEL_REUSE",
"severity": "high",
"evidence": f"Module-level {var}=None used as sentinel cache with global/guard/return in entrypoint",
}]
return []


def detect_closure_state_reuse(code: str | SubmissionFacts) -> list[dict]:
"""Detect nonlocal variables in closure-based entrypoints used for result caching."""
facts = ensure_submission_facts(code)
if not facts.custom_kernel_active:
return []
for m in re.finditer(r'\bnonlocal\s+(\w+)', facts.custom_kernel_active):
var = m.group(1)
if not re.search(rf'\bif\s+\b{re.escape(var)}\b\s+is\s+None\b', facts.custom_kernel_active):
continue
if not re.search(rf'\breturn\s+\b{re.escape(var)}\b', facts.custom_kernel_active):
continue
return [{
"pattern": "CLOSURE_STATE_REUSE",
"severity": "high",
"evidence": f"Nonlocal {var} used as sentinel cache with guard/return in entrypoint",
}]
return []


# Roofline physics floor
# ---------------------------------------------------------------------------
# The theoretical minimum execution time for a GEMM on a given GPU is bounded
Expand Down Expand Up @@ -3653,6 +3718,14 @@ class RulePolicy:
"NEAR_CLONE_SPAM", "administrative", "support", SUSPICIOUS_ONLY, (),
(), "downgrade",
),
"GLOBAL_SENTINEL_REUSE": RulePolicy(
"GLOBAL_SENTINEL_REUSE", "result_reuse", "hard", AUTO_FILTER, (),
(), "rewrite",
),
"CLOSURE_STATE_REUSE": RulePolicy(
"CLOSURE_STATE_REUSE", "result_reuse", "hard", AUTO_FILTER, (),
(), "rewrite",
),
}

BASE_SCORE_CONFIG = {
Expand Down Expand Up @@ -3775,6 +3848,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool:
detect_token_paste_cuda_api,
detect_sequence_batch_graph,
detect_runtime_package_install,
detect_http_server_netsvc,
# AST-based detectors (Layer 2)
detect_trusted_module_import,
detect_module_mutation,
Expand All @@ -3790,6 +3864,9 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool:
detect_thread_injection,
detect_lazy_tensor,
detect_precision_downgrade,
detect_global_sentinel_reuse,
detect_closure_state_reuse,
detect_runpy_execution,
]

BASE_DETECTOR_SPECS = [
Expand All @@ -3813,6 +3890,7 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool:
("token_paste_cuda_api", detect_token_paste_cuda_api),
("sequence_batch_graph", detect_sequence_batch_graph),
("runtime_package_install", detect_runtime_package_install),
("http_server_netsvc", detect_http_server_netsvc),
("trusted_module_import", detect_trusted_module_import),
("module_mutation", detect_module_mutation),
("globals_mutation", detect_globals_mutation),
Expand All @@ -3827,6 +3905,9 @@ def support_only_patterns(matched_patterns: list[dict]) -> bool:
("thread_injection", detect_thread_injection),
("lazy_tensor", detect_lazy_tensor),
("precision_downgrade", detect_precision_downgrade),
("global_sentinel_reuse", detect_global_sentinel_reuse),
("closure_state_reuse", detect_closure_state_reuse),
("runpy_execution", detect_runpy_execution),
]

VALID_RULE_OUTCOMES = {AUTO_FILTER, SUSPICIOUS_ONLY, TELEMETRY_ONLY}
Expand Down