Skip to content

Commit 7097a99

Browse files
committed
feat: capture and cluster pg_stat_statements (PoC)
0 parents  commit 7097a99

12 files changed

Lines changed: 421 additions & 0 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/bin/
2+
/qshape
3+
*.test
4+
*.out
5+
clusters.json

LICENSE

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
BSD 2-Clause License
2+
3+
Copyright (c) 2026, Radim Marek
4+
All rights reserved.
5+
6+
Redistribution and use in source and binary forms, with or without
7+
modification, are permitted provided that the following conditions are met:
8+
9+
1. Redistributions of source code must retain the above copyright notice, this
10+
list of conditions and the following disclaimer.
11+
12+
2. Redistributions in binary form must reproduce the above copyright notice,
13+
this list of conditions and the following disclaimer in the documentation
14+
and/or other materials provided with the distribution.
15+
16+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19+
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20+
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21+
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22+
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23+
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24+
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26+
27+

cluster.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package qshape
2+
3+
import "sort"
4+
5+
// Query is one SQL statement together with the execution statistics recorded
// for it. Raw is always serialized; every other field is omitted from the
// JSON output when it holds its zero value.
type Query struct {
	Raw              string  `json:"raw"`
	QueryID          int64   `json:"queryid,omitempty"`
	Calls            int64   `json:"calls,omitempty"`
	TotalExecTimeMs  float64 `json:"total_exec_time_ms,omitempty"`
	MeanExecTimeMs   float64 `json:"mean_exec_time_ms,omitempty"`
	StddevExecTimeMs float64 `json:"stddev_exec_time_ms,omitempty"`
	Rows             int64   `json:"rows,omitempty"`
}

// Cluster is a set of queries that share a fingerprint, along with totals for
// the set. Fingerprint, Canonical, Members and TotalCalls are always
// serialized; the timing and row fields are omitted when zero.
type Cluster struct {
	Fingerprint     string  `json:"fingerprint"`
	Canonical       string  `json:"canonical"`
	Members         []Query `json:"members"`
	TotalCalls      int64   `json:"total_calls"`
	TotalExecTimeMs float64 `json:"total_exec_time_ms,omitempty"`
	MeanExecTimeMs  float64 `json:"mean_exec_time_ms,omitempty"`
	Rows            int64   `json:"rows,omitempty"`
}
26+
27+
// Group clusters queries by canonical fingerprint. Queries that fail to
28+
// parse become singleton clusters with empty Fingerprint. Output is
29+
// sorted by descending TotalExecTimeMs (when any timing is present),
30+
// otherwise by descending TotalCalls, with Fingerprint as the tiebreaker.
31+
func Group(queries []Query) ([]Cluster, error) {
32+
groups := make(map[string]*Cluster)
33+
var unparseable []Cluster
34+
35+
for _, q := range queries {
36+
fp, err := Fingerprint(q.Raw)
37+
if err != nil {
38+
unparseable = append(unparseable, Cluster{
39+
Fingerprint: "",
40+
Canonical: q.Raw,
41+
Members: []Query{q},
42+
TotalCalls: q.Calls,
43+
TotalExecTimeMs: q.TotalExecTimeMs,
44+
Rows: q.Rows,
45+
MeanExecTimeMs: q.MeanExecTimeMs,
46+
})
47+
continue
48+
}
49+
c, ok := groups[fp]
50+
if !ok {
51+
canonical, derr := Normalize(q.Raw)
52+
if derr != nil {
53+
canonical = q.Raw
54+
}
55+
c = &Cluster{
56+
Fingerprint: fp,
57+
Canonical: canonical,
58+
}
59+
groups[fp] = c
60+
}
61+
c.Members = append(c.Members, q)
62+
c.TotalCalls += q.Calls
63+
c.TotalExecTimeMs += q.TotalExecTimeMs
64+
c.Rows += q.Rows
65+
}
66+
67+
out := make([]Cluster, 0, len(groups)+len(unparseable))
68+
for _, c := range groups {
69+
if c.TotalCalls > 0 {
70+
c.MeanExecTimeMs = c.TotalExecTimeMs / float64(c.TotalCalls)
71+
}
72+
out = append(out, *c)
73+
}
74+
out = append(out, unparseable...)
75+
76+
hasTiming := false
77+
for _, c := range out {
78+
if c.TotalExecTimeMs > 0 {
79+
hasTiming = true
80+
break
81+
}
82+
}
83+
84+
sort.Slice(out, func(i, j int) bool {
85+
if hasTiming && out[i].TotalExecTimeMs != out[j].TotalExecTimeMs {
86+
return out[i].TotalExecTimeMs > out[j].TotalExecTimeMs
87+
}
88+
if out[i].TotalCalls != out[j].TotalCalls {
89+
return out[i].TotalCalls > out[j].TotalCalls
90+
}
91+
return out[i].Fingerprint < out[j].Fingerprint
92+
})
93+
return out, nil
94+
}

cmd/qshape/capture.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
"strings"
9+
10+
"github.com/boringsql/qshape"
11+
"github.com/jackc/pgx/v5"
12+
"github.com/spf13/cobra"
13+
)
14+
15+
func captureCmd() *cobra.Command {
16+
var (
17+
minCalls int64
18+
limit int
19+
database string
20+
)
21+
cmd := &cobra.Command{
22+
Use: "capture <conn-string>",
23+
Short: "Fetch pg_stat_statements (with timing) from a live PG and cluster it",
24+
Long: `Connect to a PostgreSQL node, read pg_stat_statements directly,
25+
and emit JSON clusters on stdout.
26+
27+
Requires the pg_stat_statements extension to be loaded and created
28+
in the target database. Requires PostgreSQL 13+ for the *_exec_time
29+
columns; older releases use the legacy total_time naming and are not
30+
supported.
31+
32+
Connection string accepts libpq URLs (postgres://user:pass@host/db)
33+
or keyword/value form (host=... user=... dbname=...).`,
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(_ *cobra.Command, args []string) error {
36+
return runCapture(args[0], minCalls, limit, database)
37+
},
38+
}
39+
cmd.Flags().Int64Var(&minCalls, "min-calls", 0, "exclude queries with calls <= this value")
40+
cmd.Flags().IntVar(&limit, "limit", 0, "limit to top N by total_exec_time (0 = no limit)")
41+
cmd.Flags().StringVar(&database, "database", "", "filter to a specific database name (default: all)")
42+
return cmd
43+
}
44+
45+
func runCapture(connStr string, minCalls int64, limit int, database string) error {
46+
ctx := context.Background()
47+
conn, err := pgx.Connect(ctx, connStr)
48+
if err != nil {
49+
return fmt.Errorf("connect: %w", err)
50+
}
51+
defer conn.Close(ctx)
52+
53+
var sb strings.Builder
54+
sb.WriteString(`SELECT s.queryid, s.calls, s.query,
55+
s.total_exec_time, s.mean_exec_time, s.stddev_exec_time, s.rows
56+
FROM pg_stat_statements s`)
57+
args := []any{}
58+
where := []string{}
59+
if database != "" {
60+
sb.WriteString("\nJOIN pg_database d ON d.oid = s.dbid")
61+
where = append(where, fmt.Sprintf("d.datname = $%d", len(args)+1))
62+
args = append(args, database)
63+
}
64+
where = append(where, fmt.Sprintf("s.calls > $%d", len(args)+1))
65+
args = append(args, minCalls)
66+
sb.WriteString("\nWHERE ")
67+
sb.WriteString(strings.Join(where, " AND "))
68+
sb.WriteString("\nORDER BY s.total_exec_time DESC")
69+
if limit > 0 {
70+
sb.WriteString(fmt.Sprintf("\nLIMIT %d", limit))
71+
}
72+
73+
rows, err := conn.Query(ctx, sb.String(), args...)
74+
if err != nil {
75+
return fmt.Errorf("query pg_stat_statements (extension installed? PG 13+?): %w", err)
76+
}
77+
defer rows.Close()
78+
79+
var queries []qshape.Query
80+
for rows.Next() {
81+
var (
82+
qid, calls, rowCount int64
83+
raw string
84+
totalExec, meanExec, stddevExec float64
85+
)
86+
if err := rows.Scan(&qid, &calls, &raw, &totalExec, &meanExec, &stddevExec, &rowCount); err != nil {
87+
return fmt.Errorf("scan: %w", err)
88+
}
89+
queries = append(queries, qshape.Query{
90+
QueryID: qid,
91+
Calls: calls,
92+
Raw: raw,
93+
TotalExecTimeMs: totalExec,
94+
MeanExecTimeMs: meanExec,
95+
StddevExecTimeMs: stddevExec,
96+
Rows: rowCount,
97+
})
98+
}
99+
if err := rows.Err(); err != nil {
100+
return err
101+
}
102+
103+
clusters, err := qshape.Group(queries)
104+
if err != nil {
105+
return err
106+
}
107+
108+
fmt.Fprintf(os.Stderr, "captured %d queries → %d clusters\n", len(queries), len(clusters))
109+
110+
enc := json.NewEncoder(os.Stdout)
111+
enc.SetIndent("", " ")
112+
return enc.Encode(map[string]any{"clusters": clusters})
113+
}

cmd/qshape/fingerprint.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/boringsql/qshape"
7+
"github.com/spf13/cobra"
8+
)
9+
10+
func fingerprintCmd() *cobra.Command {
11+
return &cobra.Command{
12+
Use: "fingerprint [sql|-]",
13+
Short: "Print the stable AST fingerprint for a SQL statement",
14+
Args: cobra.MaximumNArgs(1),
15+
RunE: func(_ *cobra.Command, args []string) error {
16+
sql, err := readSQLArg(args)
17+
if err != nil {
18+
return err
19+
}
20+
fp, err := qshape.Fingerprint(sql)
21+
if err != nil {
22+
return err
23+
}
24+
fmt.Println(fp)
25+
return nil
26+
},
27+
}
28+
}

cmd/qshape/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package main
2+
3+
import "os"
4+
5+
func main() {
6+
if err := Run(); err != nil {
7+
os.Exit(1)
8+
}
9+
}

cmd/qshape/normalize.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/boringsql/qshape"
7+
"github.com/spf13/cobra"
8+
)
9+
10+
func normalizeCmd() *cobra.Command {
11+
return &cobra.Command{
12+
Use: "normalize [sql|-]",
13+
Short: "Parse and deparse SQL to its canonical form",
14+
Args: cobra.MaximumNArgs(1),
15+
RunE: func(_ *cobra.Command, args []string) error {
16+
sql, err := readSQLArg(args)
17+
if err != nil {
18+
return err
19+
}
20+
out, err := qshape.Normalize(sql)
21+
if err != nil {
22+
return err
23+
}
24+
fmt.Println(out)
25+
return nil
26+
},
27+
}
28+
}

cmd/qshape/root.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package main
2+
3+
import (
4+
"io"
5+
"os"
6+
7+
"github.com/spf13/cobra"
8+
)
9+
10+
func Run() error {
11+
root := &cobra.Command{
12+
Use: "qshape",
13+
Short: "Canonicalize, fingerprint, and cluster SQL at the AST level",
14+
}
15+
root.AddCommand(
16+
normalizeCmd(),
17+
fingerprintCmd(),
18+
captureCmd(),
19+
)
20+
return root.Execute()
21+
}
22+
23+
// readSQLArg returns the SQL text a subcommand should operate on. When args
// holds exactly one element other than "-", that element is returned as is.
// In every other case all of stdin is read and returned as a string; a read
// failure is returned unchanged.
func readSQLArg(args []string) (string, error) {
	fromStdin := len(args) != 1 || args[0] == "-"
	if !fromStdin {
		return args[0], nil
	}

	data, err := io.ReadAll(os.Stdin)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

fingerprint.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package qshape
2+
3+
import pg_query "github.com/pganalyze/pg_query_go/v6"
4+
5+
func Fingerprint(sql string) (string, error) {
6+
fp, err := pg_query.Fingerprint(sql)
7+
if err != nil {
8+
return "", err
9+
}
10+
return "sha1:" + fp, nil
11+
}

go.mod

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module github.com/boringsql/qshape
2+
3+
go 1.25.6
4+
5+
require (
6+
github.com/jackc/pgx/v5 v5.8.0
7+
github.com/pganalyze/pg_query_go/v6 v6.1.0
8+
github.com/spf13/cobra v1.10.2
9+
)
10+
11+
require (
12+
github.com/inconshreveable/mousetrap v1.1.0 // indirect
13+
github.com/jackc/pgpassfile v1.0.0 // indirect
14+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
15+
github.com/spf13/pflag v1.0.9 // indirect
16+
golang.org/x/text v0.29.0 // indirect
17+
google.golang.org/protobuf v1.31.0 // indirect
18+
)

0 commit comments

Comments
 (0)