diff --git a/internal/cmdopts/cmdsource.go b/internal/cmdopts/cmdsource.go index 4b93e2b9d6..ff7a8f45a4 100644 --- a/internal/cmdopts/cmdsource.go +++ b/internal/cmdopts/cmdsource.go @@ -99,7 +99,7 @@ func (cmd *SourceResolveCommand) Execute(args []string) error { } } } - conns, err := foundSources.ResolveDatabases() + conns, err := foundSources.ResolveDatabases(nil) if err != nil { return err } diff --git a/internal/reaper/database.go b/internal/reaper/database.go index b550f72501..8edec38480 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -438,7 +438,7 @@ func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.Sourc return metrics.Measurements{ metrics.Measurement{ metrics.EpochColumnName: time.Now().UnixNano(), - "instance_up": func() int { + specialMetricInstanceUp: func() int { if md.Conn.Ping(ctx) == nil { return 1 } diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index eef427f042..3dadb602e1 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -101,7 +101,7 @@ func (r *Reaper) Reap(ctx context.Context) { ctx = log.WithLogger(ctx, srcL) if monitoredSource.Connect(ctx, r.Sources) != nil { - r.WriteInstanceDown(monitoredSource) + r.WriteInstanceDown(monitoredSource.Name) srcL.Warning("could not init connection, retrying on next iteration") continue } @@ -405,9 +405,12 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { return err } srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool { + // filter out disabled sources and sources with group not in the list of groups to monitor return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group) }) - if newSrcs, err = srcs.ResolveDatabases(); err != nil { + + if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil { + // discover databases for continuous monitoring sources r.logger.WithError(err).Error("could not resolve databases from sources") } @@ -432,9 
+435,9 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { } // WriteInstanceDown writes instance_up = 0 metric to sinks for the given source -func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) { +func (r *Reaper) WriteInstanceDown(name string) { r.measurementCh <- metrics.MeasurementEnvelope{ - DBName: md.Name, + DBName: name, MetricName: specialMetricInstanceUp, Data: metrics.Measurements{metrics.Measurement{ metrics.EpochColumnName: time.Now().UnixNano(), diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index 05a42535a9..0ff3b4acdb 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -593,5 +593,5 @@ func TestWriteMeasurements(t *testing.T) { SinksWriter: err, }) go r.WriteMeasurements(ctx) - r.WriteInstanceDown(&sources.SourceConn{}) + r.WriteInstanceDown("foo") } diff --git a/internal/sources/resolver.go b/internal/sources/resolver.go index 90c99c681d..69d790e422 100644 --- a/internal/sources/resolver.go +++ b/internal/sources/resolver.go @@ -14,6 +14,7 @@ import ( "net/url" "os" "strings" + "sync" "time" jsoniter "github.com/json-iterator/go" @@ -25,16 +26,32 @@ import ( "go.uber.org/zap" ) -// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni -func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) { +// ResolveDatabases updates the list of monitored objects from continuous monitoring sources, e.g. patroni. +// Each source is resolved concurrently so that a slow or unreachable source does not block the others. 
+func (srcs Sources) ResolveDatabases(onError func(string)) (_ SourceConns, err error) { + type result struct { + dbs SourceConns + err error + } + results := make([]result, len(srcs)) + var wg sync.WaitGroup + for i, s := range srcs { + wg.Go(func() { + dbs, e := s.ResolveDatabases() + results[i] = result{dbs, e} + }) + } + wg.Wait() resolvedDbs := make(SourceConns, 0, len(srcs)) - for _, s := range srcs { - if !s.IsEnabled { - continue + for i, res := range results { + if res.err != nil { + if onError != nil { + onError(srcs[i].Name) + } + logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source") + err = errors.Join(err, res.err) } - dbs, e := s.ResolveDatabases() - err = errors.Join(err, e) - resolvedDbs = append(resolvedDbs, dbs...) + resolvedDbs = append(resolvedDbs, res.dbs...) } return resolvedDbs, err }