Skip to content

Commit 35961f8

Browse files
authored
[*] fix hard coded epoch_ns measurement field name (#712)
1 parent 92c8411 commit 35961f8

11 files changed

Lines changed: 157 additions & 182 deletions

File tree

internal/metrics/logparse.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, erro
8585

8686
// 1. add zero counts for severity levels that didn't have any occurrences in the log
8787
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) []MeasurementEnvelope {
88-
allSeverityCounts := make(Measurement)
88+
allSeverityCounts := NewMeasurement(time.Now().UnixNano())
8989
for _, s := range PgSeverities {
9090
parsedCount, ok := eventCounts[s]
9191
if ok {
@@ -100,7 +100,6 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i
100100
allSeverityCounts[strings.ToLower(s)+"_total"] = 0
101101
}
102102
}
103-
allSeverityCounts["epoch_ns"] = time.Now().UnixNano()
104103
return []MeasurementEnvelope{{
105104
DBName: mdb.Name,
106105
SourceType: string(mdb.Kind),

internal/metrics/psutil/psutil.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
1112
"github.com/shirou/gopsutil/v4/cpu"
1213
"github.com/shirou/gopsutil/v4/disk"
1314
"github.com/shirou/gopsutil/v4/load"
@@ -60,8 +61,7 @@ func GetGoPsutilCPU(interval time.Duration) ([]map[string]any, error) {
6061
return nil, err
6162
}
6263

63-
retMap := make(map[string]any)
64-
retMap["epoch_ns"] = time.Now().UnixNano()
64+
retMap := metrics.NewMeasurement(time.Now().UnixNano())
6565
retMap["cpu_utilization"] = math.Round(100*goPsutilCalcCPUUtilization(prevTimeStat, curCallStats[0])) / 100
6666
retMap["load_1m_norm"] = math.Round(100*la.Load1/float64(cpus)) / 100
6767
retMap["load_1m"] = math.Round(100*la.Load1) / 100
@@ -83,8 +83,7 @@ func GetGoPsutilMem() ([]map[string]any, error) {
8383
return nil, err
8484
}
8585

86-
retMap := make(map[string]any)
87-
retMap["epoch_ns"] = time.Now().UnixNano()
86+
retMap := metrics.NewMeasurement(time.Now().UnixNano())
8887
retMap["total"] = int64(vm.Total)
8988
retMap["used"] = int64(vm.Used)
9089
retMap["free"] = int64(vm.Free)
@@ -105,10 +104,9 @@ func GetGoPsutilDiskTotals() ([]map[string]any, error) {
105104
return nil, err
106105
}
107106

108-
retMap := make(map[string]any)
107+
retMap := metrics.NewMeasurement(time.Now().UnixNano())
109108
var readBytes, writeBytes, reads, writes float64
110109

111-
retMap["epoch_ns"] = time.Now().UnixNano()
112110
for _, v := range d { // summarize all disk devices
113111
readBytes += float64(v.ReadBytes) // datatype float is just an oversight in the original psutil helper
114112
// but can't change it without causing problems on storage level (InfluxDB)
@@ -130,8 +128,7 @@ func GetLoadAvgLocal() ([]map[string]any, error) {
130128
return nil, err
131129
}
132130

133-
row := make(map[string]any)
134-
row["epoch_ns"] = time.Now().UnixNano()
131+
row := metrics.NewMeasurement(time.Now().UnixNano())
135132
row["load_1min"] = la.Load1
136133
row["load_5min"] = la.Load5
137134
row["load_15min"] = la.Load15
@@ -157,8 +154,7 @@ func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]an
157154

158155
retRows := make([]map[string]any, 0)
159156
epochNs := time.Now().UnixNano()
160-
dd := make(map[string]any)
161-
dd["epoch_ns"] = epochNs
157+
dd := metrics.NewMeasurement(epochNs)
162158
dd["tag_dir_or_tablespace"] = "data_directory"
163159
dd["tag_path"] = dataDirPath
164160
dd["total"] = float64(ddUsage.Total)
@@ -182,13 +178,12 @@ func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]an
182178
return nil, err
183179
}
184180
if ldDevice != ddDevice { // no point to report same data in case of single folder configuration
185-
ld := make(map[string]any)
181+
ld := metrics.NewMeasurement(epochNs)
186182
ldUsage, err := disk.Usage(logDirPath)
187183
if err != nil {
188184
return nil, err
189185
}
190186

191-
ld["epoch_ns"] = epochNs
192187
ld["tag_dir_or_tablespace"] = "log_directory"
193188
ld["tag_path"] = logDirPath
194189
ld["total"] = float64(ldUsage.Total)
@@ -218,8 +213,7 @@ func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]an
218213
return nil, err
219214
}
220215

221-
wd := make(map[string]any)
222-
wd["epoch_ns"] = epochNs
216+
wd := metrics.NewMeasurement(epochNs)
223217
wd["tag_dir_or_tablespace"] = "pg_wal"
224218
wd["tag_path"] = walDirPath
225219
wd["total"] = float64(walUsage.Total)
@@ -248,8 +242,7 @@ func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]an
248242
if err != nil {
249243
return nil, err
250244
}
251-
ts := make(map[string]any)
252-
ts["epoch_ns"] = epochNs
245+
ts := metrics.NewMeasurement(epochNs)
253246
ts["tag_dir_or_tablespace"] = tsName
254247
ts["tag_path"] = tsPath
255248
ts["total"] = float64(tsUsage.Total)

internal/metrics/psutil/psutil_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"testing"
55
"time"
66

7+
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
8+
79
"github.com/stretchr/testify/assert"
810
"golang.org/x/exp/maps"
911
)
@@ -17,7 +19,7 @@ func TestGetGoPsutilCPU(t *testing.T) {
1719
a.NotEmpty(result)
1820

1921
// Check if the result contains the expected keys
20-
expectedKeys := []string{"epoch_ns", "cpu_utilization", "load_1m_norm", "load_1m", "load_5m_norm", "load_5m", "user", "system", "idle", "iowait", "irqs", "other"}
22+
expectedKeys := []string{metrics.EpochColumnName, "cpu_utilization", "load_1m_norm", "load_1m", "load_5m_norm", "load_5m", "user", "system", "idle", "iowait", "irqs", "other"}
2123
resultKeys := maps.Keys(result[0])
2224
a.ElementsMatch(resultKeys, expectedKeys)
2325

@@ -36,7 +38,7 @@ func TestGetGoPsutilMem(t *testing.T) {
3638
a.NotEmpty(result)
3739

3840
// Check if the result contains the expected keys
39-
expectedKeys := []string{"epoch_ns", "total", "used", "free", "buff_cache", "available", "percent", "swap_total", "swap_used", "swap_free", "swap_percent"}
41+
expectedKeys := []string{metrics.EpochColumnName, "total", "used", "free", "buff_cache", "available", "percent", "swap_total", "swap_used", "swap_free", "swap_percent"}
4042
resultKeys := maps.Keys(result[0])
4143
a.ElementsMatch(resultKeys, expectedKeys)
4244
}
@@ -52,7 +54,7 @@ func TestGetGoPsutilDiskTotals(t *testing.T) {
5254
a.NotEmpty(result)
5355

5456
// Check if the result contains the expected keys
55-
expectedKeys := []string{"epoch_ns", "read_bytes", "write_bytes", "read_count", "write_count"}
57+
expectedKeys := []string{metrics.EpochColumnName, "read_bytes", "write_bytes", "read_count", "write_count"}
5658
resultKeys := maps.Keys(result[0])
5759
a.ElementsMatch(resultKeys, expectedKeys)
5860
}
@@ -66,7 +68,7 @@ func TestGetLoadAvgLocal(t *testing.T) {
6668
a.NotEmpty(result)
6769

6870
// Check if the result contains the expected keys
69-
expectedKeys := []string{"epoch_ns", "load_1min", "load_5min", "load_15min"}
71+
expectedKeys := []string{metrics.EpochColumnName, "load_1min", "load_5min", "load_15min"}
7072
resultKeys := maps.Keys(result[0])
7173
a.ElementsMatch(resultKeys, expectedKeys)
7274
}

internal/metrics/types.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package metrics
22

3+
import "time"
4+
35
type (
46
ExtensionInfo struct {
57
ExtName string `yaml:"ext_name"`
@@ -65,9 +67,37 @@ type Preset struct {
6567
Metrics map[string]float64
6668
}
6769

70+
const (
71+
EpochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
72+
TagPrefix string = "tag_"
73+
)
74+
6875
type Measurement map[string]any
76+
77+
func NewMeasurement(epoch int64) Measurement {
78+
m := make(Measurement)
79+
m[EpochColumnName] = epoch
80+
return m
81+
}
82+
83+
func (m Measurement) GetEpoch() int64 {
84+
if v, ok := m[EpochColumnName]; ok {
85+
if epoch, ok := v.(int64); ok {
86+
return epoch
87+
}
88+
}
89+
return time.Now().UnixNano()
90+
}
91+
6992
type Measurements []map[string]any
7093

94+
func (m Measurements) GetEpoch() int64 {
95+
if len(m) == 0 {
96+
return time.Now().UnixNano()
97+
}
98+
return Measurement(m[0]).GetEpoch()
99+
}
100+
71101
type MeasurementEnvelope struct {
72102
DBName string
73103
SourceType string

internal/reaper/cache.go

Lines changed: 36 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package reaper
22

33
import (
4-
"context"
54
"fmt"
5+
"maps"
66
"sync"
77
"time"
88

99
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
1010
"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
11-
"github.com/sirupsen/logrus"
1211
)
1312

1413
var monitoredDbCache map[string]*sources.SourceConn
@@ -53,53 +52,10 @@ func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error)
5352
return md, nil
5453
}
5554

56-
// LoadMetrics loads metric definitions from the reader
57-
func (r *Reaper) LoadMetrics() (err error) {
58-
var newDefs *metrics.Metrics
59-
if newDefs, err = r.MetricsReaderWriter.GetMetrics(); err != nil {
60-
return
61-
}
62-
metricDefs.Assign(newDefs)
63-
r.logger.
64-
WithField("metrics", len(newDefs.MetricDefs)).
65-
WithField("presets", len(newDefs.PresetDefs)).
66-
Log(func() logrus.Level {
67-
if len(newDefs.PresetDefs)*len(newDefs.MetricDefs) == 0 {
68-
return logrus.WarnLevel
69-
}
70-
return logrus.InfoLevel
71-
}(), "metrics and presets refreshed")
72-
return
73-
}
74-
75-
// LoadSources loads sources from the reader
76-
func (r *Reaper) LoadSources() (err error) {
77-
if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
78-
r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
79-
monitoredSources = make([]*sources.SourceConn, 0)
80-
return nil
81-
}
82-
if monitoredSources, err = monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
83-
return err
84-
}
85-
r.logger.WithField("sources", len(monitoredSources)).Info("sources refreshed")
86-
return nil
87-
}
88-
89-
// WriteMeasurements() writes the metrics to the sinks
90-
func (r *Reaper) WriteMeasurements(ctx context.Context) {
91-
var err error
92-
for {
93-
select {
94-
case <-ctx.Done():
95-
return
96-
case msg := <-r.measurementCh:
97-
if err = r.SinksWriter.Write(msg); err != nil {
98-
r.logger.Error(err)
99-
}
100-
}
101-
}
102-
}
55+
var instanceMetricCache = make(map[string](metrics.Measurements)) // [dbUnique+metric]lastly_fetched_data
56+
var instanceMetricCacheLock = sync.RWMutex{}
57+
var instanceMetricCacheTimestamp = make(map[string]time.Time) // [dbUnique+metric]last_fetch_time
58+
var instanceMetricCacheTimestampLock = sync.RWMutex{}
10359

10460
func GetFromInstanceCacheIfNotOlderThanSeconds(msg MetricFetchConfig, maxAgeSeconds int64) metrics.Measurements {
10561
var clonedData metrics.Measurements
@@ -121,3 +77,34 @@ func GetFromInstanceCacheIfNotOlderThanSeconds(msg MetricFetchConfig, maxAgeSeco
12177

12278
return clonedData
12379
}
80+
81+
func IsCacheableMetric(msg MetricFetchConfig, mvp metrics.Metric) bool {
82+
switch msg.Source {
83+
case sources.SourcePostgresContinuous, sources.SourcePatroniContinuous:
84+
return false
85+
default:
86+
return mvp.IsInstanceLevel
87+
}
88+
}
89+
90+
func PutToInstanceCache(msg MetricFetchConfig, data metrics.Measurements) {
91+
if len(data) == 0 {
92+
return
93+
}
94+
dataCopy := deepCopyMetricData(data)
95+
instanceMetricCacheLock.Lock()
96+
instanceMetricCache[msg.DBUniqueNameOrig+msg.MetricName] = dataCopy
97+
instanceMetricCacheLock.Unlock()
98+
99+
instanceMetricCacheTimestampLock.Lock()
100+
instanceMetricCacheTimestamp[msg.DBUniqueNameOrig+msg.MetricName] = time.Now()
101+
instanceMetricCacheTimestampLock.Unlock()
102+
}
103+
104+
func deepCopyMetricData(data metrics.Measurements) metrics.Measurements {
105+
newData := make(metrics.Measurements, len(data))
106+
for i, dr := range data {
107+
newData[i] = maps.Clone(dr)
108+
}
109+
return newData
110+
}

internal/reaper/database.go

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -289,15 +289,10 @@ func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
289289
if !ok {
290290
splits := strings.Split(sprocIdent, dbMetricJoinStr)
291291
log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
292-
influxEntry := make(metrics.Measurement)
292+
influxEntry := metrics.NewMeasurement(data.GetEpoch())
293293
influxEntry["event"] = "drop"
294294
influxEntry["tag_sproc"] = splits[0]
295295
influxEntry["tag_oid"] = splits[1]
296-
if len(data) > 0 {
297-
influxEntry["epoch_ns"] = data[0]["epoch_ns"]
298-
} else {
299-
influxEntry["epoch_ns"] = time.Now().UnixNano()
300-
}
301296
detectedChanges = append(detectedChanges, influxEntry)
302297
deletedSProcs = append(deletedSProcs, sprocIdent)
303298
changeCounts.Dropped++
@@ -373,14 +368,9 @@ func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
373368
_, ok := currentTableMap[table]
374369
if !ok {
375370
log.GetLogger(ctx).Info("detected drop of table:", table)
376-
influxEntry := make(metrics.Measurement)
371+
influxEntry := metrics.NewMeasurement(data.GetEpoch())
377372
influxEntry["event"] = "drop"
378373
influxEntry["tag_table"] = table
379-
if len(data) > 0 {
380-
influxEntry["epoch_ns"] = data[0]["epoch_ns"]
381-
} else {
382-
influxEntry["epoch_ns"] = time.Now().UnixNano()
383-
}
384374
detectedChanges = append(detectedChanges, influxEntry)
385375
deletedTables = append(deletedTables, table)
386376
changeCounts.Dropped++
@@ -456,14 +446,9 @@ func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
456446
_, ok := currentIndexMap[indexName]
457447
if !ok {
458448
log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
459-
influxEntry := make(metrics.Measurement)
449+
influxEntry := metrics.NewMeasurement(data.GetEpoch())
460450
influxEntry["event"] = "drop"
461451
influxEntry["tag_index"] = indexName
462-
if len(data) > 0 {
463-
influxEntry["epoch_ns"] = data[0]["epoch_ns"]
464-
} else {
465-
influxEntry["epoch_ns"] = time.Now().UnixNano()
466-
}
467452
detectedChanges = append(detectedChanges, influxEntry)
468453
deletedIndexes = append(deletedIndexes, indexName)
469454
changeCounts.Dropped++
@@ -531,12 +516,7 @@ func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredD
531516
splits := strings.Split(objPrevRun, "#:#")
532517
log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
533518
dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
534-
revokeEntry := make(metrics.Measurement)
535-
if epochNs, ok := data[0]["epoch_ns"]; ok {
536-
revokeEntry["epoch_ns"] = epochNs
537-
} else {
538-
revokeEntry["epoch_ns"] = time.Now().UnixNano()
539-
}
519+
revokeEntry := metrics.NewMeasurement(data.GetEpoch())
540520
revokeEntry["object_type"] = splits[0]
541521
revokeEntry["tag_role"] = splits[1]
542522
revokeEntry["tag_object"] = splits[2]
@@ -658,9 +638,8 @@ func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique s
658638
message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
659639
log.GetLogger(ctx).Info(message)
660640
detectedChangesSummary := make(metrics.Measurements, 0)
661-
influxEntry := make(metrics.Measurement)
641+
influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
662642
influxEntry["details"] = message
663-
influxEntry["epoch_ns"] = time.Now().UnixNano()
664643
detectedChangesSummary = append(detectedChangesSummary, influxEntry)
665644
md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
666645
storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique,
@@ -689,8 +668,7 @@ func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme Monitore
689668
}
690669

691670
for _, row := range data {
692-
retRow := make(metrics.Measurement)
693-
retRow[epochColumnName] = epochNs
671+
retRow := metrics.NewMeasurement(epochNs)
694672
for k, v := range row {
695673
vs := string(v.([]byte))
696674
// need 1 tag so that Influx would not merge rows

0 commit comments

Comments
 (0)