Skip to content

Commit 2093f2e

Browse files
committed
Make config properties boolean
- fix naming, remove extra log prefixes - use `MustCompile()` for constant regex - consistently return errors without logging - inline `LogConfig` - fix query to determine log settings
1 parent a5756b7 commit 2093f2e

4 files changed

Lines changed: 113 additions & 137 deletions

File tree

internal/reaper/logparser.go

Lines changed: 60 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package reaper
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"os"
78
"path/filepath"
89
"regexp"
@@ -38,64 +39,60 @@ const maxChunkSize uint64 = 10 * 1024 * 1024 // 10 MB
3839
const maxTrackedFiles = 2500
3940

4041
type LogParser struct {
41-
ctx context.Context
42-
LogsMatchRegex *regexp.Regexp
43-
LogCfg *LogConfigs
44-
SourceConn *sources.SourceConn
45-
Interval float64
46-
StoreCh chan<- metrics.MeasurementEnvelope
47-
eventCounts map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
48-
eventCountsTotal map[string]int64 // for the whole instance
49-
lastSendTime time.Time
50-
fileOffsets map[string]uint64 // map of log file paths to last read offsets
42+
*LogConfig
43+
ctx context.Context
44+
LogsMatchRegex *regexp.Regexp
45+
SourceConn *sources.SourceConn
46+
Interval float64
47+
StoreCh chan<- metrics.MeasurementEnvelope
48+
eventCounts map[string]int64 // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
49+
eventCountsTotal map[string]int64 // for the whole instance
50+
lastSendTime time.Time
51+
fileOffsets map[string]uint64 // map of log file paths to last read offsets
5152
}
5253

53-
type LogConfigs struct {
54-
IsLogCollectorEnabled string
55-
LogDestination string
56-
ServerMessagesLang string
57-
LogTruncOnRotation string
58-
LogFolder string
54+
type LogConfig struct {
55+
CollectorEnabled bool
56+
CSVDestination bool
57+
TruncateOnRotation bool
58+
Directory string
59+
ServerMessagesLang string
5960
}
6061

61-
func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (*LogParser, error) {
62+
func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error) {
6263

6364
logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
6465
ctx = log.WithLogger(ctx, logger)
6566

66-
logsRegex, err := regexp.Compile(csvLogDefaultRegEx)
67-
if err != nil {
68-
logger.WithError(err).Error("Invalid log parsing regex")
69-
return nil, err
70-
}
67+
logsRegex := regexp.MustCompile(csvLogDefaultRegEx)
68+
7169
logger.Debugf("Using %s as log parsing regex", logsRegex)
7270

73-
var logCfg *LogConfigs
74-
if logCfg, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
75-
logger.WithError(err).Error("Could not determine Postgres logs settings")
76-
return nil, err
71+
var cfg *LogConfig
72+
if cfg, err = tryDetermineLogSettings(ctx, mdb.Conn); err != nil {
73+
return nil, fmt.Errorf("could not determine Postgres logs settings: %w", err)
7774
}
7875

79-
if logCfg.IsLogCollectorEnabled != "on" {
76+
if !cfg.CollectorEnabled {
8077
return nil, errors.New("logging_collector is not enabled on the db server")
8178
}
8279

83-
if !strings.Contains(logCfg.LogDestination, "csvlog") {
80+
if !cfg.CSVDestination {
8481
return nil, errors.New("log_destination must contain 'csvlog' for log parsing to work")
8582
}
8683

87-
logger.Debugf("Considering log files in folder: %s", logCfg.LogFolder)
84+
logger.Debugf("Considering log files in folder: %s", cfg.Directory)
8885

8986
return &LogParser{
90-
ctx: ctx,
91-
LogsMatchRegex: logsRegex,
92-
SourceConn: mdb,
93-
Interval: mdb.GetMetricInterval(specialMetricServerLogEventCounts),
94-
StoreCh: storeCh,
95-
LogCfg: logCfg,
96-
eventCounts: make(map[string]int64),
97-
eventCountsTotal: make(map[string]int64),
98-
fileOffsets: make(map[string]uint64),
87+
ctx: ctx,
88+
LogsMatchRegex: logsRegex,
89+
SourceConn: mdb,
90+
Interval: mdb.GetMetricInterval(specialMetricServerLogEventCounts),
91+
StoreCh: storeCh,
92+
LogConfig: cfg,
93+
eventCounts: make(map[string]int64),
94+
eventCountsTotal: make(map[string]int64),
95+
fileOffsets: make(map[string]uint64),
9996
}, nil
10097
}
10198

@@ -106,56 +103,41 @@ func (lp *LogParser) HasSendIntervalElapsed() bool {
106103
func (lp *LogParser) ParseLogs() error {
107104
l := log.GetLogger(lp.ctx)
108105
if ok, err := db.IsClientOnSameHost(lp.SourceConn.Conn); ok && err == nil {
109-
l.Info("DB is on the same host. parsing logs locally")
110-
if err = checkHasLocalPrivileges(lp.LogCfg.LogFolder); err == nil {
106+
l.Info("DB is on the same host, parsing logs locally")
107+
if err = checkHasLocalPrivileges(lp.Directory); err == nil {
111108
return lp.parseLogsLocal()
112109
}
113-
l.WithError(err).Error("Could't parse logs locally. lacking required privileges")
110+
l.WithError(err).Error("Could't parse logs locally, lacking required privileges")
114111
}
115112

116-
l.Info("DB is not detected to be on the same host. parsing logs remotely")
117-
if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.LogCfg.LogFolder); err != nil {
118-
l.WithError(err).Error("Could't parse logs remotely. lacking required privileges")
113+
l.Info("DB is not detected to be on the same host, parsing logs remotely")
114+
if err := checkHasRemotePrivileges(lp.ctx, lp.SourceConn, lp.Directory); err != nil {
115+
l.WithError(err).Error("could't parse logs remotely, lacking required privileges")
119116
return err
120117
}
121118
return lp.parseLogsRemote()
122119
}
123120

124-
func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (*LogConfigs, error) {
121+
func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogConfig, err error) {
125122
sql := `select
126-
current_setting('logging_collector') as is_enabled,
127-
current_setting('log_destination') as ldest,
128-
current_setting('data_directory') as dd,
129-
current_setting('log_directory') as ld,
130-
current_setting('lc_messages')::varchar(2) as lc_messages,
131-
current_setting('log_truncate_on_rotation') as log_trunc`
132-
133-
logCfg := &LogConfigs{}
134-
var dataDir, logDir string
135-
err := conn.QueryRow(ctx, sql).Scan(
136-
&logCfg.IsLogCollectorEnabled,
137-
&logCfg.LogDestination,
138-
&dataDir,
139-
&logDir,
140-
&logCfg.ServerMessagesLang,
141-
&logCfg.LogTruncOnRotation,
142-
)
143-
if err != nil {
144-
return nil, err
145-
}
146-
147-
// TODO: what if the pg server runs on windows?
148-
if strings.HasPrefix(logDir, "/") {
149-
logCfg.LogFolder = logDir
150-
} else {
151-
logCfg.LogFolder = filepath.Join(dataDir, logDir)
152-
}
153-
154-
if _, ok := pgSeveritiesLocale[logCfg.ServerMessagesLang]; !ok {
155-
logCfg.ServerMessagesLang = "en"
156-
}
157-
158-
return logCfg, nil
123+
current_setting('logging_collector') = 'on' as is_enabled,
124+
strpos(current_setting('log_destination'), 'csvlog') > 0 as csvlog_dest,
125+
current_setting('log_truncate_on_rotation') = 'on' as log_trunc,
126+
case
127+
when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory')
128+
else current_setting('data_directory') || '/' || current_setting('log_directory')
129+
end as log_dir,
130+
current_setting('lc_messages')::varchar(2) as lc_messages`
131+
var res pgx.Rows
132+
if res, err = conn.Query(ctx, sql); err == nil {
133+
if cfg, err = pgx.CollectOneRow(res, pgx.RowToAddrOfStructByPos[LogConfig]); err == nil {
134+
if _, ok := pgSeveritiesLocale[cfg.ServerMessagesLang]; !ok {
135+
cfg.ServerMessagesLang = "en"
136+
}
137+
return cfg, nil
138+
}
139+
}
140+
return nil, err
159141
}
160142

161143
func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error {

internal/reaper/logparser_local.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func (lp *LogParser) parseLogsLocal() error {
2020
var currInterval time.Duration
2121

2222
logger := log.GetLogger(lp.ctx)
23-
logsGlobPath := filepath.Join(lp.LogCfg.LogFolder, csvLogDefaultGlobSuffix)
23+
logsGlobPath := filepath.Join(lp.Directory, csvLogDefaultGlobSuffix)
2424

2525
for { // re-try loop. re-start in case of FS errors or just to refresh host config
2626
select {
@@ -64,7 +64,7 @@ func (lp *LogParser) parseLogsLocal() error {
6464
reader = bufio.NewReader(latestHandle)
6565

6666
linesOffset, ok := lp.fileOffsets[latest]
67-
if ok && lp.LogCfg.LogTruncOnRotation == "off" {
67+
if ok && !lp.TruncateOnRotation {
6868
linesRead = int(linesOffset)
6969
}
7070
if (ok || previous == latest) && linesRead > 0 { // skip already read lines
@@ -108,7 +108,7 @@ func (lp *LogParser) parseLogsLocal() error {
108108
// check for newly opened logfiles
109109
file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
110110
if file != "" && file != latest {
111-
if lp.LogCfg.LogTruncOnRotation == "off" {
111+
if !lp.TruncateOnRotation {
112112
lp.fileOffsets[latest] = uint64(linesRead)
113113
if len(lp.fileOffsets) > maxTrackedFiles {
114114
clear(lp.fileOffsets) // To avoid unbounded growth
@@ -138,8 +138,8 @@ func (lp *LogParser) parseLogsLocal() error {
138138
time.Sleep(time.Minute)
139139
break
140140
}
141-
if lp.LogCfg.ServerMessagesLang != "en" {
142-
errorSeverity = severityToEnglish(lp.LogCfg.ServerMessagesLang, errorSeverity)
141+
if lp.ServerMessagesLang != "en" {
142+
errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
143143
}
144144

145145
databaseName, ok := result["database_name"]

internal/reaper/logparser_remote.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (lp *LogParser) parseLogsRemote() error {
3636
sql := "select name, size, modification from pg_ls_logdir() where name like '%csv' order by modification desc limit 1;"
3737
err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql).Scan(&latestLogFile, &size, &modification)
3838
if err != nil {
39-
logger.Infof("No logfiles found in log dir: '%s'", lp.LogCfg.LogFolder)
39+
logger.Infof("No logfiles found in log dir: '%s'", lp.Directory)
4040
continue
4141
}
4242
offset = size // Seek to an end
@@ -45,7 +45,7 @@ func (lp *LogParser) parseLogsRemote() error {
4545
}
4646

4747
if linesRead == numOfLines && size != offset {
48-
logFilePath := filepath.Join(lp.LogCfg.LogFolder, latestLogFile)
48+
logFilePath := filepath.Join(lp.Directory, latestLogFile)
4949
sizeToRead := min(maxChunkSize, size-offset)
5050
err := lp.SourceConn.Conn.QueryRow(lp.ctx, "select pg_read_file($1, $2, $3)", logFilePath, offset, sizeToRead).Scan(&chunk)
5151
offset += sizeToRead
@@ -82,15 +82,15 @@ func (lp *LogParser) parseLogsRemote() error {
8282
sql := "select name, size from pg_ls_logdir() where modification > $1 and name like '%csv' order by modification, name limit 1;"
8383
err := lp.SourceConn.Conn.QueryRow(lp.ctx, sql, modification).Scan(&fileName, &latestSize)
8484
if err == nil && latestLogFile != fileName {
85-
if lp.LogCfg.LogTruncOnRotation == "off" {
85+
if !lp.TruncateOnRotation {
8686
lp.fileOffsets[latestLogFile] = size
8787
if len(lp.fileOffsets) > maxTrackedFiles {
8888
clear(lp.fileOffsets) // To avoid unbounded growth
8989
}
9090
}
9191
latestLogFile = fileName
9292
size = latestSize
93-
if lastOffset, ok := lp.fileOffsets[latestLogFile]; ok && lp.LogCfg.LogTruncOnRotation == "off" {
93+
if lastOffset, ok := lp.fileOffsets[latestLogFile]; ok && !lp.TruncateOnRotation {
9494
offset = lastOffset
9595
} else {
9696
offset = 0
@@ -114,8 +114,8 @@ func (lp *LogParser) parseLogsRemote() error {
114114
if len(matches) != 0 {
115115
result := lp.regexMatchesToMap(matches)
116116
errorSeverity := result["error_severity"]
117-
if lp.LogCfg.ServerMessagesLang != "en" {
118-
errorSeverity = severityToEnglish(lp.LogCfg.ServerMessagesLang, errorSeverity)
117+
if lp.ServerMessagesLang != "en" {
118+
errorSeverity = severityToEnglish(lp.ServerMessagesLang, errorSeverity)
119119
}
120120

121121
databaseName := result["database_name"]

0 commit comments

Comments
 (0)