Skip to content

Commit 5b24ae3

Browse files
committed
[*] update monitored sources with real metric definitions from presets
1 parent b0f28e1 commit 5b24ae3

2 files changed

Lines changed: 31 additions & 43 deletions

File tree

internal/reaper/metric.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,5 +97,14 @@ func (r *Reaper) LoadMetrics() (err error) {
9797
}
9898
return logrus.InfoLevel
9999
}(), "metrics and presets refreshed")
100+
// update the monitored sources with real metric definitions from presets
101+
for _, md := range r.monitoredSources {
102+
if md.PresetMetrics > "" {
103+
md.Metrics = metricDefs.GetPresetMetrics(md.PresetMetrics)
104+
}
105+
if md.PresetMetricsStandby > "" {
106+
md.MetricsStandby = metricDefs.GetPresetMetrics(md.PresetMetricsStandby)
107+
}
108+
}
100109
return
101110
}

internal/reaper/reaper.go

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -91,27 +91,12 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
9191
continue
9292
}
9393

94-
metricsConfig = func() map[string]float64 {
95-
if len(monitoredSource.Metrics) > 0 {
96-
return monitoredSource.Metrics
97-
}
98-
if monitoredSource.PresetMetrics > "" {
99-
return metricDefs.GetPresetMetrics(monitoredSource.PresetMetrics)
100-
}
101-
return nil
102-
}()
103-
hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
104-
if monitoredSource.IsInRecovery {
105-
metricsConfig = func() map[string]float64 {
106-
if len(monitoredSource.MetricsStandby) > 0 {
107-
return monitoredSource.MetricsStandby
108-
}
109-
if monitoredSource.PresetMetricsStandby > "" {
110-
return metricDefs.GetPresetMetrics(monitoredSource.PresetMetricsStandby)
111-
}
112-
return nil
113-
}()
94+
if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
95+
metricsConfig = monitoredSource.MetricsStandby
96+
} else {
97+
metricsConfig = monitoredSource.Metrics
11498
}
99+
hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
115100

116101
r.CreateSourceHelpers(ctx, srcL, monitoredSource)
117102

@@ -178,7 +163,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
178163
srcL.Error(err)
179164
}
180165

181-
go r.reapMetricMeasurements(metricCtx, monitoredSource, metric, metricsConfig[metric])
166+
go r.reapMetricMeasurements(metricCtx, monitoredSource, metric)
182167
}
183168
} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
184169
// metric definition files were recently removed or interval set to zero
@@ -260,23 +245,17 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
260245
}
261246

262247
if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
263-
if md.IsInRecovery && md.PresetMetricsStandby > "" || !md.IsInRecovery && md.PresetMetrics > "" {
264-
continue // no need to check presets for single metric disabling
265-
}
266248
if md.IsInRecovery && len(md.MetricsStandby) > 0 {
267249
currentMetricConfig = md.MetricsStandby
268250
} else {
269251
currentMetricConfig = md.Metrics
270252
}
271-
272253
interval, isMetricActive := currentMetricConfig[metric]
273-
if !isMetricActive || interval <= 0 {
274-
singleMetricDisabled = true
275-
}
254+
singleMetricDisabled = !isMetricActive || interval <= 0
276255
}
277256

278257
if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
279-
logger.WithField("source", db).WithField("metric", metric).Info("stoppin gatherer...")
258+
logger.WithField("source", db).WithField("metric", metric).Info("stopping gatherer...")
280259
cancelFunc()
281260
delete(r.cancelFuncs, dbMetric)
282261
if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
@@ -290,7 +269,7 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
290269
}
291270

292271
// metrics.ControlMessage notifies of shutdown + interval change
293-
func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.SourceConn, metricName string, interval float64) {
272+
func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceConn, metricName string) {
294273
hostState := make(map[string]map[string]string)
295274
var lastUptimeS int64 = -1 // used for "server restarted" event detection
296275
var lastErrorNotificationTime time.Time
@@ -303,39 +282,39 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.Source
303282
failedFetches := 0
304283
lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
305284

306-
l := r.logger.WithField("source", mdb.Name).WithField("metric", metricName)
285+
l := r.logger.WithField("source", md.Name).WithField("metric", metricName)
307286
if metricName == specialMetricServerLogEventCounts {
308-
metrics.ParseLogs(ctx, mdb, mdb.RealDbname, interval, r.measurementCh) // no return
287+
metrics.ParseLogs(ctx, md, md.RealDbname, md.GetMetricInterval(metricName), r.measurementCh) // no return
309288
return
310289
}
311290

312291
for {
292+
interval := md.GetMetricInterval(metricName)
313293
if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
314-
_, err = mdb.GetMonitoredDatabaseSettings(ctx, false) // in case of errors just ignore metric "disabled" time ranges
315-
if err != nil {
294+
// in case of errors just ignore metric "disabled" time ranges
295+
if _, err = md.GetMonitoredDatabaseSettings(ctx, false); err != nil {
316296
lastDBVersionFetchTime = time.Now()
317297
}
318298

319-
mvp, ok = metricDefs.GetMetricDef(metricName)
320-
if !ok {
299+
if mvp, ok = metricDefs.GetMetricDef(metricName); !ok {
321300
l.Errorf("Could not get metric version properties: %s", metricName)
322301
return
323302
}
324303
}
325304

326305
var metricStoreMessages *metrics.MeasurementEnvelope
327306
mfm := MetricFetchConfig{
328-
DBUniqueName: mdb.Name,
329-
DBUniqueNameOrig: mdb.GetDatabaseName(),
307+
DBUniqueName: md.Name,
308+
DBUniqueNameOrig: md.GetDatabaseName(),
330309
MetricName: metricName,
331-
Source: mdb.Kind,
310+
Source: md.Kind,
332311
Interval: time.Second * time.Duration(interval),
333312
StmtTimeoutOverride: 0,
334313
}
335314

336315
// 1st try local overrides for some metrics if operating in push mode
337316
if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
338-
metricStoreMessages, err = FetchStatsDirectlyFromOS(ctx, mfm, mdb, mvp)
317+
metricStoreMessages, err = FetchStatsDirectlyFromOS(ctx, mfm, md, mvp)
339318
if err != nil {
340319
l.WithError(err).Errorf("Could not reader metric directly from OS")
341320
}
@@ -368,16 +347,16 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.Source
368347
if ok {
369348
if lastUptimeS != -1 {
370349
if postmasterUptimeS.(int64) < lastUptimeS { // restart (or possibly also failover when host is routed) happened
371-
message := "Detected server restart (or failover) of \"" + mdb.Name + "\""
350+
message := "Detected server restart (or failover) of \"" + md.Name + "\""
372351
l.Warning(message)
373352
detectedChangesSummary := make(metrics.Measurements, 0)
374353
entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
375354
entry["details"] = message
376355
detectedChangesSummary = append(detectedChangesSummary, entry)
377356
envelopes = append(envelopes,
378357
metrics.MeasurementEnvelope{
379-
DBName: mdb.Name,
380-
SourceType: string(mdb.Kind),
358+
DBName: md.Name,
359+
SourceType: string(md.Kind),
381360
MetricName: "object_changes",
382361
Data: detectedChangesSummary,
383362
CustomTags: metricStoreMessages.CustomTags,

0 commit comments

Comments
 (0)