66import json
77from urllib .parse import urlparse , parse_qs
88from .gh_code_scanning import call_api
9- from seclab_taskflow_agent .path_utils import log_file_name
9+ from seclab_taskflow_agent .path_utils import mcp_data_dir , log_file_name
10+ from .ghsa_models import GHSA , GHSASummary , Base
11+ from pathlib import Path
12+ from sqlalchemy import create_engine
13+ from sqlalchemy .orm import Session
14+ from .utils import process_repo
1015
1116logging .basicConfig (
1217 level = logging .DEBUG ,
1722
1823mcp = FastMCP ("GitHubRepoAdvisories" )
1924
25+ MEMORY = mcp_data_dir ("seclab-taskflows" , "ghsa" , "GHSA_DIR" )
26+
27+
28+ def ghsa_to_dict (result ):
29+ return {
30+ "id" : result .id ,
31+ "ghsa_id" : result .ghsa_id ,
32+ "repo" : result .repo .lower (),
33+ "severity" : result .severity ,
34+ "cve_id" : result .cve_id ,
35+ "description" : result .description ,
36+ "summary" : result .summary ,
37+ "published_at" : result .published_at ,
38+ "state" : result .state ,
39+ }
40+
41+
42+ def ghsa_summary_to_dict (summary ):
43+ return {
44+ "id" : summary .id ,
45+ "repo" : summary .repo .lower (),
46+ "total_advisories" : summary .total_advisories ,
47+ "high_severity_count" : summary .high_severity_count ,
48+ "medium_severity_count" : summary .medium_severity_count ,
49+ "low_severity_count" : summary .low_severity_count ,
50+ "summary_notes" : summary .summary_notes ,
51+ }
52+
53+ class GHSABackend :
54+ def __init__ (self , memcache_state_dir : str ):
55+ self .memcache_state_dir = memcache_state_dir
56+ self .location_pattern = r"^([a-zA-Z]+)(:\d+){4}$"
57+ if not Path (self .memcache_state_dir ).exists ():
58+ db_dir = "sqlite://"
59+ else :
60+ db_dir = f"sqlite:///{ self .memcache_state_dir } /ghsa.db"
61+ self .engine = create_engine (db_dir , echo = False )
62+ Base .metadata .create_all (
63+ self .engine ,
64+ tables = [
65+ GHSA .__table__ ,
66+ GHSASummary .__table__ ,
67+ ],
68+ )
69+
70+ def store_new_ghsa (self , repo , ghsa_id , severity , cve_id , description , summary , published_at , state ):
71+ with Session (self .engine ) as session :
72+ existing = session .query (GHSA ).filter_by (repo = repo , ghsa_id = ghsa_id ).first ()
73+ if existing :
74+ if severity :
75+ existing .severity = severity
76+ if cve_id :
77+ existing .cve_id = cve_id
78+ if description :
79+ existing .description = description
80+ if summary :
81+ existing .summary = summary
82+ if published_at :
83+ existing .published_at = published_at
84+ if state :
85+ existing .state = state
86+ else :
87+ new_ghsa = GHSA (
88+ repo = repo ,
89+ ghsa_id = ghsa_id ,
90+ severity = severity ,
91+ cve_id = cve_id ,
92+ description = description ,
93+ summary = summary ,
94+ published_at = published_at ,
95+ state = state ,
96+ )
97+ session .add (new_ghsa )
98+ session .commit ()
99+ return f"Updated or added GHSA { ghsa_id } for { repo } "
100+
101+ def get_ghsa (self , repo , ghsa_id ):
102+ with Session (self .engine ) as session :
103+ existing = session .query (GHSA ).filter_by (repo = repo , ghsa_id = ghsa_id ).first ()
104+ if not existing :
105+ return None
106+ return ghsa_to_dict (existing )
107+
108+ def get_ghsas (self , repo ):
109+ with Session (self .engine ) as session :
110+ existing = session .query (GHSA ).filter_by (repo = repo ).all ()
111+ return [ghsa_to_dict (ghsa ) for ghsa in existing ]
112+
113+ def store_new_ghsa_summary (
114+ self ,
115+ repo ,
116+ total_advisories ,
117+ high_severity_count ,
118+ medium_severity_count ,
119+ low_severity_count ,
120+ summary_notes ,
121+ ):
122+ with Session (self .engine ) as session :
123+ existing = session .query (GHSASummary ).filter_by (repo = repo ).first ()
124+ if existing :
125+ existing .total_advisories = total_advisories
126+ existing .high_severity_count = high_severity_count
127+ existing .medium_severity_count = medium_severity_count
128+ existing .low_severity_count = low_severity_count
129+ existing .summary_notes = (existing .summary_notes or "" ) + (summary_notes or "" )
130+ else :
131+ new_summary = GHSASummary (
132+ repo = repo ,
133+ total_advisories = total_advisories ,
134+ high_severity_count = high_severity_count ,
135+ medium_severity_count = medium_severity_count ,
136+ low_severity_count = low_severity_count ,
137+ summary_notes = summary_notes ,
138+ )
139+ session .add (new_summary )
140+ session .commit ()
141+ return f"Updated or added GHSA summary for { repo } "
142+
143+ def get_ghsa_summary (self , repo ):
144+ with Session (self .engine ) as session :
145+ existing = session .query (GHSASummary ).filter_by (repo = repo ).first ()
146+ if not existing :
147+ return None
148+ return ghsa_summary_to_dict (existing )
149+
150+ def clear_repo (self , repo ):
151+ with Session (self .engine ) as session :
152+ session .query (GHSA ).filter_by (repo = repo ).delete ()
153+ session .query (GHSASummary ).filter_by (repo = repo ).delete ()
154+ session .commit ()
155+ return f"Cleared GHSA results for repo { repo } "
156+
157+
158+ backend = GHSABackend (MEMORY )
20159
21160# The advisories contain a lot of information, so we need to filter
22161# some of it out to avoid exceeding the maximum prompt size.
@@ -26,6 +165,8 @@ def parse_advisory(advisory: dict) -> dict:
26165 "ghsa_id" : advisory .get ("ghsa_id" , "" ),
27166 "cve_id" : advisory .get ("cve_id" , "" ),
28167 "summary" : advisory .get ("summary" , "" ),
168+ "description" : advisory .get ("description" , "" ),
169+ "severity" : advisory .get ("severity" , "" ),
29170 "published_at" : advisory .get ("published_at" , "" ),
30171 "state" : advisory .get ("state" , "" ),
31172 }
@@ -70,6 +211,145 @@ async def fetch_GHSA_list(
70211 return results
71212 return json .dumps (results , indent = 2 )
72213
214+ @mcp .tool ()
215+ async def fetch_and_store_GHSA_list (
216+ owner : str = Field (description = "The owner of the repo" ), repo : str = Field (description = "The repository name" ),
217+ return_results : bool = Field (description = "Whether to return the fetched results as a JSON string" , default = False )
218+ ) -> str :
219+ """Fetch all GitHub Security Advisories (GHSAs) for a specific repository and store them in the database."""
220+ results = await fetch_GHSA_list_from_gh (owner , repo )
221+ if isinstance (results , str ):
222+ return results
223+ for advisory in results :
224+ backend .store_new_ghsa (
225+ process_repo (owner , repo ),
226+ advisory ["ghsa_id" ],
227+ advisory ["severity" ],
228+ advisory ["cve_id" ],
229+ advisory ["description" ],
230+ advisory ["summary" ],
231+ advisory ["published_at" ],
232+ advisory ["state" ],
233+ )
234+ if return_results :
235+ return json .dumps (results , indent = 2 )
236+ return f"Fetched and stored { len (results )} GHSAs for { owner } /{ repo } "
237+
238+ @mcp .tool ()
239+ def store_new_ghsa (
240+ owner : str = Field (description = "The owner of the GitHub repository" ),
241+ repo : str = Field (description = "The name of the GitHub repository" ),
242+ ghsa_id : str = Field (description = "The GHSA ID of the advisory" ),
243+ severity : str = Field (description = "The severity of the advisory" ),
244+ cve_id : str = Field (description = "The CVE ID if available" , default = "" ),
245+ description : str = Field (description = "Description for this advisory" , default = "" ),
246+ summary : str = Field (description = "Summary for this advisory" , default = "" ),
247+ published_at : str = Field (description = "Published timestamp for this advisory" , default = "" ),
248+ state : str = Field (description = "State for this advisory (e.g. published, withdrawn)" , default = "" ),
249+ ):
250+ """Store a GHSA advisory record in the database."""
251+ return backend .store_new_ghsa (
252+ process_repo (owner , repo ), ghsa_id , severity , cve_id , description , summary , published_at , state
253+ )
254+
255+ @mcp .tool ()
256+ def get_ghsa_from_db (
257+ owner : str = Field (description = "The owner of the GitHub repository" ),
258+ repo : str = Field (description = "The name of the GitHub repository" ),
259+ ghsa_id : str = Field (description = "The GHSA ID of the advisory" ),
260+ ):
261+ """Get a GHSA advisory record from the database."""
262+ repo_name = process_repo (owner , repo )
263+ result = backend .get_ghsa (repo_name , ghsa_id )
264+ if not result :
265+ return f"Error: No GHSA entry exists in repo: { repo_name } and ghsa_id { ghsa_id } "
266+ return json .dumps (result )
267+
268+
269+ @mcp .tool ()
270+ def get_ghsas_for_repo_from_db (
271+ owner : str = Field (description = "The owner of the GitHub repository" ),
272+ repo : str = Field (description = "The name of the GitHub repository" ),
273+ ):
274+ """Get all GHSA advisory records for a repository."""
275+ return json .dumps (backend .get_ghsas (process_repo (owner , repo )))
276+
277+ @mcp .tool ()
278+ def get_ghsa_with_id_from_db (
279+ owner : str = Field (description = "The owner of the GitHub repository" ),
280+ repo : str = Field (description = "The name of the GitHub repository" ),
281+ ghsa_id : str = Field (description = "The GHSA ID of the advisory" ),
282+ ):
283+ """Get a GHSA advisory record with a specific GHSA ID from the database."""
284+ repo_name = process_repo (owner , repo )
285+ result = backend .get_ghsa (repo_name , ghsa_id )
286+ if not result :
287+ return f"Error: No GHSA entry exists in repo: { repo_name } and ghsa_id { ghsa_id } "
288+ return json .dumps (result )
289+
290+ @mcp .tool ()
291+ def store_new_ghsa_summary (
292+ owner : str = Field (description = "The owner of the GitHub repository" ),
293+ repo : str = Field (description = "The name of the GitHub repository" ),
294+ total_advisories : int = Field (description = "Total number of advisories" ),
295+ high_severity_count : int = Field (description = "Number of high severity advisories" ),
296+ medium_severity_count : int = Field (description = "Number of medium severity advisories" ),
297+ low_severity_count : int = Field (description = "Number of low severity advisories" ),
298+ summary_notes : str = Field (description = "Notes for the advisory summary" , default = "" ),
299+ ):
300+ """Store GHSA summary statistics for a repository."""
301+ return backend .store_new_ghsa_summary (
302+ process_repo (owner , repo ),
303+ total_advisories ,
304+ high_severity_count ,
305+ medium_severity_count ,
306+ low_severity_count ,
307+ summary_notes ,
308+ )
309+
310+
311+ @mcp .tool ()
312+ def add_ghsa_summary_notes (
313+ owner : str = Field (description = "The owner of the GitHub repository" ),
314+ repo : str = Field (description = "The name of the GitHub repository" ),
315+ summary_notes : str = Field (description = "New notes for the advisory summary" , default = "" ),
316+ ):
317+ """Append notes to the GHSA summary for a repository."""
318+ repo_name = process_repo (owner , repo )
319+ existing = backend .get_ghsa_summary (repo_name )
320+ if not existing :
321+ return f"Error: No GHSA summary exists in repo: { repo_name } "
322+ return backend .store_new_ghsa_summary (
323+ repo_name ,
324+ existing ["total_advisories" ],
325+ existing ["high_severity_count" ],
326+ existing ["medium_severity_count" ],
327+ existing ["low_severity_count" ],
328+ summary_notes ,
329+ )
330+
331+
332+ @mcp .tool ()
333+ def get_ghsa_summary (
334+ owner : str = Field (description = "The owner of the GitHub repository" ),
335+ repo : str = Field (description = "The name of the GitHub repository" ),
336+ ):
337+ """Get the GHSA summary for a repository."""
338+ repo_name = process_repo (owner , repo )
339+ result = backend .get_ghsa_summary (repo_name )
340+ if not result :
341+ return f"Error: No GHSA summary exists in repo: { repo_name } "
342+ return json .dumps (result )
343+
344+
345+ @mcp .tool ()
346+ def clear_repo (
347+ owner : str = Field (description = "The owner of the GitHub repository" ),
348+ repo : str = Field (description = "The name of the GitHub repository" ),
349+ ):
350+ """Clear GHSA and GHSA summary records for a repository."""
351+ return backend .clear_repo (process_repo (owner , repo ))
352+
73353
74354async def fetch_GHSA_details_from_gh (owner : str , repo : str , ghsa_id : str ) -> str | dict :
75355 """Fetch the details of a repository security advisory."""
0 commit comments