Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ coverage.xml
*.cover
.hypothesis/

# Claude Code & Serena
.claude/
.serena/

# FastAPI Radar specific
radar.db
example.db
Expand Down
233 changes: 164 additions & 69 deletions fastapi_radar/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""API endpoints for FastAPI Radar dashboard."""

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
Expand All @@ -11,6 +11,9 @@
from .models import CapturedRequest, CapturedQuery, CapturedException, Trace, Span
from .tracing import TracingManager

if TYPE_CHECKING:
from .background_tasks import BackgroundTaskTracker


def round_float(value: Optional[float], decimals: int = 2) -> Optional[float]:
"""Round a float value to specified decimal places."""
Expand Down Expand Up @@ -119,15 +122,66 @@ class TraceDetail(BaseModel):
created_at: datetime
spans: List[WaterfallSpan]

class BackgroundTaskParams(BaseModel):
args: List[Any]
kwargs: Dict[str, Any]


class BackgroundTaskInfo(BaseModel):
id: str
function_key: str
function_name: str
status: str
queued_at: datetime
started_at: Optional[datetime]
ended_at: Optional[datetime]
duration_ms: Optional[float]
params: BackgroundTaskParams
error_message: Optional[str]
error_trace: Optional[str]

def create_api_router(get_session_context) -> APIRouter:

def create_api_router(
get_session_context,
get_session_local_context,
task_tracker: Optional["BackgroundTaskTracker"] = None,
) -> APIRouter:
router = APIRouter(prefix="/__radar/api", tags=["radar"])

def get_db():
"""Dependency function for FastAPI to get database session."""
with get_session_context() as session:
yield session

if task_tracker is not None:

@router.get(
"/background-tasks",
response_model=List[BackgroundTaskInfo],
tags=["radar-tasks"],
)
async def get_background_tasks() -> List[BackgroundTaskInfo]:
return task_tracker.list_tasks()

@router.delete(
"/background-tasks",
tags=["radar-tasks"],
)
async def clear_background_tasks() -> Dict[str, bool]:
task_tracker.clear()
return {"ok": True}

@router.post(
"/background-tasks/{task_id}/rerun",
tags=["radar-tasks"],
)
async def rerun_background_task(task_id: str) -> Dict[str, bool]:
try:
task_tracker.rerun(task_id)
except KeyError:
raise HTTPException(status_code=404, detail="Task not found")
return {"ok": True}

@router.get("/requests", response_model=List[RequestSummary])
async def get_requests(
limit: int = Query(100, ge=1, le=1000),
Expand Down Expand Up @@ -300,37 +354,41 @@ async def get_exceptions(
async def get_stats(
hours: int = Query(1, ge=1, le=720), # Allow up to 30 days
slow_threshold: int = Query(100),
session: Session = Depends(get_db),

):
since = datetime.now(timezone.utc) - timedelta(hours=hours)
SessionLocal = get_session_local_context() # 获取SessionLocal (sessionmaker)

requests = (
session.query(
func.count().label("total_requests"),
func.avg(CapturedRequest.duration_ms).label("avg_response_time"),
with SessionLocal() as requests_session:
requests = (
requests_session.query(
func.count().label("total_requests"),
func.avg(CapturedRequest.duration_ms).label("avg_response_time"),
)
.filter(CapturedRequest.created_at >= since)
.one()
)
.filter(CapturedRequest.created_at >= since)
.one()
)

queries = (
session.query(
func.count().label("total_queries"),
func.avg(CapturedQuery.duration_ms).label("avg_query_time"),
func.sum(
case((CapturedQuery.duration_ms >= slow_threshold, 1), else_=0)
).label("slow_queries"),
with SessionLocal() as queries_session:
queries = (
queries_session.query(
func.count().label("total_queries"),
func.avg(CapturedQuery.duration_ms).label("avg_query_time"),
func.sum(
case((CapturedQuery.duration_ms >= slow_threshold, 1), else_=0)
).label("slow_queries"),
)
.filter(CapturedQuery.created_at >= since)
.one()
)
.filter(CapturedQuery.created_at >= since)
.one()
)

exceptions = (
session.query(func.count().label("total_exceptions"))
.filter(CapturedException.created_at >= since)
.one()
)

with SessionLocal() as exceptions_session:
exceptions = (
exceptions_session.query(func.count().label("total_exceptions"))
.filter(CapturedException.created_at >= since)
.one()
)

total_requests = requests.total_requests
avg_response_time = requests.avg_response_time

Expand Down Expand Up @@ -377,6 +435,7 @@ async def get_traces(
service_name: Optional[str] = Query(None),
min_duration_ms: Optional[float] = Query(None),
hours: int = Query(24, ge=1, le=720),
search: Optional[str] = Query(None),
session: Session = Depends(get_db),
):
"""List traces."""
Expand All @@ -389,6 +448,8 @@ async def get_traces(
query = query.filter(Trace.service_name == service_name)
if min_duration_ms:
query = query.filter(Trace.duration_ms >= min_duration_ms)
if search:
query = query.filter(Trace.operation_name.ilike(f"%{search}%"))

traces = (
query.order_by(desc(Trace.start_time)).offset(offset).limit(limit).all()
Expand All @@ -412,56 +473,90 @@ async def get_traces(
@router.get("/traces/{trace_id}", response_model=TraceDetail)
async def get_trace_detail(
trace_id: str,
session: Session = Depends(get_db),
):
"""Get trace details."""
trace = session.query(Trace).filter(Trace.trace_id == trace_id).first()
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")

# Fetch waterfall data
tracing_manager = TracingManager(lambda: get_session_context())
waterfall_spans = tracing_manager.get_waterfall_data(trace_id)

return TraceDetail(
trace_id=trace.trace_id,
service_name=trace.service_name,
operation_name=trace.operation_name,
start_time=trace.start_time,
end_time=trace.end_time,
duration_ms=round_float(trace.duration_ms),
span_count=trace.span_count,
status=trace.status,
tags=trace.tags,
created_at=trace.created_at,
spans=[WaterfallSpan(**span) for span in waterfall_spans],
)

import traceback
SessionLocal = get_session_local_context() # 获取SessionLocal (sessionmaker)
session = SessionLocal() # 创建实际的session实例
try:
#print(f"[DEBUG] Getting trace detail for trace_id: {trace_id}")
trace = session.query(Trace).filter(Trace.trace_id == trace_id).first()
# 不需要 commit,因为这是 SELECT 查询
if not trace:
#print(f"[DEBUG] Trace not found for trace_id: {trace_id}")
raise HTTPException(status_code=404, detail="Trace not found")

#print(f"[DEBUG] Trace found, fetching waterfall data...")
# Fetch waterfall data
session.close()
waterfall_spans = TracingManager.get_waterfall_data(get_session_local_context, trace_id)
#print(f"[DEBUG] Waterfall data fetched, {len(waterfall_spans)} spans")

return TraceDetail(
trace_id=trace.trace_id,
service_name=trace.service_name,
operation_name=trace.operation_name,
start_time=trace.start_time,
end_time=trace.end_time,
duration_ms=round_float(trace.duration_ms),
span_count=trace.span_count,
status=trace.status,
tags=trace.tags,
created_at=trace.created_at,
spans=[WaterfallSpan(**span) for span in waterfall_spans],
)
except HTTPException:
raise # 直接抛出 HTTPException
except Exception as e:
print(f"[ERROR] Exception in get_trace_detail: {str(e)}")
print(f"[ERROR] Traceback: {traceback.format_exc()}")
Comment on lines +511 to +512
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Error logging uses print statements in API endpoints.

Use a logging library instead of print statements to handle errors more effectively in production environments.

Suggested implementation:

            import logging
            logging.error(f"Exception in get_trace_detail: {str(e)}")
            logging.error(f"Traceback: {traceback.format_exc()}")

If logging is already imported at the top of the file, you can remove the import logging line from inside the function.
For production readiness, consider configuring the logging level and handlers at the application entry point (not shown in this snippet).

session.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
session.close()
Comment on lines +508 to +516
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Catching and re-raising HTTPException is redundant.

Remove the except block for HTTPException and allow it to propagate without catching.

Suggested change
except HTTPException:
raise # 直接抛出 HTTPException
except Exception as e:
print(f"[ERROR] Exception in get_trace_detail: {str(e)}")
print(f"[ERROR] Traceback: {traceback.format_exc()}")
session.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
session.close()
except Exception as e:
print(f"[ERROR] Exception in get_trace_detail: {str(e)}")
print(f"[ERROR] Traceback: {traceback.format_exc()}")
session.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
session.close()


@router.get("/traces/{trace_id}/waterfall")
async def get_trace_waterfall(
trace_id: str,
session: Session = Depends(get_db),
):
"""Get optimized waterfall data for a trace."""
# Ensure the trace exists
trace = session.query(Trace).filter(Trace.trace_id == trace_id).first()
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")

tracing_manager = TracingManager(lambda: get_session_context())
waterfall_data = tracing_manager.get_waterfall_data(trace_id)

return {
"trace_id": trace_id,
"spans": waterfall_data,
"trace_info": {
"service_name": trace.service_name,
"operation_name": trace.operation_name,
"total_duration_ms": trace.duration_ms,
"span_count": trace.span_count,
"status": trace.status,
},
}
import traceback
SessionLocal = get_session_local_context() # 获取SessionLocal (sessionmaker)
session = SessionLocal() # 创建实际的session实例
try:
#print(f"[DEBUG] Getting waterfall for trace_id: {trace_id}")
# Ensure the trace exists
trace = session.query(Trace).filter(Trace.trace_id == trace_id).first()
# 不需要 commit,因为这是 SELECT 查询
if not trace:
#print(f"[DEBUG] Trace not found for trace_id: {trace_id}")
raise HTTPException(status_code=404, detail="Trace not found")

#print(f"[DEBUG] Trace found, fetching waterfall data...")
session.close()
waterfall_data = TracingManager.get_waterfall_data(get_session_local_context, trace_id)
#print(f"[DEBUG] Waterfall data fetched, {len(waterfall_data)} spans")

return {
"trace_id": trace_id,
"spans": waterfall_data,
"trace_info": {
"service_name": trace.service_name,
"operation_name": trace.operation_name,
"total_duration_ms": trace.duration_ms,
"span_count": trace.span_count,
"status": trace.status,
},
}
except HTTPException:
raise # 直接抛出 HTTPException
except Exception as e:
print(f"[ERROR] Exception in get_trace_waterfall: {str(e)}")
print(f"[ERROR] Traceback: {traceback.format_exc()}")
session.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
session.close()

@router.get("/spans/{span_id}")
async def get_span_detail(
Expand Down
15 changes: 15 additions & 0 deletions fastapi_radar/background_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Background task monitoring utilities for FastAPI Radar."""

from .tracker import (
BackgroundTaskTracker,
create_tasks_websocket_router,
get_background_task_tracker,
install_background_task_tracker,
)

__all__ = [
"BackgroundTaskTracker",
"create_tasks_websocket_router",
"get_background_task_tracker",
"install_background_task_tracker",
]
Loading
Loading