Skip to content

Commit f3e8a33

Browse files
refactor: extract Fabric-specific logic to dispatched macros and use edr_boolean_literal
- get_columns_snapshot_query: replace inline target.type checks with edr_boolean_literal() - get_columns_changes_query: extract to dispatched macro (fabric__/default__get_column_changes_from_baseline_cur), use edr_boolean_literal, restore dbt_utils.group_by(9) to fix test_schema_changes failures - column_monitoring_query: extract GROUP BY to dispatched column_monitoring_group_by macro - insert_as_select: convert to dispatched macro (fabric__/default__insert_as_select) - get_anomaly_query: convert both macros to dispatched (fabric__/default__ variants) - create_temp_table: simplify DROP VIEW IF EXISTS syntax - Rename CI target from 'fabric' to 'sqlserver' (honest about what we test) - Add sqlserver__generate_elementary_profile_args (separate from fabric__) Co-Authored-By: Itamar Hartstein <haritamar@gmail.com>
1 parent ddd219c commit f3e8a33

10 files changed

Lines changed: 239 additions & 188 deletions

File tree

.github/workflows/test-all-warehouses.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ jobs:
4949
${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
5050
fromJSON('["latest_official", "latest_pre"]') }}
5151
warehouse-type:
52-
[postgres, clickhouse, trino, dremio, spark, duckdb, fabric]
52+
[postgres, clickhouse, trino, dremio, spark, duckdb, sqlserver]
5353
exclude:
5454
# latest_pre is only tested on postgres
5555
- dbt-version: latest_pre
@@ -63,7 +63,7 @@ jobs:
6363
- dbt-version: latest_pre
6464
warehouse-type: duckdb
6565
- dbt-version: latest_pre
66-
warehouse-type: fabric
66+
warehouse-type: sqlserver
6767
uses: ./.github/workflows/test-warehouse.yml
6868
with:
6969
warehouse-type: ${{ matrix.warehouse-type }}

.github/workflows/test-warehouse.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ on:
1818
- clickhouse
1919
- dremio
2020
- duckdb
21-
- fabric
21+
- sqlserver
2222
elementary-ref:
2323
type: string
2424
required: false
@@ -103,7 +103,7 @@ jobs:
103103
echo "Dremio is healthy."
104104
105105
- name: Start SQL Server
106-
if: inputs.warehouse-type == 'fabric'
106+
if: inputs.warehouse-type == 'sqlserver'
107107
working-directory: ${{ env.TESTS_DIR }}
108108
env:
109109
MSSQL_SA_PASSWORD: ${{ env.MSSQL_SA_PASSWORD }}
@@ -114,7 +114,7 @@ jobs:
114114
echo "SQL Server is healthy."
115115
116116
- name: Install ODBC Driver
117-
if: inputs.warehouse-type == 'fabric'
117+
if: inputs.warehouse-type == 'sqlserver'
118118
run: |
119119
curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
120120
curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
@@ -157,15 +157,15 @@ jobs:
157157
run:
158158
pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }}
159159
"dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}"
160-
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || (inputs.warehouse-type == 'athena' && 'athena-community') || (inputs.warehouse-type == 'fabric' && 'sqlserver') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('~={0}', inputs.dbt-version)) || '' }}"
160+
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || (inputs.warehouse-type == 'athena' && 'athena-community') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('~={0}', inputs.dbt-version)) || '' }}"
161161

162162
- name: Install dbt-fusion
163163
if: inputs.dbt-version == 'fusion'
164164
run: |
165165
curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s --
166166
167167
- name: Install Elementary
168-
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'fabric' && 'sqlserver') || inputs.warehouse-type }}]"
168+
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]"
169169

170170
- name: Write dbt profiles
171171
env:

integration_tests/profiles/profiles.yml.j2

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ elementary_tests:
6262
schema: {{ schema_name }}
6363
threads: 8
6464

65-
fabric: &fabric
65+
sqlserver: &sqlserver
6666
type: sqlserver
6767
driver: "ODBC Driver 18 for SQL Server"
6868
server: 127.0.0.1
@@ -135,7 +135,7 @@ elementary_tests:
135135
elementary:
136136
target: postgres
137137
outputs:
138-
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'fabric', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %}
138+
{%- set targets = ['postgres', 'clickhouse', 'trino', 'dremio', 'spark', 'duckdb', 'sqlserver', 'snowflake', 'bigquery', 'redshift', 'databricks_catalog', 'athena'] %}
139139
{%- for t in targets %}
140140
{{ t }}:
141141
<<: *{{ t }}

macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -192,44 +192,11 @@
192192
{%- if timestamp_column %}
193193
left join buckets on (edr_bucket_start = start_bucket_in_data)
194194
{%- endif %}
195-
{% if target.type in ["fabric", "sqlserver"] %}
196-
{# T-SQL does not support positional GROUP BY and rejects
197-
GROUP BY on constant expressions. Use actual column
198-
names when there is a timestamp, otherwise omit GROUP BY
199-
for the constant-only case (unless dimensions exist). #}
200-
{% if timestamp_column %}
201-
group by
202-
edr_bucket_start,
203-
edr_bucket_end
204-
{% if dimensions | length > 0 %}
205-
,
206-
{{
207-
elementary.select_dimensions_columns(
208-
prefixed_dimensions
209-
)
210-
}}
211-
{% endif %}
212-
{% elif dimensions | length > 0 %}
213-
group by
214-
{{
215-
elementary.select_dimensions_columns(
216-
prefixed_dimensions
217-
)
218-
}}
219-
{% endif %}
220-
{% else %}
221-
{% if dimensions | length > 0 %}
222-
group by
223-
1,
224-
2,
225-
{{
226-
elementary.select_dimensions_columns(
227-
prefixed_dimensions
228-
)
229-
}}
230-
{% else %} group by 1, 2
231-
{% endif %}
232-
{% endif %}
195+
{{
196+
elementary.column_monitoring_group_by(
197+
timestamp_column, dimensions, prefixed_dimensions
198+
)
199+
}}
233200
{%- else %}{{ elementary.empty_column_monitors_cte() }}
234201
{%- endif %}
235202

@@ -359,6 +326,46 @@
359326

360327
{% endmacro %}
361328

329+
{% macro column_monitoring_group_by(
330+
timestamp_column, dimensions, prefixed_dimensions
331+
) %}
332+
{{
333+
return(
334+
adapter.dispatch("column_monitoring_group_by", "elementary")(
335+
timestamp_column, dimensions, prefixed_dimensions
336+
)
337+
)
338+
}}
339+
{% endmacro %}
340+
341+
{% macro default__column_monitoring_group_by(
342+
timestamp_column, dimensions, prefixed_dimensions
343+
) %}
344+
{% if dimensions | length > 0 %}
345+
group by 1, 2, {{ elementary.select_dimensions_columns(prefixed_dimensions) }}
346+
{% else %} group by 1, 2
347+
{% endif %}
348+
{% endmacro %}
349+
350+
{% macro fabric__column_monitoring_group_by(
351+
timestamp_column, dimensions, prefixed_dimensions
352+
) %}
353+
{#- T-SQL does not support positional GROUP BY and rejects
354+
GROUP BY on constant expressions. Use actual column
355+
names when there is a timestamp, otherwise omit GROUP BY
356+
for the constant-only case (unless dimensions exist). -#}
357+
{% if timestamp_column %}
358+
group by
359+
edr_bucket_start,
360+
edr_bucket_end
361+
{% if dimensions | length > 0 %}
362+
, {{ elementary.select_dimensions_columns(prefixed_dimensions) }}
363+
{% endif %}
364+
{% elif dimensions | length > 0 %}
365+
group by {{ elementary.select_dimensions_columns(prefixed_dimensions) }}
366+
{% endif %}
367+
{% endmacro %}
368+
362369
{% macro select_dimensions_columns(dimension_columns, as_prefix="") %}
363370
{% set select_statements %}
364371
{%- for column in dimension_columns -%}

macros/edr/data_monitoring/schema_changes/get_columns_changes_query.sql

Lines changed: 77 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -31,59 +31,17 @@
3131
model_baseline_relation,
3232
include_added=False
3333
) %}
34-
{% if target.type in ["fabric", "sqlserver"] %}
35-
{# Fabric / T-SQL does not allow CTEs inside subqueries or derived tables.
36-
get_columns_snapshot_query returns a CTE-based query, so we materialise
37-
its result into a temp table first, then reference it with a plain SELECT.
38-
We pass into_relation so the INTO clause is placed inside the CTE's final
39-
SELECT (the only valid position in T-SQL). #}
40-
{% set tmp_snapshot = api.Relation.create(
41-
database=model_relation.database,
42-
schema=model_relation.schema,
43-
identifier=model_relation.identifier ~ "__snap_tmp",
44-
type="table",
45-
) %}
46-
{% do run_query("drop table if exists " ~ tmp_snapshot) %}
47-
{% do run_query(
48-
elementary.get_columns_snapshot_query(
49-
model_relation, full_table_name, into_relation=tmp_snapshot
50-
)
51-
) %}
52-
53-
{% set cur %}
54-
select
55-
cs.full_table_name,
56-
lower(cs.column_name) as column_name,
57-
cs.data_type,
58-
case when bl.column_name is null then cast(1 as bit) else cast(0 as bit) end as is_new,
59-
{{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at
60-
from {{ tmp_snapshot }} cs
61-
left join (
62-
select lower(column_name) as column_name, data_type
63-
from {{ model_baseline_relation }}
64-
) bl on (lower(cs.column_name) = lower(bl.column_name))
65-
where lower(cs.full_table_name) = lower('{{ full_table_name }}')
66-
{% endset %}
67-
{% else %}
68-
{% set cur %}
69-
with baseline as (
70-
select lower(column_name) as column_name, data_type
71-
from {{ model_baseline_relation }}
72-
)
73-
74-
select
75-
columns_snapshot.full_table_name,
76-
lower(columns_snapshot.column_name) as column_name,
77-
columns_snapshot.data_type,
78-
(baseline.column_name IS NULL) as is_new,
79-
{{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at
80-
from ({{ elementary.get_columns_snapshot_query(model_relation, full_table_name) }}) columns_snapshot
81-
left join baseline on (
82-
lower(columns_snapshot.column_name) = lower(baseline.column_name)
34+
{% set cur %}
35+
{{
36+
adapter.dispatch(
37+
"get_column_changes_from_baseline_cur", "elementary"
38+
)(
39+
model_relation,
40+
full_table_name,
41+
model_baseline_relation,
8342
)
84-
where lower(columns_snapshot.full_table_name) = lower('{{ full_table_name }}')
85-
{% endset %}
86-
{% endif %}
43+
}}
44+
{% endset %}
8745

8846
{% set pre %}
8947
select
@@ -148,11 +106,7 @@
148106
{{ elementary.null_string() }} as pre_data_type,
149107
detected_at as detected_at
150108
from cur
151-
where
152-
is_new
153-
= {% if target.type in ["fabric", "sqlserver"] %} cast(1 as bit)
154-
{% else %} true
155-
{% endif %}
109+
where is_new = {{ elementary.edr_boolean_literal(true) }}
156110

157111
),
158112
{% endif %}
@@ -246,14 +200,7 @@
246200
)
247201
else null
248202
end as test_results_description
249-
from all_column_changes
250-
group by
251-
full_table_name,
252-
column_name,
253-
change,
254-
data_type,
255-
pre_data_type,
256-
detected_at
203+
from all_column_changes {{ dbt_utils.group_by(9) }}
257204

258205
)
259206

@@ -270,3 +217,68 @@
270217
from column_changes_test_results
271218

272219
{%- endmacro %}
220+
221+
{% macro default__get_column_changes_from_baseline_cur(
222+
model_relation, full_table_name, model_baseline_relation
223+
) %}
224+
with
225+
baseline as (
226+
select lower(column_name) as column_name, data_type
227+
from {{ model_baseline_relation }}
228+
)
229+
230+
select
231+
columns_snapshot.full_table_name,
232+
lower(columns_snapshot.column_name) as column_name,
233+
columns_snapshot.data_type,
234+
(baseline.column_name is null) as is_new,
235+
{{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at
236+
from
237+
(
238+
{{ elementary.get_columns_snapshot_query(model_relation, full_table_name) }}
239+
) columns_snapshot
240+
left join
241+
baseline on (lower(columns_snapshot.column_name) = lower(baseline.column_name))
242+
where lower(columns_snapshot.full_table_name) = lower('{{ full_table_name }}')
243+
{% endmacro %}
244+
245+
{% macro fabric__get_column_changes_from_baseline_cur(
246+
model_relation, full_table_name, model_baseline_relation
247+
) %}
248+
{#- Fabric / T-SQL does not allow CTEs inside subqueries or derived tables.
249+
get_columns_snapshot_query returns a CTE-based query, so we materialise
250+
its result into a temp table first, then reference it with a plain SELECT.
251+
We pass into_relation so the INTO clause is placed inside the CTE's final
252+
SELECT (the only valid position in T-SQL). -#}
253+
{% set tmp_snapshot = api.Relation.create(
254+
database=model_relation.database,
255+
schema=model_relation.schema,
256+
identifier=model_relation.identifier ~ "__snap_tmp",
257+
type="table",
258+
) %}
259+
{% do run_query("drop table if exists " ~ tmp_snapshot) %}
260+
{% do run_query(
261+
elementary.get_columns_snapshot_query(
262+
model_relation, full_table_name, into_relation=tmp_snapshot
263+
)
264+
) %}
265+
266+
select
267+
cs.full_table_name,
268+
lower(cs.column_name) as column_name,
269+
cs.data_type,
270+
case
271+
when bl.column_name is null
272+
then {{ elementary.edr_boolean_literal(true) }}
273+
else {{ elementary.edr_boolean_literal(false) }}
274+
end as is_new,
275+
{{ elementary.datetime_now_utc_as_timestamp_column() }} as detected_at
276+
from {{ tmp_snapshot }} cs
277+
left join
278+
(
279+
select lower(column_name) as column_name, data_type
280+
from {{ model_baseline_relation }}
281+
) bl
282+
on (lower(cs.column_name) = lower(bl.column_name))
283+
where lower(cs.full_table_name) = lower('{{ full_table_name }}')
284+
{% endmacro %}

macros/edr/data_monitoring/schema_changes/get_columns_snapshot_query.sql

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,8 @@
100100
{{ elementary.full_column_name() }}
101101
not in ({{ known_columns_query }})
102102
and full_table_name in ({{ known_tables_query }})
103-
then
104-
{% if target.type in ["fabric", "sqlserver"] %} cast(1 as bit)
105-
{% else %} true
106-
{% endif %}
107-
else
108-
{% if target.type in ["fabric", "sqlserver"] %} cast(0 as bit)
109-
{% else %} false
110-
{% endif %}
103+
then {{ elementary.edr_boolean_literal(true) }}
104+
else {{ elementary.edr_boolean_literal(false) }}
111105
end as is_new
112106
from columns_info
113107
),

0 commit comments

Comments
 (0)