2 changes: 1 addition & 1 deletion internal/cmdopts/cmdsource.go

@@ -99,7 +99,7 @@ func (cmd *SourceResolveCommand) Execute(args []string) error {
 			}
 		}
 	}
-	conns, err := foundSources.ResolveDatabases()
+	conns, err := foundSources.ResolveDatabases(nil)
 	if err != nil {
 		return err
 	}
2 changes: 1 addition & 1 deletion internal/reaper/database.go

@@ -438,7 +438,7 @@ func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn)
 	return metrics.Measurements{
 		metrics.Measurement{
 			metrics.EpochColumnName: time.Now().UnixNano(),
-			"instance_up": func() int {
+			specialMetricInstanceUp: func() int {
 				if md.Conn.Ping(ctx) == nil {
 					return 1
 				}
11 changes: 7 additions & 4 deletions internal/reaper/reaper.go

@@ -101,7 +101,7 @@ func (r *Reaper) Reap(ctx context.Context) {
 		ctx = log.WithLogger(ctx, srcL)
 
 		if monitoredSource.Connect(ctx, r.Sources) != nil {
-			r.WriteInstanceDown(monitoredSource)
+			r.WriteInstanceDown(monitoredSource.Name)
 			srcL.Warning("could not init connection, retrying on next iteration")
 			continue
 		}
@@ -405,9 +405,12 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
 		return err
 	}
 	srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
+		// filter out disabled sources and sources with a group not in the list of groups to monitor
 		return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
 	})
-	if newSrcs, err = srcs.ResolveDatabases(); err != nil {
+
+	// discover databases for continuous monitoring sources
+	if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil {
 		r.logger.WithError(err).Error("could not resolve databases from sources")
 	}
 
@@ -432,9 +435,9 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
 }
 
 // WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
-func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) {
+func (r *Reaper) WriteInstanceDown(name string) {
 	r.measurementCh <- metrics.MeasurementEnvelope{
-		DBName:     md.Name,
+		DBName:     name,
 		MetricName: specialMetricInstanceUp,
 		Data: metrics.Measurements{metrics.Measurement{
 			metrics.EpochColumnName: time.Now().UnixNano(),
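The signature change above is what lets WriteInstanceDown double as the resolver's error callback: a bound method value of type func(string) can be handed over before any SourceConn exists. A minimal, self-contained sketch of that pattern follows; every type and name in it is an illustrative stand-in, not the real pgwatch code.

package main

import "fmt"

// Illustrative stand-ins for the pgwatch types; none of these names are real.
type Source struct{ Name string }
type Sources []Source

// ResolveDatabases mirrors the new signature: an optional callback is
// invoked with the name of every source that fails to resolve.
func (srcs Sources) ResolveDatabases(onError func(string)) {
	for _, s := range srcs {
		if err := resolve(s); err != nil && onError != nil {
			onError(s.Name)
		}
	}
}

// resolve stands in for per-source discovery; here it always fails.
func resolve(s Source) error { return fmt.Errorf("%s unreachable", s.Name) }

type Reaper struct{}

// WriteInstanceDown needs only the source name, so the bound method value
// r.WriteInstanceDown satisfies func(string) directly.
func (r *Reaper) WriteInstanceDown(name string) {
	fmt.Println("instance_up = 0 for", name)
}

func main() {
	r := &Reaper{}
	Sources{{Name: "patroni1"}}.ResolveDatabases(r.WriteInstanceDown)
}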
2 changes: 1 addition & 1 deletion internal/reaper/reaper_test.go

@@ -593,5 +593,5 @@ func TestWriteMeasurements(t *testing.T) {
 		SinksWriter: err,
 	})
 	go r.WriteMeasurements(ctx)
-	r.WriteInstanceDown(&sources.SourceConn{})
+	r.WriteInstanceDown("foo")
 }
33 changes: 25 additions & 8 deletions internal/sources/resolver.go

@@ -14,6 +14,7 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	jsoniter "github.com/json-iterator/go"
@@ -25,16 +26,32 @@ import (
 	"go.uber.org/zap"
 )
 
-// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni
-func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) {
+// ResolveDatabases() updates the list of monitored objects from continuous monitoring sources, e.g. patroni.
+// Each source is resolved concurrently so that a slow or unreachable source does not block the others.
+func (srcs Sources) ResolveDatabases(onError func(string)) (_ SourceConns, err error) {
+	type result struct {
+		dbs SourceConns
+		err error
+	}
+	results := make([]result, len(srcs))
+	var wg sync.WaitGroup
+	for i, s := range srcs {
+		wg.Go(func() {
+			dbs, e := s.ResolveDatabases()
+			results[i] = result{dbs, e}
+		})
+	}
+	wg.Wait()
 	resolvedDbs := make(SourceConns, 0, len(srcs))
-	for _, s := range srcs {
-		if !s.IsEnabled {
-			continue
+	for i, res := range results {
+		if res.err != nil {
+			if onError != nil {
+				onError(srcs[i].Name)
+			}
Comment on lines +48 to +50

0xgouda (Collaborator) commented on Apr 28, 2026:

I have a concern here that can be reproduced with the following steps:

1. Define a target that happens to be unreachable and is of the kind postgres-continuous-discovery.
2. pgwatch writes instance_up = 0 for the target for a while, with dbname = sourceName.
3. The target comes back up.
4. pgwatch runs for a while and now writes the updated instance_up = 1, but with a new dbname = sourceName + "_" + realDbname.
5. The target goes down again, and its instance_up = 0 is written with dbname = sourceName + "_" + realDbname.

So the full instance uptime history becomes a bit disconnected, with different dbnames.

But generally, I think that's the best we can do; I just wanted to note this behaviour.
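To make the scenario concrete, here is a schematic of the envelopes from steps 2 and 4, reusing the MeasurementEnvelope shape from this PR; the source name "patroni1" and database name "mydb" are invented for illustration only.

// Schematic, not runnable code; "patroni1" and "mydb" are made-up names.
// Step 2: the source is unresolved, so the bare source name is the series key.
metrics.MeasurementEnvelope{
	DBName:     "patroni1",
	MetricName: specialMetricInstanceUp,
	Data: metrics.Measurements{metrics.Measurement{
		metrics.EpochColumnName: time.Now().UnixNano(),
		specialMetricInstanceUp: 0,
	}},
}

// Step 4: after discovery the same instance reports under a new key,
// sourceName + "_" + realDbname, so the uptime history splits into two series.
metrics.MeasurementEnvelope{
	DBName:     "patroni1_mydb",
	MetricName: specialMetricInstanceUp,
	Data: metrics.Measurements{metrics.Measurement{
		metrics.EpochColumnName: time.Now().UnixNano(),
		specialMetricInstanceUp: 1,
	}},
}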

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! We could use source instead of dbname. This way we will know for sure at which point that happened

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain more?

Copy link
Copy Markdown
Collaborator Author

@pashagolub pashagolub Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) {
	r.measurementCh <- metrics.MeasurementEnvelope{
		DBName:     md.Name,
		MetricName: specialMetricInstanceUp,
		Data: metrics.Measurements{metrics.Measurement{
			metrics.EpochColumnName: time.Now().UnixNano(),
			"kind": string(md.Kind),
		//  ^^^^^^^^^^^^^^^^^^^^^^^
			specialMetricInstanceUp: 0},
		},
	}
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this way grafana could distinguish regular databases vs all other

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think adding the kind to the measurements will help.

What if we have 2+ postgres-continuous-discovery/patroni sources? Grouping by kind then will be useless.

Copy link
Copy Markdown
Collaborator

@0xgouda 0xgouda Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And again, I guess what we have already is the best we can do, since we won't be able to add any instance identifiers (i.e., sys_id) to the measurements before connecting at least once.

But then, in Grafana, one might have 2 panels like:
instance uptime history grouped by sys_id
and
instance uptime history for instances with unknown sys_id

and they should complement each other, to give the full uptime history view.

+			logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source")
+			err = errors.Join(err, res.err)
 		}
-		dbs, e := s.ResolveDatabases()
-		err = errors.Join(err, e)
-		resolvedDbs = append(resolvedDbs, dbs...)
+		resolvedDbs = append(resolvedDbs, res.dbs...)
 	}
 	return resolvedDbs, err
 }
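A portability note on the new resolver loop: sync.WaitGroup.Go only exists since Go 1.25, and capturing i and s inside the closure is safe because loop variables have been per-iteration since Go 1.22. On an older toolchain the same loop would be spelled with the classic Add/Done pairing, roughly as in this sketch:

// Sketch of the pre-Go-1.25 equivalent of the wg.Go loop above.
for i, s := range srcs {
	wg.Add(1)
	go func(i int, s Source) {
		defer wg.Done()
		dbs, e := s.ResolveDatabases()
		results[i] = result{dbs, e}
	}(i, s)
}
wg.Wait()

Passing i and s as arguments keeps the goroutines correct even on toolchains that predate the Go 1.22 loop-variable change.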