Skip to content

Commit a51c6fe

Browse files
committed
fix: address code review issues in metrics collector
- Fix CPU calculation: maintain previous CPU stats history since ContainerStatsOneShot returns empty PreCPUStats
- Add mutex to Collect method to prevent race conditions
- Add defer for response body close to prevent leaks
- Fix dataset metrics calculation (total datasets from snapshots)
- Store pool reference in variable to avoid repeated calls
1 parent ffd30a2 commit a51c6fe

1 file changed

Lines changed: 83 additions & 37 deletions

File tree

engine/internal/srv/metrics/collector.go

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"encoding/json"
1010
"fmt"
1111
"strconv"
12+
"sync"
1213
"time"
1314

1415
"github.com/docker/docker/api/types/container"
@@ -22,15 +23,25 @@ import (
2223
"gitlab.com/postgres-ai/database-lab/v3/pkg/models"
2324
)
2425

26+
// containerCPUState stores previous CPU stats for delta calculation.
// ContainerStatsOneShot returns empty PreCPUStats, so the collector keeps
// its own per-container counter history between scrapes.
type containerCPUState struct {
	totalUsage  uint64    // cumulative container CPU usage counter at last sample
	systemUsage uint64    // cumulative host/system CPU usage counter at last sample
	timestamp   time.Time // when these counters were sampled
}
32+
2533
// Collector collects metrics from DBLab components.
type Collector struct {
	mu             sync.Mutex // serializes Collect runs to prevent concurrent scrapes racing on metrics
	metrics        *Metrics
	cloning        *cloning.Base
	retrieval      *retrieval.Retrieval
	pm             *pool.Manager
	engProps       *global.EngineProps
	dockerClient   *client.Client
	startedAt      time.Time
	prevCPUStats   map[string]containerCPUState // per-clone previous CPU counters for delta calculation
	prevCPUStatsMu sync.RWMutex                 // guards prevCPUStats
}
3546

3647
// NewCollector creates a new metrics collector.
@@ -51,11 +62,15 @@ func NewCollector(
5162
engProps: engProps,
5263
dockerClient: dockerClient,
5364
startedAt: startedAt,
65+
prevCPUStats: make(map[string]containerCPUState),
5466
}
5567
}
5668

5769
// Collect gathers all metrics.
5870
func (c *Collector) Collect(ctx context.Context) {
71+
c.mu.Lock()
72+
defer c.mu.Unlock()
73+
5974
c.metrics.Reset()
6075

6176
c.collectInstanceMetrics()
@@ -87,13 +102,14 @@ func (c *Collector) collectPoolMetrics() {
87102
fsmList := c.pm.GetFSManagerList()
88103

89104
for _, fsm := range fsmList {
90-
if fsm.Pool() == nil {
105+
p := fsm.Pool()
106+
if p == nil {
91107
continue
92108
}
93109

94-
poolName := fsm.Pool().Name
95-
poolMode := fsm.Pool().Mode
96-
poolStatus := string(fsm.Pool().Status())
110+
poolName := p.Name
111+
poolMode := p.Mode
112+
poolStatus := string(p.Status())
97113

98114
c.metrics.PoolStatus.WithLabelValues(poolName, poolMode, poolStatus).Set(1)
99115

@@ -122,19 +138,20 @@ func (c *Collector) collectDatasetMetrics(fsm pool.FSManager, poolName string) {
122138
return
123139
}
124140

125-
branches, err := fsm.ListBranches()
126-
if err != nil {
127-
log.Err("failed to list branches for pool", poolName, err)
128-
return
129-
}
130-
131141
snapshotList := fsm.SnapshotList()
132142

133-
totalDatasets := len(cloneNames) + len(branches) + len(snapshotList)
143+
// total datasets = snapshots (each snapshot is a dataset slot)
144+
// available = snapshots without active clones (can be reused after full refresh)
145+
totalDatasets := len(snapshotList)
134146
busyDatasets := len(cloneNames)
147+
availableDatasets := totalDatasets - busyDatasets
148+
149+
if availableDatasets < 0 {
150+
availableDatasets = 0
151+
}
135152

136153
c.metrics.DatasetsTotal.WithLabelValues(poolName).Set(float64(totalDatasets))
137-
c.metrics.DatasetsAvailable.WithLabelValues(poolName).Set(float64(totalDatasets - busyDatasets))
154+
c.metrics.DatasetsAvailable.WithLabelValues(poolName).Set(float64(availableDatasets))
138155
}
139156

140157
func (c *Collector) collectCloneMetrics(ctx context.Context) {
@@ -215,16 +232,15 @@ func (c *Collector) getContainerStats(ctx context.Context, clones []*models.Clon
215232
continue
216233
}
217234

235+
defer stats.Body.Close()
236+
218237
var statsJSON container.StatsResponse
219238
if err := json.NewDecoder(stats.Body).Decode(&statsJSON); err != nil {
220239
log.Dbg(fmt.Sprintf("failed to decode container stats for clone %s: %v", clone.ID, err))
221-
stats.Body.Close()
222240
continue
223241
}
224242

225-
stats.Body.Close()
226-
227-
cpuPercent := calculateCPUPercent(&statsJSON)
243+
cpuPercent := c.calculateCPUPercent(clone.ID, &statsJSON)
228244
memoryUsage := statsJSON.MemoryStats.Usage
229245
memoryLimit := statsJSON.MemoryStats.Limit
230246

@@ -238,22 +254,51 @@ func (c *Collector) getContainerStats(ctx context.Context, clones []*models.Clon
238254
return result
239255
}
240256

241-
func calculateCPUPercent(stats *container.StatsResponse) float64 {
242-
cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage - stats.PreCPUStats.CPUUsage.TotalUsage)
243-
systemDelta := float64(stats.CPUStats.SystemUsage - stats.PreCPUStats.SystemUsage)
257+
// calculateCPUPercent calculates CPU percentage using stored previous stats.
258+
// ContainerStatsOneShot returns empty PreCPUStats, so we maintain our own history.
259+
func (c *Collector) calculateCPUPercent(cloneID string, stats *container.StatsResponse) float64 {
260+
currentTotalUsage := stats.CPUStats.CPUUsage.TotalUsage
261+
currentSystemUsage := stats.CPUStats.SystemUsage
262+
now := time.Now()
244263

245-
if systemDelta > 0 && cpuDelta > 0 {
246-
cpuCount := float64(stats.CPUStats.OnlineCPUs)
247-
if cpuCount == 0 {
248-
cpuCount = float64(len(stats.CPUStats.CPUUsage.PercpuUsage))
249-
}
264+
c.prevCPUStatsMu.RLock()
265+
prevStats, hasPrev := c.prevCPUStats[cloneID]
266+
c.prevCPUStatsMu.RUnlock()
250267

251-
if cpuCount > 0 {
252-
return (cpuDelta / systemDelta) * cpuCount * 100.0
253-
}
268+
c.prevCPUStatsMu.Lock()
269+
c.prevCPUStats[cloneID] = containerCPUState{
270+
totalUsage: currentTotalUsage,
271+
systemUsage: currentSystemUsage,
272+
timestamp: now,
273+
}
274+
c.prevCPUStatsMu.Unlock()
275+
276+
if !hasPrev {
277+
return 0
278+
}
279+
280+
timeDelta := now.Sub(prevStats.timestamp)
281+
if timeDelta < time.Second {
282+
return 0
283+
}
284+
285+
cpuDelta := float64(currentTotalUsage - prevStats.totalUsage)
286+
systemDelta := float64(currentSystemUsage - prevStats.systemUsage)
287+
288+
if systemDelta <= 0 || cpuDelta < 0 {
289+
return 0
290+
}
291+
292+
cpuCount := float64(stats.CPUStats.OnlineCPUs)
293+
if cpuCount == 0 {
294+
cpuCount = float64(len(stats.CPUStats.CPUUsage.PercpuUsage))
295+
}
296+
297+
if cpuCount <= 0 {
298+
cpuCount = 1
254299
}
255300

256-
return 0
301+
return (cpuDelta / systemDelta) * cpuCount * 100.0
257302
}
258303

259304
func (c *Collector) collectSnapshotMetrics() {
@@ -309,11 +354,12 @@ func (c *Collector) collectBranchMetrics() {
309354
totalBranches := 0
310355

311356
for _, fsm := range fsmList {
312-
if fsm.Pool() == nil {
357+
p := fsm.Pool()
358+
if p == nil {
313359
continue
314360
}
315361

316-
poolName := fsm.Pool().Name
362+
poolName := p.Name
317363
branches, err := fsm.ListBranches()
318364

319365
if err != nil {

0 commit comments

Comments
 (0)