Skip to content

Commit 29446d0

Browse files
committed
[*] extract CreateSourceObjects and ShutdownOldWorkers from Reap
1 parent 334b390 commit 29446d0

3 files changed

Lines changed: 131 additions & 120 deletions

File tree

internal/metrics/cmdopts.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
// CmdOpts specifies metric command-line options
88
type CmdOpts struct {
99
Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"File or folder of YAML files with metrics definitions" env:"PW_METRICS"`
10-
CreateHelpers bool `long:"create-helpers" mapstructure:"create-helpers" description:"Create helper database objects from metric definitions" env:"PW_CREATE_HELPERS"`
1110
DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW_DIRECT_OS_STATS"`
1211
InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Set to 0 to disable" env:"PW_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"`
1312
EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched / scraped" env:"PW_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch-emergency-pause"`

internal/reaper/reaper.go

Lines changed: 130 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,31 @@ import (
1616
)
1717

1818
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
19-
var metricConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
19+
var metricsConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
2020
var metricDefs = NewConcurrentMetricDefs()
2121

2222
// Reaper is the struct that responsible for fetching metrics measurements from the sources and storing them to the sinks
2323
type Reaper struct {
2424
*cmdopts.Options
25-
ready atomic.Bool
26-
measurementCh chan []metrics.MeasurementEnvelope
27-
measurementCache *InstanceMetricCache
28-
logger log.LoggerIface
29-
monitoredSources sources.SourceConns
25+
ready atomic.Bool
26+
measurementCh chan []metrics.MeasurementEnvelope
27+
measurementCache *InstanceMetricCache
28+
logger log.LoggerIface
29+
monitoredSources sources.SourceConns
30+
prevLoopMonitoredDBs sources.SourceConns
31+
cancelFuncs map[string]context.CancelFunc
3032
}
3133

3234
// NewReaper creates a new Reaper instance
3335
func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper, err error) {
3436
r = &Reaper{
35-
Options: opts,
36-
measurementCh: make(chan []metrics.MeasurementEnvelope, 10000),
37-
measurementCache: NewInstanceMetricCache(),
38-
logger: log.GetLogger(ctx),
39-
monitoredSources: make(sources.SourceConns, 0),
37+
Options: opts,
38+
measurementCh: make(chan []metrics.MeasurementEnvelope, 10000),
39+
measurementCache: NewInstanceMetricCache(),
40+
logger: log.GetLogger(ctx),
41+
monitoredSources: make(sources.SourceConns, 0),
42+
prevLoopMonitoredDBs: make(sources.SourceConns, 0),
43+
cancelFuncs: make(map[string]context.CancelFunc), // [db1+metric1]cancel()
4044
}
4145
return r, nil
4246
}
@@ -51,10 +55,7 @@ func (r *Reaper) Ready() bool {
5155
// the metric gatherers. In case of a source or metric definition change, it will
5256
// start or stop the gatherers accordingly.
5357
func (r *Reaper) Reap(ctx context.Context) (err error) {
54-
var prevLoopMonitoredDBs sources.SourceConns // to be able to detect DBs removed from config
55-
cancelFuncs := make(map[string]context.CancelFunc) // [db1+metric1]=chan
5658

57-
mainLoopCount := 0
5859
logger := r.logger
5960

6061
go r.WriteMeasurements(ctx)
@@ -94,7 +95,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
9495
srcL.Info("not added to monitoring due to 'master only' property")
9596
continue
9697
}
97-
metricConfig = func() map[string]float64 {
98+
metricsConfig = func() map[string]float64 {
9899
if len(monitoredSource.Metrics) > 0 {
99100
return monitoredSource.Metrics
100101
}
@@ -105,7 +106,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
105106
}()
106107
hostLastKnownStatusInRecovery[monitoredSource.Name] = monitoredSource.IsInRecovery
107108
if monitoredSource.IsInRecovery {
108-
metricConfig = func() map[string]float64 {
109+
metricsConfig = func() map[string]float64 {
109110
if len(monitoredSource.MetricsStandby) > 0 {
110111
return monitoredSource.MetricsStandby
111112
}
@@ -116,12 +117,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
116117
}()
117118
}
118119

119-
if monitoredSource.IsPostgresSource() && !monitoredSource.IsInRecovery && r.Metrics.CreateHelpers {
120-
srcL.Info("trying to create helper objects if missing")
121-
if err = TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
122-
srcL.WithError(err).Warning("failed to create helper functions")
123-
}
124-
}
120+
r.CreateSourceObjects(ctx, srcL, monitoredSource)
125121

126122
if monitoredSource.IsPostgresSource() {
127123
var DBSizeMB int64
@@ -136,53 +132,46 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
136132
}
137133
}
138134
}
139-
// ver, err := GetMonitoredDatabaseSettings(ctx, monitoredSource, false)
140-
if err == nil { // ok to ignore error, re-tried on next loop
141-
lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
142-
if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
143-
srcL.Info("to be removed from monitoring due to 'master only' property and status change")
144-
hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
145-
continue
146-
} else if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
147-
if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
148-
srcL.Warning("Switching metrics collection to standby config...")
149-
metricConfig = monitoredSource.MetricsStandby
150-
hostLastKnownStatusInRecovery[monitoredSource.Name] = true
151-
} else {
152-
srcL.Warning("Switching metrics collection to primary config...")
153-
metricConfig = monitoredSource.Metrics
154-
hostLastKnownStatusInRecovery[monitoredSource.Name] = false
155-
}
135+
136+
lastKnownStatusInRecovery := hostLastKnownStatusInRecovery[monitoredSource.Name]
137+
if monitoredSource.IsInRecovery && monitoredSource.OnlyIfMaster {
138+
srcL.Info("to be removed from monitoring due to 'master only' property and status change")
139+
hostsToShutDownDueToRoleChange[monitoredSource.Name] = true
140+
continue
141+
} else if lastKnownStatusInRecovery != monitoredSource.IsInRecovery {
142+
if monitoredSource.IsInRecovery && len(monitoredSource.MetricsStandby) > 0 {
143+
srcL.Warning("Switching metrics collection to standby config...")
144+
metricsConfig = monitoredSource.MetricsStandby
145+
hostLastKnownStatusInRecovery[monitoredSource.Name] = true
146+
} else {
147+
srcL.Warning("Switching metrics collection to primary config...")
148+
metricsConfig = monitoredSource.Metrics
149+
hostLastKnownStatusInRecovery[monitoredSource.Name] = false
156150
}
157151
}
158152

159-
if mainLoopCount == 0 && r.Sources.TryCreateListedExtsIfMissing != "" && !monitoredSource.IsInRecovery {
160-
extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
161-
extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, monitoredSource.Extensions)
162-
srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
163-
}
164153
}
165154

166-
for metricName, interval := range metricConfig {
155+
for metricName, interval := range metricsConfig {
167156
metric := metricName
168-
metricDefOk := false
157+
metricDefExists := false
169158
var mvp metrics.Metric
170159

171160
if strings.HasPrefix(metric, recoPrefix) {
172161
metric = recoMetricName
173-
metricDefOk = true
162+
metricDefExists = true
174163
} else {
175-
mvp, metricDefOk = metricDefs.GetMetricDef(metric)
164+
mvp, metricDefExists = metricDefs.GetMetricDef(metric)
176165
}
177166

178167
dbMetric := monitoredSource.Name + dbMetricJoinStr + metric
179-
_, chOk := cancelFuncs[dbMetric]
168+
_, cancelFuncExists := r.cancelFuncs[dbMetric]
180169

181-
if metricDefOk && !chOk { // initialize a new per db/per metric control channel
170+
if metricDefExists && !cancelFuncExists { // initialize a new per db/per metric control channel
182171
if interval > 0 {
183172
srcL.WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
184173
metricCtx, cancelFunc := context.WithCancel(ctx)
185-
cancelFuncs[dbMetric] = cancelFunc
174+
r.cancelFuncs[dbMetric] = cancelFunc
186175

187176
metricNameForStorage := metricName
188177
if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
@@ -193,16 +182,16 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
193182
srcL.Error(err)
194183
}
195184

196-
go r.reapMetricMeasurements(metricCtx, monitoredSource, metric, metricConfig[metric])
185+
go r.reapMetricMeasurements(metricCtx, monitoredSource, metric, metricsConfig[metric])
197186
}
198-
} else if (!metricDefOk && chOk) || interval <= 0 {
187+
} else if (!metricDefExists && cancelFuncExists) || interval <= 0 {
199188
// metric definition files were recently removed or interval set to zero
200-
if cancelFunc, isOk := cancelFuncs[dbMetric]; isOk {
189+
if cancelFunc, isOk := r.cancelFuncs[dbMetric]; isOk {
201190
cancelFunc()
202191
}
203192
srcL.WithField("metric", metric).Warning("shutting down gatherer...")
204-
delete(cancelFuncs, dbMetric)
205-
} else if !metricDefOk {
193+
delete(r.cancelFuncs, dbMetric)
194+
} else if !metricDefExists {
206195
epoch, ok := lastSQLFetchError.Load(metric)
207196
if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
208197
srcL.WithField("metric", metric).Warning("metric definition not found")
@@ -212,81 +201,103 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
212201
}
213202
}
214203

215-
if mainLoopCount == 0 {
216-
goto MainLoopSleep
204+
r.ShutdownOldWorkers(ctx, hostsToShutDownDueToRoleChange)
205+
206+
r.prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
207+
select {
208+
case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
209+
logger.Debugf("wake up after %d seconds", r.Sources.Refresh)
210+
case <-ctx.Done():
211+
return
217212
}
213+
}
214+
}
218215

219-
// loop over existing channels and stop workers if DB or metric removed from config
220-
// or state change makes it uninteresting
221-
logger.Debug("checking if any workers need to be shut down...")
222-
for dbMetric, cancelFunc := range cancelFuncs {
223-
var currentMetricConfig map[string]float64
224-
var dbInfo *sources.SourceConn
225-
var ok, dbRemovedFromConfig bool
226-
singleMetricDisabled := false
227-
splits := strings.Split(dbMetric, dbMetricJoinStr)
228-
db := splits[0]
229-
metric := splits[1]
230-
231-
_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
232-
if !wholeDbShutDownDueToRoleChange {
233-
monitoredDbCacheLock.RLock()
234-
dbInfo, ok = monitoredDbCache[db]
235-
monitoredDbCacheLock.RUnlock()
236-
if !ok { // normal removing of DB from config
237-
dbRemovedFromConfig = true
238-
logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
239-
}
240-
}
216+
// CreateSourceObjects creates the extensions and metric helpers for the monitored source
217+
func (r *Reaper) CreateSourceObjects(ctx context.Context, srcL log.LoggerIface, monitoredSource *sources.SourceConn) {
218+
if r.prevLoopMonitoredDBs.GetMonitoredDatabase(monitoredSource.Name) != nil {
219+
return // already created
220+
}
221+
if !monitoredSource.IsPostgresSource() || monitoredSource.IsInRecovery {
222+
return // no need to create anything for non-postgres sources
223+
}
241224

242-
if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
243-
MonitoredDatabasesSettingsLock.RLock()
244-
verInfo, ok := MonitoredDatabasesSettings[db]
245-
MonitoredDatabasesSettingsLock.RUnlock()
246-
if !ok {
247-
logger.Warningf("Could not find PG version info for DB %s, skipping shutdown check of metric worker process for %s", db, metric)
248-
continue
249-
}
250-
if verInfo.IsInRecovery && dbInfo.PresetMetricsStandby > "" || !verInfo.IsInRecovery && dbInfo.PresetMetrics > "" {
251-
continue // no need to check presets for single metric disabling
252-
}
253-
if verInfo.IsInRecovery && len(dbInfo.MetricsStandby) > 0 {
254-
currentMetricConfig = dbInfo.MetricsStandby
255-
} else {
256-
currentMetricConfig = dbInfo.Metrics
257-
}
225+
if r.Sources.TryCreateListedExtsIfMissing > "" {
226+
srcL.Info("trying to create extensions if missing")
227+
extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
228+
extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, monitoredSource.Extensions)
229+
srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
230+
}
258231

259-
interval, isMetricActive := currentMetricConfig[metric]
260-
if !isMetricActive || interval <= 0 {
261-
singleMetricDisabled = true
262-
}
263-
}
232+
if r.Sources.CreateHelpers {
233+
srcL.Info("trying to create helper objects if missing")
234+
if err := TryCreateMetricsFetchingHelpers(ctx, monitoredSource); err != nil {
235+
srcL.WithError(err).Warning("failed to create helper functions")
236+
}
237+
}
264238

265-
if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
266-
logger.WithField("source", db).WithField("metric", metric).Info("stoppin gatherer...")
267-
cancelFunc()
268-
delete(cancelFuncs, dbMetric)
269-
if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
270-
logger.Error(err)
271-
}
239+
}
240+
241+
func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRoleChange map[string]bool) {
242+
logger := r.logger
243+
// loop over existing channels and stop workers if DB or metric removed from config
244+
// or state change makes it uninteresting
245+
logger.Debug("checking if any workers need to be shut down...")
246+
for dbMetric, cancelFunc := range r.cancelFuncs {
247+
var currentMetricConfig map[string]float64
248+
var dbInfo *sources.SourceConn
249+
var ok, dbRemovedFromConfig bool
250+
singleMetricDisabled := false
251+
splits := strings.Split(dbMetric, dbMetricJoinStr)
252+
db := splits[0]
253+
metric := splits[1]
254+
255+
_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
256+
if !wholeDbShutDownDueToRoleChange {
257+
monitoredDbCacheLock.RLock()
258+
dbInfo, ok = monitoredDbCache[db]
259+
monitoredDbCacheLock.RUnlock()
260+
if !ok { // normal removing of DB from config
261+
dbRemovedFromConfig = true
262+
logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
272263
}
273264
}
274265

275-
// Destroy conn pools and metric writers
276-
CloseResourcesForRemovedMonitoredDBs(r.SinksWriter, r.monitoredSources, prevLoopMonitoredDBs, hostsToShutDownDueToRoleChange)
266+
if !(wholeDbShutDownDueToRoleChange || dbRemovedFromConfig) { // maybe some single metric was disabled
267+
MonitoredDatabasesSettingsLock.RLock()
268+
verInfo, ok := MonitoredDatabasesSettings[db]
269+
MonitoredDatabasesSettingsLock.RUnlock()
270+
if !ok {
271+
logger.Warningf("Could not find PG version info for DB %s, skipping shutdown check of metric worker process for %s", db, metric)
272+
continue
273+
}
274+
if verInfo.IsInRecovery && dbInfo.PresetMetricsStandby > "" || !verInfo.IsInRecovery && dbInfo.PresetMetrics > "" {
275+
continue // no need to check presets for single metric disabling
276+
}
277+
if verInfo.IsInRecovery && len(dbInfo.MetricsStandby) > 0 {
278+
currentMetricConfig = dbInfo.MetricsStandby
279+
} else {
280+
currentMetricConfig = dbInfo.Metrics
281+
}
277282

278-
MainLoopSleep:
279-
mainLoopCount++
280-
prevLoopMonitoredDBs = slices.Clone(r.monitoredSources)
283+
interval, isMetricActive := currentMetricConfig[metric]
284+
if !isMetricActive || interval <= 0 {
285+
singleMetricDisabled = true
286+
}
287+
}
281288

282-
logger.Debugf("main sleeping %ds...", r.Sources.Refresh)
283-
select {
284-
case <-time.After(time.Second * time.Duration(r.Sources.Refresh)):
285-
// pass
286-
case <-ctx.Done():
287-
return
289+
if ctx.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
290+
logger.WithField("source", db).WithField("metric", metric).Info("stoppin gatherer...")
291+
cancelFunc()
292+
delete(r.cancelFuncs, dbMetric)
293+
if err := r.SinksWriter.SyncMetric(db, metric, "remove"); err != nil {
294+
logger.Error(err)
295+
}
288296
}
289297
}
298+
299+
// Destroy conn pools and metric writers
300+
CloseResourcesForRemovedMonitoredDBs(r.SinksWriter, r.monitoredSources, r.prevLoopMonitoredDBs, hostsToShutDownDueToRoleChange)
290301
}
291302

292303
// metrics.ControlMessage notifies of shutdown + interval change

internal/sources/cmdopts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ type CmdOpts struct {
88
MinDbSizeMB int64 `long:"min-db-size-mb" mapstructure:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW_MIN_DB_SIZE_MB" default:"0"`
99
MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"4"`
1010
TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""`
11+
CreateHelpers bool `long:"create-helpers" mapstructure:"create-helpers" description:"Create helper database objects from metric definitions" env:"PW_CREATE_HELPERS"`
1112
}

0 commit comments

Comments
 (0)