From 89cb5eb0af5ad25f98555cbabdbbfab29726d368 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 24 Apr 2026 14:26:26 +0200 Subject: [PATCH 1/4] [+] implement parallel source discovery Improves dead-source handling with parallel resolution and instance_up=0 on discovery failure. `Sources.ResolveDatabases()` previously resolved each source sequentially. A single slow or unresponsive source (e.g. a continuous-discovery endpoint behind a firewall) would block discovery of all subsequent sources for the full connection timeout duration. Sources are now resolved concurrently using `sync.WaitGroup.Go()`. Results are collected into a pre-allocated indexed slice to preserve deterministic ordering. Per-source error logging with source name is included in the resolver itself. When a `SourcePostgresContinuous` or `SourcePatroni` source fails to resolve any databases, `LoadSources()` now emits `instance_up=0` to the configured sinks. This makes the failure visible in dashboards and alerting, consistent with how unreachable directly-monitored sources are handled. 
--- internal/reaper/reaper.go | 13 ++++++++++++- internal/sources/resolver.go | 28 +++++++++++++++++++++------- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index eef427f042..e179d0eae1 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -407,8 +407,19 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool { return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group) }) - if newSrcs, err = srcs.ResolveDatabases(); err != nil { + newSrcs, err = srcs.ResolveDatabases() + if err != nil { r.logger.WithError(err).Error("could not resolve databases from sources") + for _, s := range srcs { + if s.Kind != sources.SourcePostgresContinuous && s.Kind != sources.SourcePatroni { + continue + } + if !slices.ContainsFunc(newSrcs, func(sc *sources.SourceConn) bool { + return sc.Name == s.Name || strings.HasPrefix(sc.Name, s.Name+"_") + }) { + r.WriteInstanceDown(sources.NewSourceConn(s)) + } + } } for i, newMD := range newSrcs { diff --git a/internal/sources/resolver.go b/internal/sources/resolver.go index 90c99c681d..5ff3440d1f 100644 --- a/internal/sources/resolver.go +++ b/internal/sources/resolver.go @@ -14,6 +14,7 @@ import ( "net/url" "os" "strings" + "sync" "time" jsoniter "github.com/json-iterator/go" @@ -25,16 +26,29 @@ import ( "go.uber.org/zap" ) -// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni +// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni. +// Each source is resolved concurrently so that a slow or unreachable source does not block the others. 
func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) { + type result struct { + dbs SourceConns + err error + } + results := make([]result, len(srcs)) + var wg sync.WaitGroup + for i, s := range srcs { + wg.Go(func() { + dbs, e := s.ResolveDatabases() + results[i] = result{dbs, e} + }) + } + wg.Wait() resolvedDbs := make(SourceConns, 0, len(srcs)) - for _, s := range srcs { - if !s.IsEnabled { - continue + for i, res := range results { + if res.err != nil { + logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source") + err = errors.Join(err, res.err) } - dbs, e := s.ResolveDatabases() - err = errors.Join(err, e) - resolvedDbs = append(resolvedDbs, dbs...) + resolvedDbs = append(resolvedDbs, res.dbs...) } return resolvedDbs, err } From 99cc08f5a0508d465166bfe6d484a0237c508c99 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 24 Apr 2026 20:17:46 +0200 Subject: [PATCH 2/4] `reaper.WriteInstanceDown()` accepts name as an argument --- internal/reaper/reaper.go | 9 +++++---- internal/reaper/reaper_test.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index e179d0eae1..a982818f2e 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -101,7 +101,7 @@ func (r *Reaper) Reap(ctx context.Context) { ctx = log.WithLogger(ctx, srcL) if monitoredSource.Connect(ctx, r.Sources) != nil { - r.WriteInstanceDown(monitoredSource) + r.WriteInstanceDown(monitoredSource.Name) srcL.Warning("could not init connection, retrying on next iteration") continue } @@ -405,6 +405,7 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { return err } srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool { + // filter out disabled sources and sources with group not in the list of groups to monitor return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group) }) newSrcs, err = 
srcs.ResolveDatabases() @@ -417,7 +418,7 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { if !slices.ContainsFunc(newSrcs, func(sc *sources.SourceConn) bool { return sc.Name == s.Name || strings.HasPrefix(sc.Name, s.Name+"_") }) { - r.WriteInstanceDown(sources.NewSourceConn(s)) + r.WriteInstanceDown(s.Name) } } } @@ -443,9 +444,9 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { } // WriteInstanceDown writes instance_up = 0 metric to sinks for the given source -func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) { +func (r *Reaper) WriteInstanceDown(name string) { r.measurementCh <- metrics.MeasurementEnvelope{ - DBName: md.Name, + DBName: name, MetricName: specialMetricInstanceUp, Data: metrics.Measurements{metrics.Measurement{ metrics.EpochColumnName: time.Now().UnixNano(), diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index 05a42535a9..0ff3b4acdb 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -593,5 +593,5 @@ func TestWriteMeasurements(t *testing.T) { SinksWriter: err, }) go r.WriteMeasurements(ctx) - r.WriteInstanceDown(&sources.SourceConn{}) + r.WriteInstanceDown("foo") } From a47b7fb6fd9d30b6d0f69492884239a96ba5fd54 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 24 Apr 2026 20:35:33 +0200 Subject: [PATCH 3/4] use `on_error` callback in `Sources.ResolveDatabases()` --- internal/cmdopts/cmdsource.go | 2 +- internal/reaper/reaper.go | 15 +++------------ internal/sources/resolver.go | 5 ++++- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/internal/cmdopts/cmdsource.go b/internal/cmdopts/cmdsource.go index 4b93e2b9d6..ff7a8f45a4 100644 --- a/internal/cmdopts/cmdsource.go +++ b/internal/cmdopts/cmdsource.go @@ -99,7 +99,7 @@ func (cmd *SourceResolveCommand) Execute(args []string) error { } } } - conns, err := foundSources.ResolveDatabases() + conns, err := foundSources.ResolveDatabases(nil) if err != nil { return err } diff --git 
a/internal/reaper/reaper.go b/internal/reaper/reaper.go index a982818f2e..3dadb602e1 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -408,19 +408,10 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { // filter out disabled sources and sources with group not in the list of groups to monitor return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group) }) - newSrcs, err = srcs.ResolveDatabases() - if err != nil { + + if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil { + // discover databases for continuous monitoring sources r.logger.WithError(err).Error("could not resolve databases from sources") - for _, s := range srcs { - if s.Kind != sources.SourcePostgresContinuous && s.Kind != sources.SourcePatroni { - continue - } - if !slices.ContainsFunc(newSrcs, func(sc *sources.SourceConn) bool { - return sc.Name == s.Name || strings.HasPrefix(sc.Name, s.Name+"_") - }) { - r.WriteInstanceDown(s.Name) - } - } } for i, newMD := range newSrcs { diff --git a/internal/sources/resolver.go b/internal/sources/resolver.go index 5ff3440d1f..69d790e422 100644 --- a/internal/sources/resolver.go +++ b/internal/sources/resolver.go @@ -28,7 +28,7 @@ import ( // ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni. // Each source is resolved concurrently so that a slow or unreachable source does not block the others. 
-func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) { +func (srcs Sources) ResolveDatabases(onError func(string)) (_ SourceConns, err error) { type result struct { dbs SourceConns err error @@ -45,6 +45,9 @@ func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) { resolvedDbs := make(SourceConns, 0, len(srcs)) for i, res := range results { if res.err != nil { + if onError != nil { + onError(srcs[i].Name) + } logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source") err = errors.Join(err, res.err) } From 791941d0d1788fe36904db6c9acd868549dc66da Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Wed, 29 Apr 2026 18:09:30 +0200 Subject: [PATCH 4/4] use const instead of hard-coded value --- internal/reaper/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/reaper/database.go b/internal/reaper/database.go index b550f72501..8edec38480 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -438,7 +438,7 @@ func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.Sourc return metrics.Measurements{ metrics.Measurement{ metrics.EpochColumnName: time.Now().UnixNano(), - "instance_up": func() int { + specialMetricInstanceUp: func() int { if md.Conn.Ping(ctx) == nil { return 1 }