Skip to content

Commit 2f85792

Browse files
committed
fix: preserve replica data on refresh_schema
Will be mostly made obsolete later by #10
1 parent e2829b7 commit 2f85792

1 file changed

Lines changed: 54 additions & 32 deletions

File tree

crates/dry_run_cli/src/mcp/server.rs

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,22 @@ pub fn wrap_schema_only(schema: SchemaSnapshot) -> AnnotatedSnapshot {
5151
}
5252
}
5353

54+
fn build_inline(
55+
schema: SchemaSnapshot,
56+
planner: Option<dry_run_core::PlannerStatsSnapshot>,
57+
primary_activity: Option<dry_run_core::ActivityStatsSnapshot>,
58+
) -> AnnotatedSnapshot {
59+
let mut activity_by_node = std::collections::BTreeMap::new();
60+
if let Some(a) = primary_activity {
61+
activity_by_node.insert("primary".to_string(), a);
62+
}
63+
AnnotatedSnapshot {
64+
schema,
65+
planner,
66+
activity_by_node,
67+
}
68+
}
69+
5470
#[derive(Clone)]
5571
pub struct DryRunServer {
5672
ctx: Option<Arc<DryRun>>,
@@ -1454,52 +1470,58 @@ impl DryRunServer {
14541470
.introspect_schema()
14551471
.await
14561472
.map_err(|e| McpError::internal_error(format!("introspection failed: {e}"), None))?;
1457-
let schema_hash = schema.content_hash.clone();
1458-
1459-
let planner = match ctx.introspect_planner_stats(&schema_hash).await {
1460-
Ok(p) => Some(p),
1461-
Err(e) => {
1462-
tracing::warn!(error = %e, "planner stats introspection failed; continuing without");
1463-
None
1464-
}
1465-
};
1473+
let hash = schema.content_hash.clone();
1474+
let planner = ctx
1475+
.introspect_planner_stats(&hash)
1476+
.await
1477+
.inspect_err(|e| tracing::warn!(error = %e, "planner stats unavailable"))
1478+
.ok();
1479+
let primary = ctx
1480+
.introspect_activity_stats(&hash, "primary")
1481+
.await
1482+
.inspect_err(|e| tracing::warn!(error = %e, "primary activity unavailable"))
1483+
.ok();
14661484

1467-
let mut activity_by_node = std::collections::BTreeMap::new();
1468-
match ctx.introspect_activity_stats(&schema_hash, "primary").await {
1469-
Ok(a) => {
1470-
activity_by_node.insert("primary".to_string(), a);
1471-
}
1472-
Err(e) => {
1473-
tracing::warn!(error = %e, "activity stats introspection failed; continuing without");
1474-
}
1475-
}
1485+
let mut annotated = build_inline(schema, planner, primary);
14761486

14771487
if let (Some(store), Some(key)) = (self.history.as_ref(), self.snapshot_key.as_ref()) {
1478-
persist_refresh(store, key, &schema, planner.as_ref(), &activity_by_node).await;
1488+
persist_refresh(
1489+
store,
1490+
key,
1491+
&annotated.schema,
1492+
annotated.planner.as_ref(),
1493+
&annotated.activity_by_node,
1494+
)
1495+
.await;
1496+
match store.get_annotated(key, SnapshotRef::Latest).await {
1497+
Ok(a) => annotated = a,
1498+
Err(e) => tracing::warn!(error = %e, "history reload after refresh failed"),
1499+
}
14791500
}
14801501

14811502
let body = format!(
14821503
"Schema refreshed: {} tables, {} views, {} functions (hash: {})\n\
14831504
Planner stats: {}\n\
1484-
Activity stats: {} node(s)",
1485-
schema.tables.len(),
1486-
schema.views.len(),
1487-
schema.functions.len(),
1488-
&schema_hash[..16],
1489-
if planner.is_some() {
1505+
Activity stats: {} node(s) [{}]",
1506+
annotated.schema.tables.len(),
1507+
annotated.schema.views.len(),
1508+
annotated.schema.functions.len(),
1509+
&annotated.schema.content_hash[..16],
1510+
if annotated.planner.is_some() {
14901511
"captured"
14911512
} else {
14921513
"unavailable"
14931514
},
1494-
activity_by_node.len(),
1515+
annotated.activity_by_node.len(),
1516+
annotated
1517+
.activity_by_node
1518+
.keys()
1519+
.cloned()
1520+
.collect::<Vec<_>>()
1521+
.join(", "),
14951522
);
14961523

1497-
*self.schema.write().await = Some(AnnotatedSnapshot {
1498-
schema,
1499-
planner,
1500-
activity_by_node,
1501-
});
1502-
1524+
*self.schema.write().await = Some(annotated);
15031525
let text = self.wrap_text(&body, None);
15041526
Ok(CallToolResult::success(vec![Content::text(text)]))
15051527
}

0 commit comments

Comments
 (0)