@@ -5,14 +5,15 @@ import (
55 "os"
66 "path/filepath"
77 "testing"
8+ "time"
89
910 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
1011 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
1112 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
1213 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
1314 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
1415 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
15- "github.com/pashagolub/pgxmock/v4"
16+ "github.com/pashagolub/pgxmock/v4"
1617 "github.com/stretchr/testify/assert"
1718 "github.com/stretchr/testify/require"
1819)
@@ -100,14 +101,14 @@ func TestReaper_LoadSources(t *testing.T) {
100101
101102 t .Run ("Test source config changes trigger restart" , func (t * testing.T ) {
102103 baseSource := sources.Source {
103- Name : "TestSource" ,
104- IsEnabled : true ,
105- Kind : sources .SourcePostgres ,
106- ConnStr : "postgres://localhost:5432/testdb" ,
107- Metrics : map [string ]float64 {"cpu" : 10 , "memory" : 20 },
108- MetricsStandby : map [string ]float64 {"cpu" : 30 },
109- CustomTags : map [string ]string {"env" : "test" },
110- Group : "default" ,
104+ Name : "TestSource" ,
105+ IsEnabled : true ,
106+ Kind : sources .SourcePostgres ,
107+ ConnStr : "postgres://localhost:5432/testdb" ,
108+ Metrics : map [string ]float64 {"cpu" : 10 , "memory" : 20 },
109+ MetricsStandby : map [string ]float64 {"cpu" : 30 },
110+ CustomTags : map [string ]string {"env" : "test" },
111+ Group : "default" ,
111112 }
112113
113114 testCases := []struct {
@@ -331,3 +332,254 @@ func TestReaper_LoadSources(t *testing.T) {
331332 assert .Nil (t , mockConn1 .ExpectationsWereMet (), "Expected all mock expectations to be met" )
332333 })
333334}
335+
336+ func newFetchMetricReaper () * Reaper {
337+ return & Reaper {
338+ Options : & cmdopts.Options {
339+ Metrics : metrics.CmdOpts {},
340+ Sinks : sinks.CmdOpts {},
341+ },
342+ measurementCache : NewInstanceMetricCache (),
343+ }
344+ }
345+
346+ func TestReaper_FetchMetric (t * testing.T ) {
347+ ctx := log .WithLogger (t .Context (), log .NewNoopLogger ())
348+
349+ t .Run ("metric not found in definitions" , func (t * testing.T ) {
350+ r := newFetchMetricReaper ()
351+ md , mock := createTestSourceConn (t )
352+ defer mock .Close ()
353+
354+ env , err := r .FetchMetric (ctx , md , "nonexistent_metric_xyz" )
355+ assert .ErrorIs (t , err , metrics .ErrMetricNotFound )
356+ assert .Nil (t , env )
357+ assert .NoError (t , mock .ExpectationsWereMet ())
358+ })
359+
360+ t .Run ("primary-only metric skipped on standby" , func (t * testing.T ) {
361+ r := newFetchMetricReaper ()
362+ metricDefs .MetricDefs ["primary_only_metric" ] = metrics.Metric {
363+ SQLs : metrics.SQLs {0 : "SELECT 1" },
364+ NodeStatus : "primary" ,
365+ }
366+ md , mock := createTestSourceConn (t )
367+ defer mock .Close ()
368+ md .IsInRecovery = true
369+
370+ env , err := r .FetchMetric (ctx , md , "primary_only_metric" )
371+ assert .NoError (t , err )
372+ assert .Nil (t , env )
373+ assert .NoError (t , mock .ExpectationsWereMet ())
374+ })
375+
376+ t .Run ("standby-only metric skipped on primary" , func (t * testing.T ) {
377+ r := newFetchMetricReaper ()
378+ metricDefs .MetricDefs ["standby_only_metric" ] = metrics.Metric {
379+ SQLs : metrics.SQLs {0 : "SELECT 1" },
380+ NodeStatus : "standby" ,
381+ }
382+ md , mock := createTestSourceConn (t )
383+ defer mock .Close ()
384+ md .IsInRecovery = false
385+
386+ env , err := r .FetchMetric (ctx , md , "standby_only_metric" )
387+ assert .NoError (t , err )
388+ assert .Nil (t , env )
389+ assert .NoError (t , mock .ExpectationsWereMet ())
390+ })
391+
392+ t .Run ("default metric with no SQL for version returns nil" , func (t * testing.T ) {
393+ r := newFetchMetricReaper ()
394+ metricDefs .MetricDefs ["no_sql_metric" ] = metrics.Metric {
395+ SQLs : metrics.SQLs {}, // no SQL defined
396+ }
397+ md , mock := createTestSourceConn (t )
398+ defer mock .Close ()
399+
400+ env , err := r .FetchMetric (ctx , md , "no_sql_metric" )
401+ assert .NoError (t , err )
402+ assert .Nil (t , env )
403+ assert .NoError (t , mock .ExpectationsWereMet ())
404+ })
405+
406+ t .Run ("default metric query success returns envelope" , func (t * testing.T ) {
407+ r := newFetchMetricReaper ()
408+ metricDefs .MetricDefs ["test_metric" ] = metrics.Metric {
409+ SQLs : metrics.SQLs {0 : "SELECT 1" },
410+ }
411+ md , mock := createTestSourceConn (t )
412+ defer mock .Close ()
413+ md .Source .Name = "mydb"
414+ md .Source .CustomTags = map [string ]string {"env" : "prod" }
415+
416+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "value" }).
417+ AddRow (time .Now ().UnixNano (), int64 (42 ))
418+ mock .ExpectQuery ("SELECT 1" ).WillReturnRows (rows )
419+
420+ env , err := r .FetchMetric (ctx , md , "test_metric" )
421+ require .NoError (t , err )
422+ require .NotNil (t , env )
423+ assert .Equal (t , "mydb" , env .DBName )
424+ assert .Equal (t , "test_metric" , env .MetricName )
425+ assert .Len (t , env .Data , 1 )
426+ assert .Equal (t , map [string ]string {"env" : "prod" }, env .CustomTags )
427+ assert .NoError (t , mock .ExpectationsWereMet ())
428+ })
429+
430+ t .Run ("default metric query error returns error" , func (t * testing.T ) {
431+ r := newFetchMetricReaper ()
432+ metricDefs .MetricDefs ["error_metric" ] = metrics.Metric {
433+ SQLs : metrics.SQLs {0 : "SELECT fail" },
434+ }
435+ md , mock := createTestSourceConn (t )
436+ defer mock .Close ()
437+
438+ mock .ExpectQuery ("SELECT fail" ).WillReturnError (assert .AnError )
439+
440+ env , err := r .FetchMetric (ctx , md , "error_metric" )
441+ assert .Error (t , err )
442+ assert .Nil (t , env )
443+ assert .NoError (t , mock .ExpectationsWereMet ())
444+ })
445+
446+ t .Run ("default metric query returns empty rows" , func (t * testing.T ) {
447+ r := newFetchMetricReaper ()
448+ metricDefs .MetricDefs ["empty_metric" ] = metrics.Metric {
449+ SQLs : metrics.SQLs {0 : "SELECT empty" },
450+ }
451+ md , mock := createTestSourceConn (t )
452+ defer mock .Close ()
453+
454+ mock .ExpectQuery ("SELECT empty" ).WillReturnRows (pgxmock .NewRows ([]string {"epoch_ns" }))
455+
456+ env , err := r .FetchMetric (ctx , md , "empty_metric" )
457+ assert .NoError (t , err )
458+ assert .Nil (t , env )
459+ assert .NoError (t , mock .ExpectationsWereMet ())
460+ })
461+
462+ t .Run ("storage name used as metric name in envelope" , func (t * testing.T ) {
463+ r := newFetchMetricReaper ()
464+ metricDefs .MetricDefs ["logical_metric" ] = metrics.Metric {
465+ SQLs : metrics.SQLs {0 : "SELECT 1" },
466+ StorageName : "physical_metric" ,
467+ }
468+ md , mock := createTestSourceConn (t )
469+ defer mock .Close ()
470+
471+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "v" }).
472+ AddRow (time .Now ().UnixNano (), int64 (1 ))
473+ mock .ExpectQuery ("SELECT 1" ).WillReturnRows (rows )
474+
475+ env , err := r .FetchMetric (ctx , md , "logical_metric" )
476+ require .NoError (t , err )
477+ require .NotNil (t , env )
478+ assert .Equal (t , "physical_metric" , env .MetricName )
479+ assert .NoError (t , mock .ExpectationsWereMet ())
480+ })
481+
482+ t .Run ("instance_up special metric returns envelope via GetInstanceUpMeasurement" , func (t * testing.T ) {
483+ r := newFetchMetricReaper ()
484+ metricDefs .MetricDefs [specialMetricInstanceUp ] = metrics.Metric {
485+ SQLs : metrics.SQLs {0 : "SELECT 1" },
486+ }
487+ md , mock := createTestSourceConn (t )
488+ defer mock .Close ()
489+ mock .ExpectPing ()
490+
491+ env , err := r .FetchMetric (ctx , md , specialMetricInstanceUp )
492+ require .NoError (t , err )
493+ require .NotNil (t , env )
494+ assert .Equal (t , specialMetricInstanceUp , env .MetricName )
495+ assert .Len (t , env .Data , 1 )
496+ assert .Equal (t , 1 , env .Data [0 ][specialMetricInstanceUp ])
497+ assert .NoError (t , mock .ExpectationsWereMet ())
498+ })
499+
500+ t .Run ("change_events special metric returns nil when no changes detected" , func (t * testing.T ) {
501+ r := newFetchMetricReaper ()
502+ metricDefs .MetricDefs [specialMetricChangeEvents ] = metrics.Metric {
503+ SQLs : metrics.SQLs {0 : "SELECT 1" },
504+ }
505+ // Remove all hash metric definitions so detection functions return early
506+ delete (metricDefs .MetricDefs , "sproc_hashes" )
507+ delete (metricDefs .MetricDefs , "table_hashes" )
508+ delete (metricDefs .MetricDefs , "index_hashes" )
509+ delete (metricDefs .MetricDefs , "configuration_hashes" )
510+ delete (metricDefs .MetricDefs , "privilege_hashes" )
511+
512+ md , mock := createTestSourceConn (t )
513+ defer mock .Close ()
514+
515+ env , err := r .FetchMetric (ctx , md , specialMetricChangeEvents )
516+ assert .NoError (t , err )
517+ assert .Nil (t , env , "expected nil envelope when no changes detected" )
518+ assert .NoError (t , mock .ExpectationsWereMet ())
519+ })
520+
521+ t .Run ("cache hit serves data without querying DB" , func (t * testing.T ) {
522+ r := newFetchMetricReaper ()
523+ r .Options .Metrics .InstanceLevelCacheMaxSeconds = 30
524+
525+ metricDefs .MetricDefs ["cached_metric" ] = metrics.Metric {
526+ SQLs : metrics.SQLs {0 : "SELECT 1" },
527+ IsInstanceLevel : true ,
528+ }
529+ md , mock := createTestSourceConn (t )
530+ defer mock .Close ()
531+ md .Source .Metrics = map [string ]float64 {"cached_metric" : 10 }
532+
533+ // Pre-populate the cache
534+ cachedData := metrics.Measurements {
535+ metrics.Measurement {
536+ metrics .EpochColumnName : time .Now ().UnixNano (),
537+ "value" : int64 (99 ),
538+ },
539+ }
540+ cacheKey := md .GetClusterIdentifier () + ":cached_metric"
541+ r .measurementCache .Put (cacheKey , cachedData )
542+
543+ // No DB query expected
544+ env , err := r .FetchMetric (ctx , md , "cached_metric" )
545+ require .NoError (t , err )
546+ require .NotNil (t , env )
547+ assert .Equal (t , "cached_metric" , env .MetricName )
548+ assert .Len (t , env .Data , 1 )
549+ assert .NoError (t , mock .ExpectationsWereMet ())
550+ })
551+
552+ t .Run ("sysinfo fields added to measurements" , func (t * testing.T ) {
553+ r := newFetchMetricReaper ()
554+ r .Options .Sinks .RealDbnameField = "real_dbname"
555+ r .Options .Sinks .SystemIdentifierField = "sys_id"
556+ metricDefs .MetricDefs ["sysinfo_metric" ] = metrics.Metric {
557+ SQLs : metrics.SQLs {0 : "SELECT sysinfo" },
558+ }
559+ md , mock := createTestSourceConn (t )
560+ defer mock .Close ()
561+ md .RealDbname = "realdb"
562+ md .SystemIdentifier = "42"
563+
564+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "v" }).
565+ AddRow (time .Now ().UnixNano (), int64 (1 ))
566+ mock .ExpectQuery ("SELECT sysinfo" ).WillReturnRows (rows )
567+
568+ env , err := r .FetchMetric (ctx , md , "sysinfo_metric" )
569+ require .NoError (t , err )
570+ require .NotNil (t , env )
571+ assert .Equal (t , "realdb" , env .Data [0 ]["real_dbname" ])
572+ assert .Equal (t , "42" , env .Data [0 ]["sys_id" ])
573+ assert .NoError (t , mock .ExpectationsWereMet ())
574+ })
575+ }
576+
577+ func TestWriteMeasurements (t * testing.T ) {
578+ ctx , cancel := context .WithCancel (log .WithLogger (t .Context (), log .NewNoopLogger ()))
579+ defer cancel ()
580+ r := NewReaper (ctx , & cmdopts.Options {
581+ SinksWriter : testutil .NewMockWriter (assert .AnError , false , nil ),
582+ })
583+ go r .WriteMeasurements (ctx )
584+ r .WriteInstanceDown (& sources.SourceConn {})
585+ }
0 commit comments