
Commit 443f7ba

[-] fix Pgpool-II support, closes #730 (#740)

* [-] fix Pgpool-II support, closes #730
  - split `pgpool_stats` into two separate metrics
  - update documentation
  - update Grafana dashboards
  - add `metrics.RowToMeasurement()`
  - use `pgx.QueryExecModeSimpleProtocol` for non-Postgres connections
  - remove `FetchMetricsPgpool()` and use standard fetching
* [-] fix revive receiver-naming warning
* [-] fix staticcheck QF1003 warning: could use tagged switch
* [-] fix tests

1 parent 913c1f0 · commit 443f7ba

10 files changed

Lines changed: 72 additions & 144 deletions

docs/reference/advanced_features.md

Lines changed: 6 additions & 5 deletions

````diff
@@ -17,8 +17,8 @@ standard way a bit tricky. But luckily Patroni cluster members
 information is stored in a DCS (Distributed Consensus Store), like
 *etcd*, so it can be fetched from there periodically.
 
-When 'patroni' is selected as a **source type** then the usual Postgres
-host/port fields should be left empty ("dbname" can still be filled if only a
+When 'patroni' is selected as a **source type** then the usual Postgres
+host/port fields should be left empty ("dbname" can still be filled if only a
 specific single database is
 to be monitored) and instead "Host config" JSON field should be filled
 with DCS address, type and scope (cluster name) information. A sample
@@ -71,15 +71,15 @@ to be specified manually under "Host config" as seen for example
 
 On Postgres side (on the monitored DB)
 
-```
+```ini
 # Debian / Ubuntu default log_line_prefix actually
 log_line_prefix = '%m [%p] %q%u@%d '
 ```
 
 YAML config (recommended when "pushing" metrics from DB nodes to a
 central metrics DB)
 
-```
+```yaml
 ## logs_glob_path is only needed if the monitoring user is cannot auto-detect it (i.e. not a superuser / pg_monitor role)
 # logs_glob_path:
 logs_match_regex: '^(?P<log_time>.*) \[(?P<process_id>\d+)\] (?P<user_name>.*)@(?P<database_name>.*?) (?P<error_severity>.*?): '
@@ -113,7 +113,8 @@ Quite similar to PgBouncer, also Pgpool offers some statistics on pool
 performance and status, which might be of interest especially if using
 the load balancing features. To enable it choose the according *DB
 Type*, provide connection info to the pooler port and make sure the
-**pgpool_stats** metric / preset config is selected for the host.
+**pgpool_stats** and **pgpool_processes** metrics or **pgpool** preset config
+is selected for the host.
 
 The built-in Grafana dashboard for Pgpool data looks something like
 that:
````
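Since the `logs_match_regex` shown in the hunk above is the crux of the log-parsing setup, here is a quick, self-contained way to sanity-check it; the sample log line is made up for illustration, and Go's RE2 engine accepts the same `(?P<name>...)` named groups the config uses.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The logs_match_regex from the docs above, verbatim.
	re := regexp.MustCompile(`^(?P<log_time>.*) \[(?P<process_id>\d+)\] (?P<user_name>.*)@(?P<database_name>.*?) (?P<error_severity>.*?): `)

	// A fabricated log line in the Debian/Ubuntu default log_line_prefix format.
	line := `2024-05-01 12:00:00.000 UTC [1234] alice@mydb ERROR:  relation "x" does not exist`

	match := re.FindStringSubmatch(line)
	if match == nil {
		fmt.Println("no match")
		return
	}
	for i, name := range re.SubexpNames() {
		if i > 0 && name != "" {
			fmt.Printf("%-15s = %q\n", name, match[i])
		}
	}
}
```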

grafana/postgres/v10/pgpool-stats.json

Lines changed: 2 additions & 2 deletions

```diff
@@ -81,7 +81,7 @@
         "group": [],
         "metricColumn": "none",
         "rawQuery": true,
-        "rawSql": "select\n dbname,\n tag_data->>'node_id' as node_id,\n data->>'role' as role,\n data->>'status' as status,\n case when data->>'status' = 'up' then 1 else 0 end as status_num,\n data->>'hostname' as hostname,\n data->>'port' as port,\n data->>'lb_weight' as lb_weight,\n data->>'load_balance_node' as load_balance_node,\n data->>'replication_delay' as replication_delay,\n data->>'select_cnt' as select_cnt,\n data->>'processes_total' as processes_total,\n data->>'processes_active' as processes_active, \n data->>'last_status_change' as last_status_change, \n date_trunc('second', time::timestamp)::time as time\nfrom\n pgpool_stats\nwhere\n time = (select max(time) from pgpool_stats where dbname in ($dbname) and $__timeFilter(time) group by dbname)\n and dbname in ($dbname)\norder by\n dbname,\n node_id,\n time desc\n",
+        "rawSql": "select\n dbname,\n tag_data->>'node_id' as node_id,\n data->>'role' as role,\n data->>'status' as status,\n case when data->>'status' = 'up' then 1 else 0 end as status_num,\n data->>'hostname' as hostname,\n data->>'port' as port,\n data->>'lb_weight' as lb_weight,\n data->>'load_balance_node' as load_balance_node,\n data->>'replication_delay' as replication_delay,\n data->>'select_cnt' as select_cnt, \n data->>'last_status_change' as last_status_change, \n time::text as \"updated\"\nfrom\n pgpool_stats\nwhere\n time = (select max(time) from pgpool_stats where dbname in ($dbname) and $__timeFilter(time) group by dbname)\n and dbname in ($dbname)\norder by\n dbname,\n node_id,\n time desc\n",
         "refId": "A",
         "select": [
           [
@@ -154,7 +154,7 @@
         "group": [],
         "metricColumn": "none",
         "rawQuery": true,
-        "rawSql": "SELECT\n $__timeGroup(time, $agg_interval),\n (max((data->'select_cnt')::int8) - min((data->'select_cnt')::int8))::int8 as select_cnt,\n dbname || ' (node: ' || (tag_data->>'node_id')::text || ')' as node_id\nFROM\n pgpool_stats\nWHERE\n $__timeFilter(time)\n AND dbname in ($dbname)\nGROUP BY\n 1, 3\nORDER BY\n 1\n",
+        "rawSql": "SELECT\n $__timeGroup(time, $agg_interval),\n (max((data->>'select_cnt')::int8) - min((data->>'select_cnt')::int8))::int8 as select_cnt,\n dbname || ' (node: ' || (tag_data->>'node_id')::text || ')' as node_id\nFROM\n pgpool_stats\nWHERE\n $__timeFilter(time)\n AND dbname in ($dbname)\nGROUP BY\n 1, 3\nORDER BY\n 1\n",
         "refId": "A",
         "select": [
           [
```

grafana/postgres/v11/pgpool-stats.json

Lines changed: 2 additions & 2 deletions

```diff
@@ -142,7 +142,7 @@
         "group": [],
         "metricColumn": "none",
         "rawQuery": true,
-        "rawSql": "select\n dbname,\n tag_data->>'node_id' as node_id,\n data->>'role' as role,\n data->>'status' as status,\n case when data->>'status' = 'up' then 1 else 0 end as status_num,\n data->>'hostname' as hostname,\n data->>'port' as port,\n data->>'lb_weight' as lb_weight,\n data->>'load_balance_node' as load_balance_node,\n data->>'replication_delay' as replication_delay,\n data->>'select_cnt' as select_cnt,\n data->>'processes_total' as processes_total,\n data->>'processes_active' as processes_active, \n data->>'last_status_change' as last_status_change, \n date_trunc('second', time::timestamp)::time as time\nfrom\n pgpool_stats\nwhere\n time = (select max(time) from pgpool_stats where dbname in ($dbname) and $__timeFilter(time) group by dbname)\n and dbname in ($dbname)\norder by\n dbname,\n node_id,\n time desc\n",
+        "rawSql": "select\n dbname,\n tag_data->>'node_id' as node_id,\n data->>'role' as role,\n data->>'status' as status,\n case when data->>'status' = 'up' then 1 else 0 end as status_num,\n data->>'hostname' as hostname,\n data->>'port' as port,\n data->>'lb_weight' as lb_weight,\n data->>'load_balance_node' as load_balance_node,\n data->>'replication_delay' as replication_delay,\n data->>'select_cnt' as select_cnt, \n data->>'last_status_change' as last_status_change, \n time::text as \"updated\"\nfrom\n pgpool_stats\nwhere\n time = (select max(time) from pgpool_stats where dbname in ($dbname) and $__timeFilter(time) group by dbname)\n and dbname in ($dbname)\norder by\n dbname,\n node_id,\n time desc\n",
         "refId": "A",
         "select": [
           [
@@ -268,7 +268,7 @@
         "group": [],
         "metricColumn": "none",
         "rawQuery": true,
-        "rawSql": "SELECT\n $__timeGroup(time, $agg_interval),\n (max((data->'select_cnt')::int8) - min((data->'select_cnt')::int8))::int8 as select_cnt,\n dbname || ' (node: ' || (tag_data->>'node_id')::text || ')' as node_id\nFROM\n pgpool_stats\nWHERE\n $__timeFilter(time)\n AND dbname in ($dbname)\nGROUP BY\n 1, 3\nORDER BY\n 1\n",
+        "rawSql": "SELECT\n $__timeGroup(time, $agg_interval),\n (max((data->>'select_cnt')::int8) - min((data->>'select_cnt')::int8))::int8 as select_cnt,\n dbname || ' (node: ' || (tag_data->>'node_id')::text || ')' as node_id\nFROM\n pgpool_stats\nWHERE\n $__timeFilter(time)\n AND dbname in ($dbname)\nGROUP BY\n 1, 3\nORDER BY\n 1\n",
         "refId": "A",
         "select": [
           [
```

internal/metrics/metrics.yaml

Lines changed: 6 additions & 7 deletions

```diff
@@ -992,14 +992,12 @@ metrics:
   pgbouncer_clients:
     sqls:
       0: show clients
+  pgpool_processes:
+    sqls:
+      3: show pool_processes
   pgpool_stats:
     sqls:
-      3: |-
-        /* SHOW POOL_NODES expected to be 1st "command" */
-        SHOW POOL_NODES;
-        /* special handling in code - when below SHOW POOL_PROCESSES line is defined pgpool_stats will have additional summary columns:
-        processes_total, processes_active */
-        SHOW POOL_PROCESSES
+      3: show pool_nodes
   privilege_changes:
     sqls:
       11: |-
@@ -4161,9 +4159,10 @@ presets:
       pgbouncer_stats: 60
       pgbouncer_clients: 60
   pgpool:
-    description: pool global stats, 1 row per node ID
+    description: pool global stats
     metrics:
       pgpool_stats: 60
+      pgpool_processes: 60
   prometheus:
     description: similar to "exhaustive" but without some possibly longer-running metrics and those keeping state
     metrics:
```
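For readers unfamiliar with the metric definition format: the numeric key under `sqls` gates the statement by server version, and the reaper resolves the statement via `metric.GetSQL(md.Version)` as seen later in this diff. A rough sketch of that resolution logic, assuming highest-key-not-above-version semantics rather than quoting the actual pgwatch implementation:

```go
package main

import "fmt"

// getSQL picks the statement whose minimum-version key is the highest one
// not exceeding the connected server's version; no matching key yields "".
func getSQL(sqls map[int]string, version int) string {
	bestKey, bestSQL := -1, ""
	for k, sql := range sqls {
		if k <= version && k > bestKey {
			bestKey, bestSQL = k, sql
		}
	}
	return bestSQL
}

func main() {
	pgpoolStats := map[int]string{3: "show pool_nodes"}
	fmt.Println(getSQL(pgpoolStats, 4)) // "show pool_nodes" on pgpool 4.x
	fmt.Println(getSQL(pgpoolStats, 2)) // "" -> treated as a dummy SQL, fetch skipped
}
```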

internal/metrics/types.go

Lines changed: 21 additions & 0 deletions

```diff
@@ -3,6 +3,8 @@ package metrics
 import (
 	"maps"
 	"time"
+
+	"github.com/jackc/pgx/v5"
 )
 
 type (
@@ -77,6 +79,25 @@ const (
 
 type Measurement map[string]any
 
+// RowToMeasurement returns a Measurement scanned from row.
+func RowToMeasurement(row pgx.CollectableRow) (map[string]any, error) {
+	value := NewMeasurement(time.Now().UnixNano())
+	err := row.Scan((*Measurement)(&value))
+	return value, err
+}
+
+func (m *Measurement) ScanRow(rows pgx.Rows) error {
+	values, err := rows.Values()
+	if err != nil {
+		return err
+	}
+	// *rs = make(Measurement, len(values))
+	for i := range values {
+		(*m)[string(rows.FieldDescriptions()[i].Name)] = values[i]
+	}
+	return nil
+}
+
 func NewMeasurement(epoch int64) Measurement {
 	m := make(Measurement)
 	m[EpochColumnName] = epoch
```
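A self-contained sketch of how these additions are meant to be used together: `ScanRow` makes the map a pgx v5 `RowScanner`, so a single `Scan` destination captures every column a pgpool `SHOW` command returns, whatever its shape. The local type, helper, timestamp column name, and connection string below are stand-ins, not the pgwatch internals themselves.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

// Measurement mirrors metrics.Measurement from the diff above.
type Measurement map[string]any

// ScanRow implements pgx.RowScanner: pgx calls it when the map pointer is
// the sole Scan destination, filling the map from field names and values.
func (m *Measurement) ScanRow(rows pgx.Rows) error {
	values, err := rows.Values()
	if err != nil {
		return err
	}
	for i, fd := range rows.FieldDescriptions() {
		(*m)[string(fd.Name)] = values[i]
	}
	return nil
}

func rowToMeasurement(row pgx.CollectableRow) (Measurement, error) {
	m := Measurement{"epoch_ns": time.Now().UnixNano()} // timestamp column name assumed
	err := row.Scan(&m)                                 // dispatches to ScanRow via the RowScanner interface
	return m, err
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "host=localhost port=9999 user=postgres") // placeholder pgpool endpoint
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Simple protocol: pgpool's SHOW pseudo-queries can't go through the
	// default extended (prepared-statement) protocol.
	rows, err := conn.Query(ctx, "show pool_nodes", pgx.QueryExecModeSimpleProtocol)
	if err != nil {
		panic(err)
	}
	data, err := pgx.CollectRows(rows, rowToMeasurement)
	fmt.Println(data, err)
}
```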

internal/reaper/database.go

Lines changed: 6 additions & 95 deletions

```diff
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strconv"
 	"strings"
 	"time"
 
@@ -38,10 +37,13 @@ func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ..
 			return nil, err
 		}
 		conn = tx
+	} else {
+		// we want simple protocol for non-postgres connections, e.g. pgpool
+		args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
 	}
 	rows, err := conn.Query(ctx, sql, args...)
 	if err == nil {
-		return pgx.CollectRows(rows, pgx.RowToMap)
+		return pgx.CollectRows(rows, metrics.RowToMeasurement)
 	}
 	return nil, err
 }
@@ -461,103 +463,11 @@ func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique s
 	}
 }
 
-// some extra work needed as pgpool SHOW commands don't specify the return data types for some reason
-func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, md *sources.SourceConn, mvp metrics.Metric) (metrics.Measurements, error) {
-	var retData = make(metrics.Measurements, 0)
-	epochNs := time.Now().UnixNano()
-
-	sqlLines := strings.Split(strings.ToUpper(mvp.GetSQL(int(md.Version))), "\n")
-
-	for _, sql := range sqlLines {
-		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
-			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
-			if err != nil {
-				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
-				return data, err
-			}
-
-			for _, row := range data {
-				retRow := metrics.NewMeasurement(epochNs)
-				for k, v := range row {
-					vs := string(v.([]byte))
-					// need 1 tag so that Influx would not merge rows
-					if k == "node_id" {
-						retRow["tag_node_id"] = vs
-						continue
-					}
-
-					retRow[k] = vs
-					if k == "status" { // was changed from numeric to string at some pgpool version so leave the string
-						// but also add "status_num" field
-						switch vs {
-						case "up":
-							retRow["status_num"] = 1
-						case "down":
-							retRow["status_num"] = 0
-						default:
-							i, err := strconv.ParseInt(vs, 10, 64)
-							if err == nil {
-								retRow["status_num"] = i
-							}
-						}
-						continue
-					}
-					// everything is returned as text, so try to convert all numerics into ints / floats
-					if k != "lb_weight" {
-						i, err := strconv.ParseInt(vs, 10, 64)
-						if err == nil {
-							retRow[k] = i
-							continue
-						}
-					}
-					f, err := strconv.ParseFloat(vs, 64)
-					if err == nil {
-						retRow[k] = f
-						continue
-					}
-				}
-				retData = append(retData, retRow)
-			}
-		} else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
-			if len(retData) == 0 {
-				log.GetLogger(ctx).Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES. ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
-				continue
-			}
-
-			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
-			if err != nil {
-				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
-				continue
-			}
-
-			// summarize processesTotal / processes_active over all rows
-			processesTotal := 0
-			processesActive := 0
-			for _, row := range data {
-				processesTotal++
-				v, ok := row["database"]
-				if !ok {
-					log.GetLogger(ctx).Infof("[%s][%s] column 'database' not found from data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
-					continue
-				}
-				if len(v.([]byte)) > 0 {
-					processesActive++
-				}
-			}
-
-			for _, retRow := range retData {
-				retRow["processes_total"] = processesTotal
-				retRow["processes_active"] = processesActive
-			}
-		}
-	}
-	return retData, nil
-}
-
 // Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
 // With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
 // whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
 func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
+	// TODO: move to sources package and use direct pgx connection
 	sqlAvailable := `select name::text from pg_available_extensions`
 	extsCreated := make([]string, 0)
 
@@ -595,6 +505,7 @@ func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionN
 
 // Called once on daemon startup to try to create "metric fething helper" functions automatically
 func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
+	// TODO: replace with md.GetMetricDefs() and move to sources package
 	metricConfig := func() map[string]float64 {
 		if len(md.Metrics) > 0 {
 			return md.Metrics
```
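The one-line `args = append(...)` trick above relies on a documented pgx v5 feature: a `QueryExecMode` option passed at the front of the argument list overrides the connection's default mode for that single call. A minimal sketch of the pattern in isolation (package and function names are illustrative):

```go
package poolquery

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// querySimple forces the simple wire protocol for one Query call by
// prepending the exec-mode option to the variadic arguments. The commit's
// own comment gives the rationale: non-Postgres endpoints such as pgpool
// need the simple protocol, since their SHOW pseudo-commands don't work
// with the default extended (prepared-statement) flow.
func querySimple(ctx context.Context, conn *pgx.Conn, sql string, args ...any) (pgx.Rows, error) {
	args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
	return conn.Query(ctx, sql, args...)
}
```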

internal/reaper/reaper.go

Lines changed: 9 additions & 25 deletions

```diff
@@ -269,7 +269,6 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
 	hostState := make(map[string]map[string]string)
 	var lastUptimeS int64 = -1 // used for "server restarted" event detection
 	var lastErrorNotificationTime time.Time
-	// var vme MonitoredDatabaseSettings
 	var mvp metrics.Metric
 	var err error
 	var ok bool
@@ -458,7 +457,7 @@ func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver *source
 }
 
 func (r *Reaper) FetchMetric(ctx context.Context, msg MetricFetchConfig, hostState map[string]map[string]string) (*metrics.MeasurementEnvelope, error) {
-	var dbVersion int
+	// TODO: replace MetricFetchConfig with SourceConn
 	var err error
 	var sql string
 	var data, cachedData metrics.Measurements
@@ -476,14 +475,6 @@
 		return nil, metrics.ErrMetricNotFound
 	}
 
-	if err = md.FetchRuntimeInfo(ctx, false); err != nil {
-		return nil, err
-	}
-	dbVersion = md.Version
-	if msg.Source == sources.SourcePgBouncer {
-		dbVersion = 0 // version is 0.0 for all pgbouncer sql per convention
-	}
-
 	if metric.IsInstanceLevel && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval < r.Metrics.CacheAge() {
 		cacheKey = fmt.Sprintf("%s:%s:%d:%s",
 			md.SystemIdentifier,
@@ -496,7 +487,7 @@
 		goto send_to_storageChannel
 	}
 
-	sql = metric.GetSQL(dbVersion)
+	sql = metric.GetSQL(md.Version)
 	if sql == "" && !(msg.MetricName == specialMetricChangeEvents || msg.MetricName == recoMetricName) {
 		// let's ignore dummy SQLs
 		log.GetLogger(ctx).Debugf("[%s:%s] Ignoring fetch message - got an empty/dummy SQL string", msg.DBUniqueName, msg.MetricName)
@@ -508,43 +499,36 @@
 		return nil, nil
 	}
 
-	if msg.MetricName == specialMetricChangeEvents { // special handling, multiple queries + stateful
+	switch msg.MetricName {
+	case specialMetricChangeEvents:
 		r.CheckForPGObjectChangesAndStore(ctx, msg.DBUniqueName, md, hostState) // TODO no hostState for Prometheus currently
-	} else if msg.MetricName == recoMetricName {
+		return nil, nil
+	case recoMetricName:
 		if data, err = GetRecommendations(ctx, msg.DBUniqueName, md); err != nil {
 			return nil, err
 		}
-	} else if msg.Source == sources.SourcePgPool {
-		if data, err = FetchMetricsPgpool(ctx, msg, md, metric); err != nil {
-			return nil, err
-		}
-	} else {
-		data, err = QueryMeasurements(ctx, msg.DBUniqueName, sql)
-
-		if err != nil {
+	default:
+		if data, err = QueryMeasurements(ctx, msg.DBUniqueName, sql); err != nil {
 			// let's soften errors to "info" from functions that expect the server to be a primary to reduce noise
 			if strings.Contains(err.Error(), "recovery is in progress") && md.IsInRecovery {
 				log.GetLogger(ctx).Debugf("[%s:%s] failed to fetch metrics: %s", msg.DBUniqueName, msg.MetricName, err)
 				return nil, err
 			}
-
 			if msg.MetricName == specialMetricInstanceUp {
 				log.GetLogger(ctx).WithError(err).Debugf("[%s:%s] failed to fetch metrics. marking instance as not up", msg.DBUniqueName, msg.MetricName)
 				data = make(metrics.Measurements, 1)
 				data[0] = metrics.NewMeasurement(time.Now().UnixNano())
 				data[0]["is_up"] = 0 // should be updated if the "instance_up" metric definition is changed
 				goto send_to_storageChannel
 			}
-
 			log.GetLogger(ctx).
 				WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName}).
 				WithError(err).Error("failed to fetch metrics")
 
 			return nil, err
 		}
-
-		log.GetLogger(ctx).WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
 	}
+	log.GetLogger(ctx).WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
 
 	r.measurementCache.Put(cacheKey, data)
 
```
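The `switch msg.MetricName` hunk above is also the staticcheck QF1003 fix from the commit message ("could use tagged switch"). Reduced to a toy example with made-up handlers:

```go
package main

import "fmt"

func dispatch(name string) string {
	// QF1003: an if / else-if chain that repeatedly compares the same
	// variable against constants is clearer as a tagged switch.
	switch name {
	case "change_events":
		return "stateful change-event handling"
	case "recommendations":
		return "recommendations query"
	default:
		return "generic QueryMeasurements path"
	}
}

func main() {
	fmt.Println(dispatch("change_events"))
}
```
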
internal/reaper/recommendations.go

Lines changed: 0 additions & 1 deletion

```diff
@@ -16,7 +16,6 @@ const (
 	recoMetricName                    = "recommendations"
 	specialMetricChangeEvents         = "change_events"
 	specialMetricServerLogEventCounts = "server_log_event_counts"
-	specialMetricPgpoolStats          = "pgpool_stats"
 	specialMetricInstanceUp           = "instance_up"
 )
 
```
internal/sources/conn.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -206,7 +206,7 @@ FROM
 }
 
 func (md *SourceConn) FetchVersion(ctx context.Context, sql string) (version string, ver int, err error) {
-	if err = md.Conn.QueryRow(ctx, sql).Scan(&version); err != nil {
+	if err = md.Conn.QueryRow(ctx, sql, pgx.QueryExecModeSimpleProtocol).Scan(&version); err != nil {
 		return
 	}
 	ver = VersionToInt(version)
```
