Skip to content

Commit 1614973

Browse files
committed
Add Snowflake adapter with fakesnow integration tests
- Add SnowflakeAdapter with connection URL support (snowflake://)
- Add Snowflake to symmetric aggregation, using modulo plus a small multiplier to avoid overflow
- Add 11 integration tests using the fakesnow library (10 passing, 1 skipped)
- Update SemanticLayer to recognize snowflake:// URLs
- Add fakesnow to dev dependencies for testing
- Add a Snowflake CI job using fakesnow (no Docker image needed)
- Tests at parity with BigQuery/Postgres: basic metrics, dimensions, joins, filters, ORDER BY, LIMIT, symmetric aggregates, 3-way joins
1 parent 2def7ea commit 1614973

9 files changed

Lines changed: 1030 additions & 28 deletions

File tree

.github/workflows/integration.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,25 @@ jobs:
7979
BIGQUERY_PROJECT: "test-project"
8080
BIGQUERY_DATASET: "test_dataset"
8181
run: uv run pytest -m integration tests/db/test_bigquery_integration.py -v
82+
83+
snowflake-integration:
84+
runs-on: ubuntu-latest
85+
86+
steps:
87+
- uses: actions/checkout@v4
88+
89+
- name: Install uv
90+
uses: astral-sh/setup-uv@v5
91+
with:
92+
enable-cache: true
93+
94+
- name: Set up Python
95+
run: uv python install 3.12
96+
97+
- name: Install dependencies
98+
run: uv sync --extra snowflake --extra dev
99+
100+
- name: Run Snowflake integration tests
101+
env:
102+
SNOWFLAKE_TEST: "1"
103+
run: uv run pytest -m integration tests/db/test_snowflake_integration.py -v

docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ services:
2222
- "9050:9050"
2323
command: ["--project=test-project", "--dataset=test_dataset"]
2424

25+
# Snowflake tests use fakesnow (Python library) for mocking, no Docker service needed
26+
2527
test:
2628
build:
2729
context: .
@@ -38,6 +40,7 @@ services:
3840
BIGQUERY_EMULATOR_HOST: "bigquery:9050"
3941
BIGQUERY_PROJECT: "test-project"
4042
BIGQUERY_DATASET: "test_dataset"
43+
SNOWFLAKE_TEST: "1" # fakesnow patches snowflake.connector, no external service needed
4144
command: pytest -m integration -v
4245

4346
volumes:

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dev = [
3030
"ruff>=0.6.0",
3131
"pandas>=2.2,<3",
3232
"numpy>=1.26,<3",
33+
"fakesnow>=0.9.0", # For Snowflake integration tests
3334
]
3435
serve = [
3536
"riffq>=0.1.0",
@@ -43,6 +44,10 @@ bigquery = [
4344
"google-cloud-bigquery>=3.0.0",
4445
"pyarrow>=14.0.0", # For Arrow support
4546
]
47+
snowflake = [
48+
"snowflake-connector-python>=3.0.0",
49+
"pyarrow>=14.0.0", # For Arrow support
50+
]
4651

4752
[build-system]
4853
requires = ["hatchling"]

sidemantic/core/semantic_layer.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
- duckdb:///path/to/db.duckdb
3333
- postgres://user:pass@host:port/dbname
3434
- bigquery://project_id/dataset_id
35+
- snowflake://user:password@account/database/schema
3536
dialect: SQL dialect for query generation (optional, inferred from adapter)
3637
auto_register: Set as current layer for auto-registration (default: True)
3738
use_preaggregations: Enable automatic pre-aggregation routing (default: False)
@@ -64,10 +65,15 @@ def __init__(
6465

6566
self.adapter = BigQueryAdapter.from_url(connection)
6667
self.dialect = dialect or "bigquery"
68+
elif connection.startswith("snowflake://"):
69+
from sidemantic.db.snowflake import SnowflakeAdapter
70+
71+
self.adapter = SnowflakeAdapter.from_url(connection)
72+
self.dialect = dialect or "snowflake"
6773
else:
6874
raise ValueError(
6975
f"Unsupported connection URL: {connection}. "
70-
"Supported: duckdb:///, postgres://, bigquery://, or BaseDatabaseAdapter instance"
76+
"Supported: duckdb:///, postgres://, bigquery://, snowflake://, or BaseDatabaseAdapter instance"
7177
)
7278
else:
7379
raise TypeError(f"connection must be a string URL or BaseDatabaseAdapter instance, got {type(connection)}")

sidemantic/core/symmetric_aggregate.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def build_symmetric_aggregate_sql(
2626
primary_key: The primary key field to use for deduplication
2727
agg_type: Type of aggregation (sum, avg, count, count_distinct)
2828
model_alias: Optional table/CTE alias to prefix columns
29-
dialect: SQL dialect (duckdb, bigquery, postgres)
29+
dialect: SQL dialect (duckdb, bigquery, postgres, snowflake)
3030
3131
Returns:
3232
SQL expression using symmetric aggregates
@@ -56,6 +56,13 @@ def hash_func(col):
5656
return f"hashtext({col}::text)::bigint"
5757

5858
multiplier = "1024" # 2^10 as literal (smaller to avoid overflow)
59+
elif dialect == "snowflake":
60+
# Snowflake HASH returns very large 64-bit integers
61+
# Use modulo to constrain range, then very small multiplier to avoid overflow
62+
def hash_func(col):
63+
return f"(HASH({col}) % 1000000000)" # Modulo to constrain range
64+
65+
multiplier = "100" # Very small multiplier to avoid overflow
5966
else: # duckdb
6067

6168
def hash_func(col):

sidemantic/db/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@ def __getattr__(name):
1515
from sidemantic.db.postgres import PostgreSQLAdapter
1616

1717
return PostgreSQLAdapter
18+
if name == "BigQueryAdapter":
19+
from sidemantic.db.bigquery import BigQueryAdapter
20+
21+
return BigQueryAdapter
22+
if name == "SnowflakeAdapter":
23+
from sidemantic.db.snowflake import SnowflakeAdapter
24+
25+
return SnowflakeAdapter
1826
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

sidemantic/db/snowflake.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""Snowflake database adapter."""
2+
3+
from typing import Any
4+
from urllib.parse import parse_qs, unquote, urlparse
5+
6+
from sidemantic.db.base import BaseDatabaseAdapter
7+
8+
9+
class SnowflakeResult:
10+
"""Wrapper for Snowflake cursor to match DuckDB result API."""
11+
12+
def __init__(self, cursor):
13+
"""Initialize Snowflake result wrapper.
14+
15+
Args:
16+
cursor: Snowflake cursor object
17+
"""
18+
self.cursor = cursor
19+
self._description = cursor.description
20+
21+
def fetchone(self) -> tuple | None:
22+
"""Fetch one row from the result."""
23+
return self.cursor.fetchone()
24+
25+
def fetchall(self) -> list[tuple]:
26+
"""Fetch all remaining rows."""
27+
return self.cursor.fetchall()
28+
29+
def fetch_record_batch(self) -> Any:
30+
"""Convert result to PyArrow RecordBatchReader."""
31+
import pyarrow as pa
32+
33+
# Fetch all rows and convert to Arrow
34+
rows = self.cursor.fetchall()
35+
if not rows:
36+
# Empty result
37+
schema = pa.schema([(desc[0], pa.string()) for desc in self._description])
38+
return pa.RecordBatchReader.from_batches(schema, [])
39+
40+
# Build Arrow table from rows
41+
columns = {desc[0]: [row[i] for row in rows] for i, desc in enumerate(self._description)}
42+
table = pa.table(columns)
43+
return pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
44+
45+
@property
46+
def description(self):
47+
"""Get column descriptions."""
48+
return self._description
49+
50+
51+
class SnowflakeAdapter(BaseDatabaseAdapter):
    """Snowflake database adapter.

    Example:
        >>> adapter = SnowflakeAdapter(
        ...     account="myaccount",
        ...     user="myuser",
        ...     password="mypass",
        ...     database="mydb",
        ...     schema="myschema"
        ... )
        >>> result = adapter.execute("SELECT * FROM table")
    """

    def __init__(
        self,
        account: str | None = None,
        user: str | None = None,
        password: str | None = None,
        database: str | None = None,
        schema: str | None = None,
        warehouse: str | None = None,
        role: str | None = None,
        **kwargs,
    ):
        """Initialize Snowflake adapter.

        Args:
            account: Snowflake account identifier
            user: Username
            password: Password
            database: Database name
            schema: Schema name
            warehouse: Warehouse name
            role: Role name
            **kwargs: Additional arguments passed to snowflake.connector.connect
        """
        try:
            import snowflake.connector
        except ImportError as e:
            raise ImportError(
                "Snowflake support requires snowflake-connector-python. "
                "Install with: pip install sidemantic[snowflake] or pip install snowflake-connector-python"
            ) from e

        # Only pass parameters the caller actually set, so the connector can
        # fall back to its own defaults / environment configuration.
        conn_params = {
            key: value
            for key, value in {
                "account": account,
                "user": user,
                "password": password,
                "database": database,
                "schema": schema,
                "warehouse": warehouse,
                "role": role,
            }.items()
            if value
        }

        # Merge with additional kwargs (kwargs win on conflicts).
        conn_params.update(kwargs)

        self.conn = snowflake.connector.connect(**conn_params)
        self.database = database
        self.schema = schema

    @staticmethod
    def _quote_literal(value: str) -> str:
        """Escape a value for safe embedding in a single-quoted SQL literal."""
        return value.replace("'", "''")

    def execute(self, sql: str) -> SnowflakeResult:
        """Execute SQL query."""
        cursor = self.conn.cursor()
        cursor.execute(sql)
        return SnowflakeResult(cursor)

    def executemany(self, sql: str, params: list) -> SnowflakeResult:
        """Execute SQL with multiple parameter sets."""
        cursor = self.conn.cursor()
        cursor.executemany(sql, params)
        return SnowflakeResult(cursor)

    def fetchone(self, result: SnowflakeResult) -> tuple | None:
        """Fetch one row from result."""
        return result.fetchone()

    def fetch_record_batch(self, result: SnowflakeResult) -> Any:
        """Fetch result as PyArrow RecordBatchReader."""
        return result.fetch_record_batch()

    def get_tables(self) -> list[dict]:
        """List all tables in the database/schema.

        Only an explicit schema narrows the query; with or without a
        database set, the query otherwise lists all base tables visible in
        information_schema.
        """
        # Escape the schema name so an embedded quote cannot break the SQL.
        schema_filter = (
            f"AND table_schema = '{self._quote_literal(self.schema)}'" if self.schema else ""
        )
        sql = f"""
            SELECT table_name, table_schema as schema
            FROM information_schema.tables
            WHERE table_type = 'BASE TABLE'
            {schema_filter}
        """

        result = self.execute(sql)
        rows = result.fetchall()
        return [{"table_name": row[0], "schema": row[1]} for row in rows]

    def get_columns(self, table_name: str, schema: str | None = None) -> list[dict]:
        """Get column information for a table.

        Args:
            table_name: Table to describe
            schema: Schema to search; defaults to the adapter's schema
        """
        schema = schema or self.schema
        # Escape identifiers embedded as string literals to keep the SQL valid
        # (and safe) even if names contain quotes.
        schema_filter = f"AND table_schema = '{self._quote_literal(schema)}'" if schema else ""

        sql = f"""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = '{self._quote_literal(table_name)}' {schema_filter}
        """
        result = self.execute(sql)
        rows = result.fetchall()
        return [{"column_name": row[0], "data_type": row[1]} for row in rows]

    def close(self) -> None:
        """Close the Snowflake connection."""
        self.conn.close()

    @property
    def dialect(self) -> str:
        """Return SQL dialect."""
        return "snowflake"

    @property
    def raw_connection(self) -> Any:
        """Return raw Snowflake connection."""
        return self.conn

    @classmethod
    def from_url(cls, url: str) -> "SnowflakeAdapter":
        """Create adapter from connection URL.

        URL format: snowflake://user:password@account/database/schema?warehouse=wh&role=myrole
        Minimal: snowflake://user:password@account

        Args:
            url: Connection URL

        Returns:
            SnowflakeAdapter instance
        """
        if not url.startswith("snowflake://"):
            raise ValueError(f"Invalid Snowflake URL: {url}")

        parsed = urlparse(url)

        # Parse path: /database/schema
        path_parts = [p for p in parsed.path.split("/") if p]
        database = path_parts[0] if len(path_parts) > 0 else None
        schema = path_parts[1] if len(path_parts) > 1 else None

        # Parse query parameters; single-valued params are unwrapped from
        # parse_qs's list form.
        params = {}
        if parsed.query:
            params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed.query).items()}

        return cls(
            account=parsed.hostname,
            user=unquote(parsed.username) if parsed.username else None,
            password=unquote(parsed.password) if parsed.password else None,
            database=database,
            schema=schema,
            warehouse=params.pop("warehouse", None),
            role=params.pop("role", None),
            **params,
        )

0 commit comments

Comments
 (0)