Skip to content

Commit 60e89a9

Browse files
authored
add tbs storage limit and disk-related metrics (#20464)
* Add tbs storage limit and disk-related metrics * Revert dependencies update
1 parent a556cc6 commit 60e89a9

4 files changed

Lines changed: 95 additions & 8 deletions

File tree

internal/beater/monitoringtest/opentelemetry.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,19 @@ func assertOtelMetrics(
9696
assert.Fail(t, "unexpected metric", m.Name)
9797
}
9898

99+
case metricdata.Gauge[float64]:
100+
assert.Equal(t, 1, len(d.DataPoints))
101+
foundMetrics = append(foundMetrics, m.Name)
102+
if skipValAssert {
103+
continue
104+
}
105+
106+
if v, ok := expectedMetrics[m.Name]; ok {
107+
assert.EqualValues(t, v, d.DataPoints[0].Value, m.Name)
108+
} else if fullMatch {
109+
assert.Fail(t, "unexpected metric", m.Name)
110+
}
111+
99112
case metricdata.Sum[int64]:
100113
assert.Equal(t, 1, len(d.DataPoints))
101114
foundMetrics = append(foundMetrics, m.Name)

x-pack/apm-server/main_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func TestMonitoring(t *testing.T) {
5858
monitoringtest.ExpectContainOtelMetricsKeys(c, reader, []string{
5959
"apm-server.sampling.tail.storage.lsm_size",
6060
"apm-server.sampling.tail.storage.value_log_size",
61+
"apm-server.sampling.tail.storage.storage_limit",
62+
"apm-server.sampling.tail.storage.disk_used",
63+
"apm-server.sampling.tail.storage.disk_total",
64+
"apm-server.sampling.tail.storage.disk_usage_threshold_pct",
6165
})
6266
}, time.Second, 10*time.Millisecond)
6367

x-pack/apm-server/sampling/eventstorage/storage_manager.go

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
"go.opentelemetry.io/otel/metric"
2121
"golang.org/x/sync/errgroup"
2222

23-
"github.com/elastic/apm-server/internal/logs"
2423
"github.com/elastic/elastic-agent-libs/logp"
24+
25+
"github.com/elastic/apm-server/internal/logs"
2526
)
2627

2728
const (
@@ -56,6 +57,10 @@ const (
5657
defaultValueLogSize = 0
5758

5859
gb = float64(1 << 30)
60+
61+
// configuredDiskUsageThresholdMultiplier is the multiplier for the stored configured disk usage threshold.
62+
// It is used to convert float64 disk usage to uint64 to be stored in atomic.Uint64.
63+
configuredDiskUsageThresholdMultiplier = 1000
5964
)
6065

6166
type StorageManagerOptions func(*StorageManager)
@@ -140,11 +145,21 @@ type StorageManager struct {
140145
// meterProvider is the OTel meter provider
141146
meterProvider metric.MeterProvider
142147
storageMetrics storageMetrics
148+
149+
// configuredStorageLimit stores the configured storage limit (0 means unlimited)
150+
configuredStorageLimit atomic.Uint64
151+
// configuredDiskUsageThreshold stores the configured disk usage threshold as percentage (0-1),
152+
// multiplied by configuredDiskUsageThresholdMultiplier
153+
configuredDiskUsageThreshold atomic.Uint64
143154
}
144155

145156
type storageMetrics struct {
146-
lsmSizeGauge metric.Int64Gauge
147-
valueLogSizeGauge metric.Int64Gauge
157+
lsmSizeGauge metric.Int64Gauge
158+
valueLogSizeGauge metric.Int64Gauge
159+
storageLimitGauge metric.Int64Gauge
160+
diskUsedGauge metric.Int64Gauge
161+
diskTotalGauge metric.Int64Gauge
162+
diskUsageThresholdGauge metric.Float64Gauge
148163
}
149164

150165
// NewStorageManager returns a new StorageManager with pebble DB at storageDir.
@@ -175,6 +190,10 @@ func NewStorageManager(storageDir string, logger *logp.Logger, opts ...StorageMa
175190

176191
sm.storageMetrics.lsmSizeGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.lsm_size")
177192
sm.storageMetrics.valueLogSizeGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.value_log_size")
193+
sm.storageMetrics.storageLimitGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.storage_limit")
194+
sm.storageMetrics.diskUsedGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.disk_used")
195+
sm.storageMetrics.diskTotalGauge, _ = meter.Int64Gauge("apm-server.sampling.tail.storage.disk_total")
196+
sm.storageMetrics.diskUsageThresholdGauge, _ = meter.Float64Gauge("apm-server.sampling.tail.storage.disk_usage_threshold_pct")
178197
}
179198

180199
if err := sm.reset(); err != nil {
@@ -280,10 +299,22 @@ func (sm *StorageManager) updateDiskUsage() {
280299
sm.storageMetrics.valueLogSizeGauge.Record(context.Background(), int64(defaultValueLogSize))
281300
}
282301

302+
// Record storage limit metric
303+
if sm.storageMetrics.storageLimitGauge != nil {
304+
sm.storageMetrics.storageLimitGauge.Record(context.Background(), int64(sm.configuredStorageLimit.Load()))
305+
}
306+
283307
if sm.getDiskUsageFailed.Load() {
284308
// Skip GetDiskUsage under the assumption that
285309
// it will always get the same error if GetDiskUsage ever returns one,
286310
// such that it does not keep logging GetDiskUsage errors.
311+
// Record zero values for disk metrics when disk usage check failed
312+
if sm.storageMetrics.diskUsedGauge != nil {
313+
sm.storageMetrics.diskUsedGauge.Record(context.Background(), 0)
314+
}
315+
if sm.storageMetrics.diskTotalGauge != nil {
316+
sm.storageMetrics.diskTotalGauge.Record(context.Background(), 0)
317+
}
287318
return
288319
}
289320
usage, err := sm.getDiskUsage()
@@ -292,10 +323,32 @@ func (sm *StorageManager) updateDiskUsage() {
292323
sm.getDiskUsageFailed.Store(true)
293324
sm.cachedDiskStat.used.Store(0)
294325
sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks
326+
// Record zero values for disk metrics when disk usage check failed
327+
if sm.storageMetrics.diskUsedGauge != nil {
328+
sm.storageMetrics.diskUsedGauge.Record(context.Background(), 0)
329+
}
330+
if sm.storageMetrics.diskTotalGauge != nil {
331+
sm.storageMetrics.diskTotalGauge.Record(context.Background(), 0)
332+
}
295333
return
296334
}
297335
sm.cachedDiskStat.used.Store(usage.UsedBytes)
298336
sm.cachedDiskStat.total.Store(usage.TotalBytes)
337+
338+
// Record disk utilization metrics
339+
if sm.storageMetrics.diskUsedGauge != nil {
340+
sm.storageMetrics.diskUsedGauge.Record(context.Background(), int64(usage.UsedBytes))
341+
}
342+
if sm.storageMetrics.diskTotalGauge != nil {
343+
sm.storageMetrics.diskTotalGauge.Record(context.Background(), int64(usage.TotalBytes))
344+
}
345+
// Record disk usage threshold as a percentage (0-1)
346+
if sm.storageMetrics.diskUsageThresholdGauge != nil {
347+
sm.storageMetrics.diskUsageThresholdGauge.Record(
348+
context.Background(),
349+
float64(sm.configuredDiskUsageThreshold.Load())/configuredDiskUsageThresholdMultiplier,
350+
)
351+
}
299352
}
300353

301354
// diskUsed returns the actual used disk space in bytes.
@@ -429,6 +482,11 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error {
429482

430483
// NewReadWriter returns a read writer configured with storage limit and disk usage threshold.
431484
func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW {
485+
// Store configured values for monitoring metrics
486+
sm.configuredStorageLimit.Store(storageLimit)
487+
// Store disk usage threshold as percentage (0-1), multiplied by configuredDiskUsageThresholdMultiplier
488+
sm.configuredDiskUsageThreshold.Store(uint64(diskUsageThreshold * configuredDiskUsageThresholdMultiplier))
489+
432490
var rw RW = SplitReadWriter{
433491
eventRW: sm.eventStorage.NewReadWriter(),
434492
decisionRW: sm.decisionStorage.NewReadWriter(),

x-pack/apm-server/sampling/processor_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -633,18 +633,23 @@ func TestGroupsMonitoring(t *testing.T) {
633633
//
634634
// It is helpful to provide multiple names for synchronous metrics to avoid losing data when collecting.
635635
// Observable metrics report everytime Collect is called, so there will be no data loss.
636-
func getGaugeValues(t testing.TB, reader sdkmetric.Reader, names ...string) []int64 {
636+
func getGaugeValues(t testing.TB, reader sdkmetric.Reader, names ...string) []float64 {
637637
var rm metricdata.ResourceMetrics
638638
assert.NoError(t, reader.Collect(context.Background(), &rm))
639639

640640
assert.NotEqual(t, 0, len(rm.ScopeMetrics))
641641

642-
values := make([]int64, len(names))
642+
values := make([]float64, len(names))
643643
for i, name := range names {
644644
for _, sm := range rm.ScopeMetrics {
645645
for _, m := range sm.Metrics {
646646
if m.Name == name {
647-
values[i] = m.Data.(metricdata.Gauge[int64]).DataPoints[0].Value
647+
switch g := m.Data.(type) {
648+
case metricdata.Gauge[int64]:
649+
values[i] = float64(g.DataPoints[0].Value)
650+
case metricdata.Gauge[float64]:
651+
values[i] = g.DataPoints[0].Value
652+
}
648653
}
649654
}
650655
}
@@ -685,9 +690,16 @@ func TestStorageMonitoring(t *testing.T) {
685690

686691
require.NoError(t, config.DB.Flush())
687692

688-
metricsNames := []string{"apm-server.sampling.tail.storage.lsm_size", "apm-server.sampling.tail.storage.value_log_size"}
693+
metricsNames := []string{
694+
"apm-server.sampling.tail.storage.lsm_size",
695+
"apm-server.sampling.tail.storage.value_log_size",
696+
"apm-server.sampling.tail.storage.storage_limit",
697+
"apm-server.sampling.tail.storage.disk_used",
698+
"apm-server.sampling.tail.storage.disk_total",
699+
"apm-server.sampling.tail.storage.disk_usage_threshold_pct",
700+
}
689701
gaugeValues := getGaugeValues(t, tempdirConfig.metricReader, metricsNames...)
690-
assert.Len(t, gaugeValues, 2)
702+
assert.Len(t, gaugeValues, 6)
691703

692704
lsmSize := gaugeValues[0]
693705
assert.NotZero(t, lsmSize)

0 commit comments

Comments
 (0)