-
Notifications
You must be signed in to change notification settings - Fork 52
Expand file tree
/
Copy pathDuckDbInitializer.cs
More file actions
877 lines (788 loc) · 38.5 KB
/
DuckDbInitializer.cs
File metadata and controls
877 lines (788 loc) · 38.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DuckDB.NET.Data;
using Microsoft.Extensions.Logging;
namespace PerformanceMonitorLite.Database;
/// <summary>
/// Initializes the DuckDB database and creates tables on first run.
/// </summary>
public class DuckDbInitializer
{
/* Full path of the .duckdb file this initializer manages. */
private readonly string _databasePath;
/* Optional — every log call below is null-conditional, so no logger is required. */
private readonly ILogger<DuckDbInitializer>? _logger;
/// <summary>
/// Coordinates UI readers with maintenance writers (CHECKPOINT, archive DELETEs, compaction).
/// Read locks allow unlimited concurrent UI queries. Write locks are exclusive and wait
/// for all readers to finish before proceeding.
/// NOTE(review): ReaderWriterLockSlim is thread-affine (must be exited on the entering
/// thread); holding it across an await elsewhere in this class assumes the continuation
/// resumes on the same thread — confirm against the app's synchronization context.
/// </summary>
private static readonly ReaderWriterLockSlim s_dbLock = new(LockRecursionPolicy.NoRecursion);
/// <summary>
/// Acquires a shared read lock on the database; any number of readers may hold it at once.
/// Dispose the returned handle to release the lock.
/// If this thread already owns a read lock (e.g., one leaked by an unhandled exception),
/// a no-op handle is returned so the caller can still proceed.
/// </summary>
public IDisposable AcquireReadLock()
{
    try
    {
        s_dbLock.EnterReadLock();
        return new LockReleaser(s_dbLock, write: false);
    }
    catch (LockRecursionException)
    {
        /* This thread is already inside a read lock — most likely Dispose() was skipped
           by an unhandled exception. The operation is still read-protected, so hand back
           a do-nothing handle rather than failing the caller's query. */
        return NoOpDisposable.Instance;
    }
}
/// <summary>
/// Acquires an exclusive write lock on the database. Blocks until all readers finish.
/// Dispose the returned object to release the lock.
/// NOTE(review): ReaderWriterLockSlim must be exited on the thread that entered it;
/// async callers that await while holding this lock (CheckpointAsync, ResetDatabaseAsync)
/// rely on the continuation resuming on the same thread — verify.
/// </summary>
public IDisposable AcquireWriteLock()
{
    s_dbLock.EnterWriteLock();
    return new LockReleaser(s_dbLock, write: true);
}
/* Dispose-nothing handle returned when the current thread already holds a read lock. */
private sealed class NoOpDisposable : IDisposable
{
    public static readonly NoOpDisposable Instance = new();
    public void Dispose() { }
}
/* Releases the read or write side of the lock exactly once when disposed. */
private sealed class LockReleaser : IDisposable
{
    private readonly ReaderWriterLockSlim _lock;
    private readonly bool _write;
    private bool _disposed;

    public LockReleaser(ReaderWriterLockSlim rwLock, bool write)
    {
        _lock = rwLock;
        _write = write;
    }

    public void Dispose()
    {
        /* Idempotent: a second Dispose() must not exit the lock again. */
        if (_disposed)
        {
            return;
        }
        _disposed = true;
        if (_write)
        {
            _lock.ExitWriteLock();
        }
        else
        {
            _lock.ExitReadLock();
        }
    }
}
/// <summary>
/// Current schema version. Increment this when schema changes require table rebuilds.
/// </summary>
internal const int CurrentSchemaVersion = 22;
/* Sibling "archive" directory next to the database file; holds parquet exports. */
private readonly string _archivePath;
/// <summary>
/// Creates an initializer for the database at <paramref name="databasePath"/>.
/// The parquet archive directory is resolved as a sibling "archive" folder.
/// </summary>
public DuckDbInitializer(string databasePath, ILogger<DuckDbInitializer>? logger = null)
{
    _databasePath = databasePath;
    /* When the path has no directory component, fall back to the working directory. */
    _archivePath = Path.Combine(Path.GetDirectoryName(databasePath) ?? ".", "archive");
    _logger = logger;
}
/* Tables that have parquet archives — views are created to UNION hot data with archived parquet files.
   IMPORTANT: Must match ArchiveService.ArchivableTables — every archived table needs an archive view. */
private static readonly string[] ArchivableTables =
[
    "wait_stats", "query_stats", "procedure_stats", "query_store_stats",
    "query_snapshots", "cpu_utilization_stats", "file_io_stats", "memory_stats",
    "memory_clerks", "tempdb_stats", "perfmon_stats", "deadlocks",
    "blocked_process_reports", "memory_grant_stats", "waiting_tasks",
    "running_jobs", "database_size_stats", "server_properties",
    "session_stats", "server_config", "database_config",
    "database_scoped_config", "trace_flags", "config_alert_log",
    "collection_log"
];
/// <summary>
/// Gets the connection string for the DuckDB database.
/// Disables automatic WAL checkpoints (threshold raised to 1GB) to prevent 2-3s
/// stop-the-world stalls during collector writes. Manual CHECKPOINT runs between
/// collection cycles instead (see <see cref="CheckpointAsync"/>).
/// </summary>
public string ConnectionString => $"Data Source={_databasePath};checkpoint_threshold=1GB";
/// <summary>
/// Ensures the database exists and all tables are created.
/// Handles DuckDB version mismatches by exporting data to Parquet, recreating the database, and importing.
/// </summary>
public async Task InitializeAsync()
{
    _logger?.LogInformation("Initializing DuckDB database at {Path}", _databasePath);
    var directory = Path.GetDirectoryName(_databasePath);
    if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory))
    {
        Directory.CreateDirectory(directory);
        _logger?.LogInformation("Created database directory: {Directory}", directory);
    }
    /* Use the archive path computed in the constructor so the two can never drift. */
    if (!Directory.Exists(_archivePath))
    {
        Directory.CreateDirectory(_archivePath);
        _logger?.LogInformation("Created archive directory: {ArchivePath}", _archivePath);
    }
    /* Try to open the database. If the DuckDB storage version has changed, this throws;
       we recover by exporting to Parquet, rebuilding the file, and importing. */
    var connection = await OpenOrMigrateAsync();
    using (connection)
    {
        await ExecuteNonQueryAsync(connection,
            "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL)");
        var existingVersion = await GetSchemaVersionAsync(connection);
        /* On a fresh/reset database (v0), skip migrations entirely — they DROP tables
           expecting CREATE TABLE to follow, which is destructive on a blank DB.
           Just create tables with the current schema and stamp the version. */
        if (existingVersion > 0 && existingVersion < CurrentSchemaVersion)
        {
            _logger?.LogInformation("Schema upgrade needed: v{Old} -> v{New}", existingVersion, CurrentSchemaVersion);
            await RunMigrationsAsync(connection, existingVersion);
        }
        /* CREATE TABLE/INDEX IF NOT EXISTS — fills in anything migrations dropped. */
        foreach (var tableStatement in Schema.GetAllTableStatements())
        {
            await ExecuteNonQueryAsync(connection, tableStatement);
        }
        foreach (var indexStatement in Schema.GetAllIndexStatements())
        {
            await ExecuteNonQueryAsync(connection, indexStatement);
        }
        if (existingVersion < CurrentSchemaVersion)
        {
            await SetSchemaVersionAsync(connection, CurrentSchemaVersion);
        }
        _logger?.LogInformation("Database initialization complete. Schema version: {Version}", CurrentSchemaVersion);
    }
    await CreateArchiveViewsAsync();
    await InitializeAnalysisSchemaAsync();
}

/* Opens the database, falling back to the Parquet migrate-and-rebuild path on a
   storage-version mismatch. Disposes the half-constructed connection on any open
   failure (the original code leaked it when OpenAsync threw). */
private async Task<DuckDBConnection> OpenOrMigrateAsync()
{
    var connection = new DuckDBConnection(ConnectionString);
    try
    {
        await connection.OpenAsync();
        return connection;
    }
    catch (Exception ex) when (IsStorageVersionError(ex))
    {
        connection.Dispose();
        _logger?.LogWarning("DuckDB storage version mismatch detected. Migrating data via Parquet export/import.");
        await MigrateViaParquetAsync(_archivePath);
        connection = new DuckDBConnection(ConnectionString);
        await connection.OpenAsync();
        return connection;
    }
    catch
    {
        /* Unrecoverable open failure — release the connection object before rethrowing. */
        connection.Dispose();
        throw;
    }
}
/// <summary>
/// Checks if an exception is a DuckDB storage version mismatch.
/// </summary>
/// <remarks>
/// DuckDB version mismatch errors include:
///  - "Serialization Error: Failed to deserialize" (incompatible storage format)
///  - "IO Error: Trying to read a database file with version number X, but we can only read version Y"
/// Note: Since DuckDB v0.10+, backward compatibility is maintained (newer reads older).
/// This primarily catches forward-incompatibility (older library, newer file).
/// </remarks>
private static bool IsStorageVersionError(Exception ex)
{
    /* Case-insensitive substring match over the full exception text (including inner
       exceptions). OrdinalIgnoreCase avoids allocating a lowercased copy of the whole
       string as the previous ToLowerInvariant() approach did. */
    var text = ex.ToString();
    return text.Contains("serialization error", StringComparison.OrdinalIgnoreCase)
        || text.Contains("failed to deserialize", StringComparison.OrdinalIgnoreCase)
        || text.Contains("trying to read a database file with version", StringComparison.OrdinalIgnoreCase)
        || text.Contains("storage version", StringComparison.OrdinalIgnoreCase)
        || text.Contains("unable to open database", StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Exports all tables from the old database to Parquet, deletes the database, and reimports.
/// Uses DuckDB's EXPORT DATABASE which writes one Parquet file per table.
/// </summary>
/// <param name="archivePath">Archive directory; a timestamped "upgrade_*" subfolder is created there for the export.</param>
private async Task MigrateViaParquetAsync(string archivePath)
{
    var exportDir = Path.Combine(archivePath, $"upgrade_{DateTime.Now:yyyyMMdd_HHmmss}");
    Directory.CreateDirectory(exportDir);
    /* Step 1: Try to export from the old database using EXPORT DATABASE.
       Since DuckDB v0.10+, newer versions can read older files (backward compat),
       so upgrading DuckDB should normally open the file without hitting this path.
       This mainly handles edge cases (e.g., downgrade, corruption).
       If the file is truly unreadable, the backup preserves it for manual recovery
       using the original DuckDB version's CLI: duckdb old.db "EXPORT DATABASE 'dir'" */
    var exported = false;
    try
    {
        /* Attempt read-only open — some version mismatches allow read but not write */
        var readOnlyConnStr = $"Data Source={_databasePath};ACCESS_MODE=READ_ONLY";
        using (var oldConn = new DuckDBConnection(readOnlyConnStr))
        {
            await oldConn.OpenAsync();
            /* Export all tables to Parquet. Single quotes in the path are doubled for SQL. */
            using var cmd = oldConn.CreateCommand();
            cmd.CommandText = $"EXPORT DATABASE '{exportDir.Replace("'", "''")}' (FORMAT PARQUET)";
            await cmd.ExecuteNonQueryAsync();
            exported = true;
            _logger?.LogInformation("Exported old database to {ExportDir}", exportDir);
        }
    }
    catch (Exception ex)
    {
        /* Export is best-effort — the backup in Step 2 still preserves the raw file. */
        _logger?.LogWarning(ex, "Could not export old database — data will be preserved as backup file only");
    }
    /* Step 2: Back up and delete the old database file */
    var backupPath = _databasePath + $".backup_{DateTime.Now:yyyyMMdd_HHmmss}";
    try
    {
        /* DuckDB may have .wal files too */
        File.Move(_databasePath, backupPath);
        _logger?.LogInformation("Backed up old database to {BackupPath}", backupPath);
        var walPath = _databasePath + ".wal";
        if (File.Exists(walPath))
        {
            File.Move(walPath, backupPath + ".wal");
        }
    }
    catch (Exception ex)
    {
        /* Move failed (e.g., file locked or cross-volume) — fall back to deleting so a
           fresh database can be created at the original path. */
        _logger?.LogError(ex, "Failed to back up old database, deleting instead");
        File.Delete(_databasePath);
        var walPath = _databasePath + ".wal";
        if (File.Exists(walPath)) File.Delete(walPath);
    }
    /* Step 3: If we exported successfully, import into the fresh database */
    if (exported)
    {
        try
        {
            using var newConn = new DuckDBConnection(ConnectionString);
            await newConn.OpenAsync();
            using var cmd = newConn.CreateCommand();
            cmd.CommandText = $"IMPORT DATABASE '{exportDir.Replace("'", "''")}' ";
            await cmd.ExecuteNonQueryAsync();
            _logger?.LogInformation("Imported data from Parquet export into new database");
        }
        catch (Exception ex)
        {
            /* Import may fail if schema changed between versions — that's okay,
               the normal initialization will create fresh tables */
            _logger?.LogWarning(ex, "Could not import Parquet data — starting with fresh tables. " +
                "Parquet files preserved at {ExportDir} for manual recovery.", exportDir);
        }
    }
}
/* Reads the highest recorded schema version; returns 0 when the table is missing or empty. */
private async Task<int> GetSchemaVersionAsync(DuckDBConnection connection)
{
    try
    {
        using var command = connection.CreateCommand();
        command.CommandText = "SELECT COALESCE(MAX(version), 0) FROM schema_version";
        return Convert.ToInt32(await command.ExecuteScalarAsync());
    }
    catch
    {
        /* schema_version table not queryable yet — treat as a fresh database. */
        return 0;
    }
}
/* Stamps the schema version: the table always holds exactly one row, so clear then insert. */
private async Task SetSchemaVersionAsync(DuckDBConnection connection, int version)
{
    await ExecuteNonQueryAsync(connection, "DELETE FROM schema_version");
    using var insert = connection.CreateCommand();
    insert.CommandText = "INSERT INTO schema_version (version) VALUES ($1)";
    insert.Parameters.Add(new DuckDBParameter { Value = version });
    await insert.ExecuteNonQueryAsync();
}
/// <summary>
/// Runs schema migrations from the given version up to CurrentSchemaVersion.
/// Each migration drops and recreates affected tables (tables are recreated by
/// GetAllTableStatements() during the initialization that follows this call).
///
/// IMPORTANT: When adding a new data collection table, you must also register it in:
/// 1. Schema.cs — GetAllTableStatements() and GetAllIndexStatements()
/// 2. DuckDbInitializer.cs — ArchivableTables (archive view creation)
/// 3. ArchiveService.cs — ArchivableTables (parquet export + purge)
/// Forgetting any of these causes unbounded growth and 512 MB reset loops.
/// </summary>
/// <param name="connection">Open connection to the database being upgraded.</param>
/// <param name="fromVersion">Schema version currently recorded in the database (must be > 0).</param>
private async Task RunMigrationsAsync(DuckDBConnection connection, int fromVersion)
{
    if (fromVersion < 2)
    {
        /* v2: Added delta columns to query_stats (delta_logical_writes, delta_physical_reads, delta_spills)
           and procedure_stats (delta_logical_reads, delta_logical_writes, delta_physical_reads).
           Added plan_id, avg_logical_writes, avg_physical_reads to query_store_stats.
           Restructured blocked_process_reports. */
        _logger?.LogInformation("Running migration to v2: rebuilding query_stats, procedure_stats, query_store_stats, blocked_process_reports");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS procedure_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_store_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS blocking_snapshots"); /* Cleanup - table no longer used */
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS blocked_process_reports");
    }
    if (fromVersion < 3)
    {
        /* v3: Fix server_id values. Previously used string.GetHashCode() which is
           randomized per process in .NET Core, producing different IDs on each restart.
           Now uses a deterministic FNV-1a hash of server_name. This migration updates
           all existing rows to use the correct deterministic server_id. */
        _logger?.LogInformation("Running migration to v3: fixing server_id values (non-deterministic hash -> deterministic)");
        await FixServerIdsAsync(connection);
    }
    if (fromVersion < 4)
    {
        /* v4: Added sql_duration_ms and duckdb_duration_ms columns to collection_log
           for split collector timing (SQL query vs DuckDB insert).
           Only ALTER if the table already exists — on fresh installs it will be
           created with these columns by GetAllTableStatements(). */
        _logger?.LogInformation("Running migration to v4: adding timing columns to collection_log");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE collection_log ADD COLUMN IF NOT EXISTS sql_duration_ms INTEGER");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE collection_log ADD COLUMN IF NOT EXISTS duckdb_duration_ms INTEGER");
        }
        catch
        {
            /* Table doesn't exist yet — will be created with correct schema below */
        }
    }
    if (fromVersion < 5)
    {
        /* v5: Added database_scoped_config and trace_flags tables
           for database-scoped configuration and active trace flag collection. */
        _logger?.LogInformation("Running migration to v5: adding database_scoped_config and trace_flags tables");
        await ExecuteNonQueryAsync(connection, Schema.CreateDatabaseScopedConfigTable);
        await ExecuteNonQueryAsync(connection, Schema.CreateDatabaseScopedConfigIndex);
        await ExecuteNonQueryAsync(connection, Schema.CreateTraceFlagsTable);
        await ExecuteNonQueryAsync(connection, Schema.CreateTraceFlagsIndex);
    }
    if (fromVersion < 6)
    {
        /* v6: Added sql_handle and plan_handle to query_stats and procedure_stats,
           and query_plan_hash to query_store_stats for cross-referencing.
           Must drop/recreate because ALTER TABLE appends columns at the end,
           but the DuckDB appender writes by position and expects specific column order. */
        _logger?.LogInformation("Running migration to v6: rebuilding query_stats, procedure_stats, query_store_stats for handle columns");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS procedure_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_store_stats");
    }
    if (fromVersion < 7)
    {
        /* v7: Changed collection_log.log_id from INTEGER to BIGINT.
           GenerateCollectionId() returns a long seeded from DateTime.UtcNow.Ticks
           which overflows 32-bit INTEGER, causing all collection_log INSERTs to fail silently. */
        _logger?.LogInformation("Running migration to v7: rebuilding collection_log (log_id INTEGER -> BIGINT)");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS collection_log");
    }
    if (fromVersion < 8)
    {
        /* v8: Added min_worker_time, max_worker_time, min_elapsed_time, max_elapsed_time,
           and total_spills columns to procedure_stats for parity with Dashboard.
           Must drop/recreate because DuckDB appender writes by position. */
        _logger?.LogInformation("Running migration to v8: rebuilding procedure_stats for min/max/spills columns");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS procedure_stats");
    }
    if (fromVersion < 9)
    {
        /* v9: Added dismissed column to config_alert_log for hide/dismiss functionality.
           Safe to ALTER because this table uses INSERT (not appender). */
        _logger?.LogInformation("Running migration to v9: adding dismissed column to config_alert_log");
        try
        {
            /* DuckDB does not support ADD COLUMN with NOT NULL — use nullable with DEFAULT */
            await ExecuteNonQueryAsync(connection, "ALTER TABLE config_alert_log ADD COLUMN IF NOT EXISTS dismissed BOOLEAN DEFAULT false");
        }
        catch
        {
            /* Table doesn't exist yet — will be created with correct schema below */
        }
    }
    if (fromVersion < 10)
    {
        /* v10: Added server_name column to collection_log so log entries
           can be identified by server without needing a lookup table. */
        _logger?.LogInformation("Running migration to v10: adding server_name column to collection_log");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE collection_log ADD COLUMN IF NOT EXISTS server_name VARCHAR");
        }
        catch
        {
            /* Table doesn't exist yet — will be created with correct schema below */
        }
    }
    if (fromVersion < 11)
    {
        /* v11: Expanded database_config from 9 to 28 columns (sys.databases).
           Added state_desc, collation, RCSI, snapshot isolation, stats settings,
           encryption, security, and version-gated columns (ADR, memory optimized, optimized locking). */
        _logger?.LogInformation("Running migration to v11: rebuilding database_config for expanded sys.databases columns");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS database_config");
    }
    if (fromVersion < 12)
    {
        /* v12: Added login_name, host_name, program_name, open_transaction_count,
           percent_complete columns to query_snapshots for Issue #149. */
        _logger?.LogInformation("Running migration to v12: adding session columns to query_snapshots");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE query_snapshots ADD COLUMN IF NOT EXISTS login_name VARCHAR");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE query_snapshots ADD COLUMN IF NOT EXISTS host_name VARCHAR");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE query_snapshots ADD COLUMN IF NOT EXISTS program_name VARCHAR");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE query_snapshots ADD COLUMN IF NOT EXISTS open_transaction_count INTEGER");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE query_snapshots ADD COLUMN IF NOT EXISTS percent_complete DECIMAL(5,2)");
        }
        catch
        {
            /* Table doesn't exist yet — will be created with correct schema below */
        }
    }
    if (fromVersion < 13)
    {
        /* v13: Full column parity with Dashboard for all three query/procedure collectors.
           query_stats: added creation_time, last_execution_time, total_clr_time,
           min/max physical_reads, rows, spills, memory grant columns (6), thread columns (4).
           procedure_stats: added cached_time, last_execution_time,
           min/max logical_reads, physical_reads, logical_writes, spills.
           query_store_stats: complete rebuild with all min/max columns, DOP, CLR,
           memory, tempdb, plan forcing, compilation metrics, version-gated columns.
           Must drop/recreate because DuckDB appender writes by position. */
        _logger?.LogInformation("Running migration to v13: rebuilding query_stats, procedure_stats, query_store_stats for full Dashboard column parity");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS procedure_stats");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS query_store_stats");
    }
    if (fromVersion < 14)
    {
        /* v14: Switched memory_grant_stats from per-session (dm_exec_query_memory_grants)
           to semaphore-level (dm_exec_query_resource_semaphores) for parity with Dashboard.
           Old schema had session_id, query_text, dop, etc. New schema has
           resource_semaphore_id, pool_id, and delta columns.
           Must drop/recreate because column layout is completely different. */
        _logger?.LogInformation("Running migration to v14: rebuilding memory_grant_stats for resource semaphore schema");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS memory_grant_stats");
    }
    if (fromVersion < 15)
    {
        /* v15: Added queued I/O columns (io_stall_queued_read_ms, io_stall_queued_write_ms)
           and their delta counterparts to file_io_stats for latency overlay charts.
           Must drop/recreate because DuckDB appender writes by position. */
        _logger?.LogInformation("Running migration to v15: rebuilding file_io_stats for queued I/O columns");
        await ExecuteNonQueryAsync(connection, "DROP TABLE IF EXISTS file_io_stats");
    }
    if (fromVersion < 16)
    {
        /* v16: Added database_size_stats and server_properties tables for FinOps monitoring.
           New tables only — no existing table changes needed. Tables created by
           GetAllTableStatements() during initialization. */
        _logger?.LogInformation("Running migration to v16: adding FinOps tables (database_size_stats, server_properties)");
    }
    if (fromVersion < 17)
    {
        /* v17: Added volume-level drive space columns to database_size_stats.
           Columns appended at end — safe for DuckDB appender positional writes. */
        _logger?.LogInformation("Running migration to v17: adding volume stats columns to database_size_stats");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS volume_mount_point VARCHAR");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS volume_total_mb DECIMAL(19,2)");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS volume_free_mb DECIMAL(19,2)");
        }
        catch
        {
            /* Table doesn't exist yet — will be created with correct schema below */
        }
    }
    if (fromVersion < 18)
    {
        /* v18: Added session_stats table for per-application connection tracking
           from sys.dm_exec_sessions. New table only — created by GetAllTableStatements(). */
        _logger?.LogInformation("Running migration to v18: adding session_stats table for application connections");
    }
    if (fromVersion < 19)
    {
        /* v19: Added worker thread columns to memory_stats (safe additive ALTER). */
        _logger?.LogInformation("Running migration to v19: adding worker thread columns to memory_stats");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE memory_stats ADD COLUMN IF NOT EXISTS max_workers_count INTEGER");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE memory_stats ADD COLUMN IF NOT EXISTS current_workers_count INTEGER");
        }
        catch (Exception ex)
        {
            _logger?.LogWarning("Migration to v19 encountered an error (non-fatal): {Error}", ex.Message);
        }
    }
    if (fromVersion < 20)
    {
        /* v20: Added muted column to config_alert_log (mute rules feature). */
        _logger?.LogInformation("Running migration to v20: adding mute rules table and muted column to alert log");
        try
        {
            /* DuckDB does not support ADD COLUMN with NOT NULL — use nullable with DEFAULT */
            await ExecuteNonQueryAsync(connection, "ALTER TABLE config_alert_log ADD COLUMN IF NOT EXISTS muted BOOLEAN DEFAULT false");
        }
        catch (Exception ex)
        {
            _logger?.LogWarning("Migration to v20 encountered an error (non-fatal): {Error}", ex.Message);
        }
    }
    if (fromVersion < 21)
    {
        /* v21: Added detail_text column to config_alert_log. */
        _logger?.LogInformation("Running migration to v21: adding detail_text column to alert log");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE config_alert_log ADD COLUMN IF NOT EXISTS detail_text VARCHAR");
        }
        catch (Exception ex)
        {
            _logger?.LogWarning("Migration to v21 encountered an error (non-fatal): {Error}", ex.Message);
        }
    }
    if (fromVersion < 22)
    {
        /* v22: Added growth rate and VLF count columns to database_size_stats.
           Unlike earlier ALTERs this one rethrows on failure — v22 is the current
           version, so a failed stamp here must abort initialization. */
        _logger?.LogInformation("Running migration to v22: adding growth rate and VLF count columns to database_size_stats");
        try
        {
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS is_percent_growth BOOLEAN");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS growth_pct INTEGER");
            await ExecuteNonQueryAsync(connection, "ALTER TABLE database_size_stats ADD COLUMN IF NOT EXISTS vlf_count INTEGER");
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Migration to v22 failed");
            throw;
        }
    }
}
/// <summary>
/// Fixes server_id values in all tables by recomputing from server_name using the
/// deterministic hash function. Previous versions used string.GetHashCode() which
/// is randomized per process in .NET Core.
/// </summary>
private async Task FixServerIdsAsync(DuckDBConnection connection)
{
    string[] tables =
    [
        "servers", "collection_log", "wait_stats", "query_stats", "cpu_utilization_stats",
        "file_io_stats", "memory_stats", "memory_clerks", "deadlocks",
        "procedure_stats", "query_store_stats", "query_snapshots", "tempdb_stats",
        "perfmon_stats", "server_config", "database_config",
        "blocked_process_reports", "memory_grant_stats", "waiting_tasks"
    ];
    foreach (var table in tables)
    {
        try
        {
            /* Collect the distinct non-null server names present in this table. */
            var names = new List<string>();
            using (var selectCmd = connection.CreateCommand())
            {
                selectCmd.CommandText = $"SELECT DISTINCT server_name FROM {table}";
                using var reader = await selectCmd.ExecuteReaderAsync();
                while (await reader.ReadAsync())
                {
                    if (!reader.IsDBNull(0))
                        names.Add(reader.GetString(0));
                }
            }
            /* Rewrite server_id for each name using the stable FNV-1a hash. */
            foreach (var name in names)
            {
                var stableId = Services.RemoteCollectorService.GetDeterministicHashCode(name);
                using var updateCmd = connection.CreateCommand();
                updateCmd.CommandText = $"UPDATE {table} SET server_id = $1 WHERE server_name = $2";
                updateCmd.Parameters.Add(new DuckDBParameter { Value = stableId });
                updateCmd.Parameters.Add(new DuckDBParameter { Value = name });
                await updateCmd.ExecuteNonQueryAsync();
            }
            if (names.Count > 0)
                _logger?.LogInformation("Fixed server_id in {Table} for {Count} server(s)", table, names.Count);
        }
        catch (Exception ex)
        {
            /* Table might not exist yet — that's fine, it will be created with correct IDs */
            _logger?.LogDebug(ex, "Skipped server_id fix for {Table} (may not exist yet)", table);
        }
    }
}
/// <summary>
/// Creates a new connection to the database (callers open it themselves).
/// </summary>
public DuckDBConnection CreateConnection() => new(ConnectionString);
/// <summary>
/// Creates or refreshes views that UNION hot DuckDB tables with archived parquet files.
/// Call at startup and after each archive cycle so newly archived data is queryable.
/// </summary>
public async Task CreateArchiveViewsAsync()
{
    using var connection = CreateConnection();
    await connection.OpenAsync();
    /* Probe the archive directory once instead of once per table. */
    var archiveDirExists = Directory.Exists(_archivePath);
    foreach (var table in ArchivableTables)
    {
        try
        {
            /* EnumerateFiles().Any() stops at the first match instead of materializing
               the full listing like GetFiles().Length did. */
            var hasParquetFiles = archiveDirExists
                && Directory.EnumerateFiles(_archivePath, $"*_{table}.parquet").Any();
            string viewSql;
            if (hasParquetFiles)
            {
                /* Forward slashes: DuckDB glob paths work cross-platform that way. */
                var globPath = Path.Combine(_archivePath, $"*_{table}.parquet").Replace("\\", "/");
                viewSql = $"CREATE OR REPLACE VIEW v_{table} AS SELECT * FROM {table} UNION ALL BY NAME SELECT * FROM read_parquet('{globPath}', union_by_name=true)";
            }
            else
            {
                viewSql = $"CREATE OR REPLACE VIEW v_{table} AS SELECT * FROM {table}";
            }
            using var cmd = connection.CreateCommand();
            cmd.CommandText = viewSql;
            await cmd.ExecuteNonQueryAsync();
        }
        catch (Exception ex)
        {
            /* Schema mismatch between hot table and old parquet — fall back to table-only view */
            _logger?.LogWarning(ex, "Failed to create archive view for {Table}, using table-only view", table);
            try
            {
                using var fallbackCmd = connection.CreateCommand();
                fallbackCmd.CommandText = $"CREATE OR REPLACE VIEW v_{table} AS SELECT * FROM {table}";
                await fallbackCmd.ExecuteNonQueryAsync();
            }
            catch (Exception fallbackEx)
            {
                _logger?.LogError(fallbackEx, "Failed to create fallback view for {Table}", table);
            }
        }
    }
    _logger?.LogDebug("Archive views created/refreshed for {Count} tables", ArchivableTables.Length);
}
/// <summary>
/// Initializes the analysis engine schema (separate version track from main schema).
/// Only called when App.AnalysisEnabled is true.
/// Internal for test access.
/// </summary>
internal async Task InitializeAnalysisSchemaAsync()
{
    using var connection = CreateConnection();
    await connection.OpenAsync();
    await ExecuteNonQueryAsync(connection,
        "CREATE TABLE IF NOT EXISTS analysis_schema_version (version INTEGER NOT NULL)");

    /* Read the stored version; 0 when the version table was only just created. */
    var storedVersion = 0;
    try
    {
        using var versionCmd = connection.CreateCommand();
        versionCmd.CommandText = "SELECT COALESCE(MAX(version), 0) FROM analysis_schema_version";
        storedVersion = Convert.ToInt32(await versionCmd.ExecuteScalarAsync());
    }
    catch { /* Table doesn't exist yet */ }

    foreach (var statement in AnalysisSchema.GetAllTableStatements())
        await ExecuteNonQueryAsync(connection, statement);
    foreach (var statement in AnalysisSchema.GetAllIndexStatements())
        await ExecuteNonQueryAsync(connection, statement);

    if (storedVersion >= AnalysisSchema.CurrentVersion)
        return;

    /* Apply incremental migrations, then stamp the new version (single-row table). */
    foreach (var migration in AnalysisSchema.GetMigrationStatements(storedVersion))
    {
        try { await ExecuteNonQueryAsync(connection, migration); }
        catch { /* Column/table may already exist */ }
    }
    await ExecuteNonQueryAsync(connection, "DELETE FROM analysis_schema_version");
    using var stampCmd = connection.CreateCommand();
    stampCmd.CommandText = "INSERT INTO analysis_schema_version (version) VALUES ($1)";
    stampCmd.Parameters.Add(new DuckDBParameter { Value = AnalysisSchema.CurrentVersion });
    await stampCmd.ExecuteNonQueryAsync();
    _logger?.LogInformation("Analysis schema initialized at version {Version}", AnalysisSchema.CurrentVersion);
}
/// <summary>
/// Runs a manual WAL checkpoint. Call this between collection cycles
/// to flush the WAL during idle time instead of during collector writes.
/// Failures are logged at Debug and swallowed — a missed checkpoint is non-critical.
/// NOTE(review): the write lock is held across awaits here; ReaderWriterLockSlim must
/// be exited on the thread that entered it, so this relies on the continuation
/// resuming on the same thread — confirm against the app's synchronization context.
/// </summary>
public async Task CheckpointAsync()
{
    using var writeLock = AcquireWriteLock();
    try
    {
        using var connection = CreateConnection();
        await connection.OpenAsync();
        using var command = connection.CreateCommand();
        command.CommandText = "CHECKPOINT";
        await command.ExecuteNonQueryAsync();
    }
    catch (Exception ex)
    {
        _logger?.LogDebug(ex, "Manual checkpoint failed (non-critical)");
    }
}
/// <summary>
/// Executes a single non-query SQL statement, logging a truncated preview of the
/// SQL and rethrowing on failure.
/// </summary>
private async Task ExecuteNonQueryAsync(DuckDBConnection connection, string sql)
{
    try
    {
        using var command = connection.CreateCommand();
        command.CommandText = sql;
        await command.ExecuteNonQueryAsync();
    }
    catch (Exception ex)
    {
        /* Log only the first 100 chars — statements can be multi-KB CREATE TABLEs. */
        var preview = sql[..Math.Min(100, sql.Length)];
        _logger?.LogError(ex, "Failed to execute SQL: {Sql}", preview);
        throw;
    }
}
/// <summary>
/// Checks if the database file exists on disk.
/// </summary>
public bool DatabaseExists() => File.Exists(_databasePath);
/// <summary>
/// Gets the on-disk size of the database file in megabytes (0 when the file is absent).
/// </summary>
public double GetDatabaseSizeMb()
{
    if (!DatabaseExists())
        return 0;
    return new FileInfo(_databasePath).Length / (1024.0 * 1024.0);
}
/// <summary>
/// Gets the actual used data size inside the database by querying pragma_database_size().
/// Returns null if the query fails (e.g., database busy).
/// </summary>
public double? GetUsedDataSizeMb()
{
    try
    {
        using var connection = CreateConnection();
        connection.Open();
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "SELECT (used_blocks * block_size)::BIGINT FROM pragma_database_size()";
        var result = cmd.ExecuteScalar();
        return result is null or DBNull
            ? null
            : Convert.ToInt64(result) / (1024.0 * 1024.0);
    }
    catch
    {
        /* Database may be busy — fall back to null */
        return null;
    }
}
/// <summary>
/// Deletes the database and WAL files, then reinitializes with fresh empty tables
/// and archive views pointing at the parquet files.
/// Acquires its own write lock — caller must NOT already hold the lock.
/// NOTE(review): the write lock is held across the awaits inside InitializeAsync;
/// ReaderWriterLockSlim must be exited on the entering thread, so this relies on the
/// continuation resuming on the same thread — confirm against the app's sync context.
/// </summary>
public async Task ResetDatabaseAsync()
{
    using var writeLock = AcquireWriteLock();
    /* Remove both the main file and DuckDB's write-ahead log. */
    if (File.Exists(_databasePath))
        File.Delete(_databasePath);
    var walPath = _databasePath + ".wal";
    if (File.Exists(walPath))
        File.Delete(walPath);
    _logger?.LogInformation("Database files deleted, reinitializing");
    await InitializeAsync();
}
}