-
Notifications
You must be signed in to change notification settings - Fork 109
Expand file tree
/
Copy pathtest_basic.py
More file actions
109 lines (77 loc) · 3.39 KB
/
test_basic.py
File metadata and controls
109 lines (77 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import pytest
from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod
from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations
from dbt.tests.adapter.basic.test_empty import BaseEmpty
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests
from dbt.tests.adapter.basic.test_incremental import BaseIncremental
from dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests
from dbt.tests.adapter.basic.test_singular_tests_ephemeral import BaseSingularTestsEphemeral
from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols
from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp
from dbt.tests.util import run_dbt
class TestSimpleMaterializations(BaseSimpleMaterializations):
def test_existing_view_materialization(self, project, models):
"""Test that materializing an existing view works correctly."""
# Create a temporary model file directly in the project
model_path = os.path.join(project.project_root, "models", "view_model_exists.sql")
# Write the initial model without the value column
with open(model_path, "w") as f:
f.write(
"""
{{ config(materialized='view') }}
select
1 as id
{% if var('include_value_column', false) %}
, 2 as value
{% endif %}
"""
)
# First run to create the view without the extra column
results = run_dbt(["run", "-m", "view_model_exists"])
assert len(results) == 1
# Generate catalog to get column information
catalog = run_dbt(["docs", "generate"])
# Check columns in the catalog
node_id = "model.base.view_model_exists"
assert node_id in catalog.nodes
# Get columns from the catalog
columns = catalog.nodes[node_id].columns
column_names = [name.lower() for name in columns.keys()]
# Verify only the id column exists
assert "id" in column_names
assert "value" not in column_names
# Second run with a variable to include the extra column
results = run_dbt(
["run", "-m", "view_model_exists", "--vars", '{"include_value_column": true}']
)
assert len(results) == 1
# Generate catalog again to get updated column information
catalog = run_dbt(["docs", "generate"])
# Get updated columns from the catalog
columns = catalog.nodes[node_id].columns
column_names = [name.lower() for name in columns.keys()]
# Verify both columns exist now
assert "id" in column_names
assert "value" in column_names
class TestSingularTests(BaseSingularTests):
pass
@pytest.mark.skip(reason="SQLServer doesn't support nested CTE")
class TestSingularTestsEphemeral(BaseSingularTestsEphemeral):
pass
class TestEmpty(BaseEmpty):
pass
@pytest.mark.skip(reason="SQLServer doesn't support nested CTE")
class TestEphemeral(BaseEphemeral):
pass
class TestIncremental(BaseIncremental):
pass
class TestGenericTests(BaseGenericTests):
pass
class TestSnapshotCheckCols(BaseSnapshotCheckCols):
pass
class TestSnapshotTimestamp(BaseSnapshotTimestamp):
pass
class TestBaseAdapterMethod(BaseAdapterMethod):
pass