|
15 | 15 | from seclab_taskflow_agent.path_utils import mcp_data_dir, log_file_name |
16 | 16 |
|
17 | 17 | from .repo_context_models import Application, EntryPoint, UserAction, WebEntryPoint, ApplicationIssue, AuditResult, Base |
| 18 | +from .repo_context_models import LowSeverityAuditResult |
18 | 19 | from .utils import process_repo |
19 | 20 |
|
20 | 21 | logging.basicConfig( |
|
26 | 27 |
|
27 | 28 | MEMORY = mcp_data_dir("seclab-taskflows", "repo_context", "REPO_CONTEXT_DIR") |
28 | 29 |
|
29 | | - |
30 | 30 | def app_to_dict(result): |
31 | 31 | return { |
32 | 32 | "app_id": result.id, |
@@ -107,6 +107,7 @@ def __init__(self, memcache_state_dir: str): |
107 | 107 | WebEntryPoint.__table__, |
108 | 108 | ApplicationIssue.__table__, |
109 | 109 | AuditResult.__table__, |
| 110 | + LowSeverityAuditResult.__table__, |
110 | 111 | ], |
111 | 112 | ) |
112 | 113 |
|
@@ -239,6 +240,22 @@ def store_new_user_action(self, repo, app_id, file, line, notes, update=False): |
239 | 240 | session.add(new_user_action) |
240 | 241 | session.commit() |
241 | 242 | return f"Updated or added user action for {file} and {line} in {repo}." |
| 243 | + |
| 244 | + def store_low_severity_reason(self, repo, component_id, result_id, reason): |
| 245 | + with Session(self.engine) as session: |
| 246 | + existing = session.query(LowSeverityAuditResult).filter_by(repo=repo, result_id=result_id).first() |
| 247 | + if existing: |
| 248 | + existing.reason += reason |
| 249 | + else: |
| 250 | + new_low_severity_result = LowSeverityAuditResult( |
| 251 | + repo=repo, |
| 252 | + component_id=component_id, |
| 253 | + result_id=result_id, |
| 254 | + reason=reason, |
| 255 | + ) |
| 256 | + session.add(new_low_severity_result) |
| 257 | + session.commit() |
| 258 | + return f"Updated or added low severity result for {repo} and result id {result_id}" |
242 | 259 |
|
243 | 260 | def get_app(self, repo, location): |
244 | 261 | with Session(self.engine) as session: |
@@ -294,6 +311,7 @@ def get_app_audit_results(self, repo, component_id, has_non_security_error, has_ |
294 | 311 | "repo": app.repo, |
295 | 312 | "issue_type": issue.issue_type, |
296 | 313 | "issue_id": issue.issue_id, |
| 314 | + "result_id" : issue.id, |
297 | 315 | "notes": issue.notes, |
298 | 316 | "has_vulnerability": issue.has_vulnerability, |
299 | 317 | "has_non_security_error": issue.has_non_security_error, |
@@ -389,6 +407,7 @@ def clear_repo(self, repo): |
389 | 407 | session.query(ApplicationIssue).filter_by(repo=repo).delete() |
390 | 408 | session.query(WebEntryPoint).filter_by(repo=repo).delete() |
391 | 409 | session.query(AuditResult).filter_by(repo=repo).delete() |
| 410 | + session.query(LowSeverityAuditResult).filter_by(repo=repo).delete() |
392 | 411 | session.commit() |
393 | 412 | return f"Cleared results for repo {repo}" |
394 | 413 |
|
@@ -782,6 +801,19 @@ def get_potential_audit_results_for_repo( |
782 | 801 | backend.get_app_audit_results(repo, component_id=None, has_non_security_error=True, has_vulnerability=None) |
783 | 802 | ) |
784 | 803 |
|
| 804 | +@mcp.tool() |
| 805 | +def store_low_severity_reason( |
| 806 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 807 | + repo: str = Field(description="The name of the GitHub repository"), |
| 808 | + component_id: int = Field(description="The ID of the component"), |
| 809 | + result_id: int = Field(description="The ID of the audit result"), |
| 810 | + reason: str = Field(description="The reason why this issue is not considered high severity"), |
| 811 | +): |
| 812 | + """ |
| 813 | + Store the reason for auditing an issue as low severity. |
| 814 | + """ |
| 815 | + repo = process_repo(owner, repo) |
| 816 | + return backend.store_low_severity_reason(repo, component_id, result_id, reason) |
785 | 817 |
|
786 | 818 | @mcp.tool() |
787 | 819 | def clear_repo( |
|
0 commit comments