
Commit 7f68fdb

Add ClickHouse adapter with Docker integration tests
- Add ClickHouseAdapter with connection URL support (clickhouse://)
- Add ClickHouse to symmetric aggregation using the halfMD5 hash function
- Add 11 integration tests using the official ClickHouse Docker image (10 passing, 1 skipped)
- Update SemanticLayer to recognize clickhouse:// URLs
- Add clickhouse-connect to optional dependencies
- Add ClickHouse CI job with service container
- Add ClickHouse to docker-compose with password authentication
- Tests at parity with BigQuery/Postgres/Snowflake: basic metrics, dimensions, joins, filters, ORDER BY, LIMIT, symmetric aggregates, 3-way joins
1 parent: 1614973 · commit: 7f68fdb

9 files changed: 830 additions & 2 deletions
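
For orientation, a minimal usage sketch of the new connection URL support. The import path follows the file touched below (sidemantic/core/semantic_layer.py); passing the URL as the first positional argument and reading layer.dialect are assumptions based on this diff, and the credentials are placeholders matching the docker-compose service added in this commit.

    from sidemantic.core.semantic_layer import SemanticLayer

    # Placeholder credentials/host/port, matching the docker-compose service below.
    layer = SemanticLayer("clickhouse://default:clickhouse@localhost:8123/default")
    print(layer.dialect)  # "clickhouse", inferred from the URL scheme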


.github/workflows/integration.yml

Lines changed: 36 additions & 0 deletions
@@ -101,3 +101,39 @@ jobs:
         env:
           SNOWFLAKE_TEST: "1"
         run: uv run pytest -m integration tests/db/test_snowflake_integration.py -v
+
+  clickhouse-integration:
+    runs-on: ubuntu-latest
+
+    services:
+      clickhouse:
+        image: clickhouse/clickhouse-server:latest
+        ports:
+          - 8123:8123
+        options: >-
+          --health-cmd "wget --spider -q localhost:8123/ping"
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v5
+        with:
+          enable-cache: true
+
+      - name: Set up Python
+        run: uv python install 3.12
+
+      - name: Install dependencies
+        run: uv sync --extra clickhouse --extra dev
+
+      - name: Run ClickHouse integration tests
+        env:
+          CLICKHOUSE_TEST: "1"
+          CLICKHOUSE_HOST: "localhost"
+          CLICKHOUSE_PORT: "8123"
+          CLICKHOUSE_PASSWORD: "clickhouse"
+        run: uv run pytest -m integration tests/db/test_clickhouse_integration.py -v

docker-compose.yml

Lines changed: 27 additions & 0 deletions
@@ -24,6 +24,27 @@ services:
 
   # Snowflake tests use fakesnow (Python library) for mocking, no Docker service needed
 
+  clickhouse:
+    image: clickhouse/clickhouse-server:latest
+    platform: linux/amd64
+    ports:
+      - "8123:8123" # HTTP interface
+      - "9000:9000" # Native protocol (optional)
+    environment:
+      CLICKHOUSE_DB: default
+      CLICKHOUSE_USER: default
+      CLICKHOUSE_PASSWORD: clickhouse
+      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
+    ulimits:
+      nofile:
+        soft: 262144
+        hard: 262144
+    healthcheck:
+      test: ["CMD", "wget", "--spider", "-q", "localhost:8123/ping"]
+      interval: 2s
+      timeout: 5s
+      retries: 10
+
   test:
     build:
       context: .
@@ -33,6 +54,8 @@ services:
         condition: service_healthy
       bigquery:
        condition: service_started
+      clickhouse:
+        condition: service_healthy
     environment:
       POSTGRES_TEST: "1"
       POSTGRES_URL: "postgres://test:test@postgres:5432/sidemantic_test"
@@ -41,6 +64,10 @@ services:
       BIGQUERY_PROJECT: "test-project"
       BIGQUERY_DATASET: "test_dataset"
       SNOWFLAKE_TEST: "1" # fakesnow patches snowflake.connector, no external service needed
+      CLICKHOUSE_TEST: "1"
+      CLICKHOUSE_HOST: "clickhouse"
+      CLICKHOUSE_PORT: "8123"
+      CLICKHOUSE_PASSWORD: "clickhouse"
     command: pytest -m integration -v
 
 volumes:
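
The integration tests themselves are not visible in this commit view. As a hedged sketch only, a fixture could construct the adapter from the CLICKHOUSE_* variables exported by the CI job and the compose file above; make_test_adapter is a hypothetical name, and the constructor arguments follow sidemantic/db/clickhouse.py further down.

    import os

    from sidemantic.db.clickhouse import ClickHouseAdapter

    def make_test_adapter() -> ClickHouseAdapter:
        # Reads the same environment variables set by the CI job and docker-compose service.
        return ClickHouseAdapter(
            host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
            port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
            user="default",
            password=os.environ.get("CLICKHOUSE_PASSWORD", ""),
        )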

pyproject.toml

Lines changed: 4 additions & 0 deletions
@@ -48,6 +48,10 @@ snowflake = [
     "snowflake-connector-python>=3.0.0",
     "pyarrow>=14.0.0", # For Arrow support
 ]
+clickhouse = [
+    "clickhouse-connect>=0.6.0",
+    "pyarrow>=14.0.0", # For Arrow support
+]
 
 [build-system]
 requires = ["hatchling"]

sidemantic/core/semantic_layer.py

Lines changed: 7 additions & 1 deletion
@@ -33,6 +33,7 @@ def __init__(
                 - postgres://user:pass@host:port/dbname
                 - bigquery://project_id/dataset_id
                 - snowflake://user:password@account/database/schema
+                - clickhouse://user:password@host:port/database
             dialect: SQL dialect for query generation (optional, inferred from adapter)
             auto_register: Set as current layer for auto-registration (default: True)
             use_preaggregations: Enable automatic pre-aggregation routing (default: False)
@@ -70,10 +71,15 @@ def __init__(
 
                 self.adapter = SnowflakeAdapter.from_url(connection)
                 self.dialect = dialect or "snowflake"
+            elif connection.startswith("clickhouse://"):
+                from sidemantic.db.clickhouse import ClickHouseAdapter
+
+                self.adapter = ClickHouseAdapter.from_url(connection)
+                self.dialect = dialect or "clickhouse"
             else:
                 raise ValueError(
                     f"Unsupported connection URL: {connection}. "
-                    "Supported: duckdb:///, postgres://, bigquery://, snowflake://, or BaseDatabaseAdapter instance"
+                    "Supported: duckdb:///, postgres://, bigquery://, snowflake://, clickhouse://, or BaseDatabaseAdapter instance"
                 )
         else:
             raise TypeError(f"connection must be a string URL or BaseDatabaseAdapter instance, got {type(connection)}")
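
Besides URL strings, the error messages above show that a BaseDatabaseAdapter instance is also accepted. A minimal sketch of that path with the new adapter, assuming the instance can be passed where the URL would go; connection details are placeholders.

    from sidemantic.core.semantic_layer import SemanticLayer
    from sidemantic.db.clickhouse import ClickHouseAdapter

    # Placeholder connection details; the adapter's dialect property reports "clickhouse".
    adapter = ClickHouseAdapter(host="localhost", port=8123, user="default", password="clickhouse")
    layer = SemanticLayer(adapter)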

sidemantic/core/symmetric_aggregate.py

Lines changed: 7 additions & 1 deletion
@@ -26,7 +26,7 @@ def build_symmetric_aggregate_sql(
         primary_key: The primary key field to use for deduplication
         agg_type: Type of aggregation (sum, avg, count, count_distinct)
         model_alias: Optional table/CTE alias to prefix columns
-        dialect: SQL dialect (duckdb, bigquery, postgres, snowflake)
+        dialect: SQL dialect (duckdb, bigquery, postgres, snowflake, clickhouse)
 
     Returns:
         SQL expression using symmetric aggregates
@@ -63,6 +63,12 @@ def hash_func(col):
             return f"(HASH({col}) % 1000000000)"  # Modulo to constrain range
 
         multiplier = "100"  # Very small multiplier to avoid overflow
+    elif dialect == "clickhouse":
+        # ClickHouse halfMD5 returns UInt64
+        def hash_func(col):
+            return f"halfMD5(CAST({col} AS String))"
+
+        multiplier = "1048576"  # 2^20 as literal
     else:  # duckdb
 
         def hash_func(col):
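
For context on why each dialect defines a hash function and a multiplier: symmetric aggregates keep a measure correct when a join fans out rows, by aggregating a DISTINCT expression keyed on a hash of the primary key. The exact SQL that build_symmetric_aggregate_sql emits is not shown in this hunk; the snippet below is only an illustrative sketch of the general shape on ClickHouse, using a hypothetical orders table and assuming distinct hashes per key and integer measure values smaller than the multiplier.

    # Illustrative only; not the exact expression sidemantic generates.
    pk, value = "orders.id", "orders.amount"
    hashed = f"halfMD5(CAST({pk} AS String))"  # the ClickHouse hash_func from the hunk above
    multiplier = "1048576"                     # the ClickHouse multiplier from the hunk above

    # Each primary key contributes its value exactly once, even if the join duplicates the row.
    symmetric_sum = f"SUM(DISTINCT {hashed} * {multiplier} + {value}) - SUM(DISTINCT {hashed} * {multiplier})"
    print(symmetric_sum)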

sidemantic/db/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -23,4 +23,8 @@ def __getattr__(name):
         from sidemantic.db.snowflake import SnowflakeAdapter
 
         return SnowflakeAdapter
+    if name == "ClickHouseAdapter":
+        from sidemantic.db.clickhouse import ClickHouseAdapter
+
+        return ClickHouseAdapter
     raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
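
This module-level __getattr__ (PEP 562 lazy attribute access) keeps clickhouse-connect out of the import path until the adapter is actually requested. A short sketch of the effect; connection values are placeholders.

    # The from-import triggers sidemantic.db.__getattr__, which performs the real import lazily,
    # so clickhouse-connect is only required when ClickHouseAdapter is used.
    from sidemantic.db import ClickHouseAdapter

    adapter = ClickHouseAdapter(host="localhost", port=8123, password="clickhouse")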

sidemantic/db/clickhouse.py

Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@ (new file)

"""ClickHouse database adapter."""

from typing import Any
from urllib.parse import parse_qs, unquote, urlparse

from sidemantic.db.base import BaseDatabaseAdapter


class ClickHouseResult:
    """Wrapper for ClickHouse query result to match DuckDB result API."""

    def __init__(self, result):
        """Initialize ClickHouse result wrapper.

        Args:
            result: ClickHouse query result from clickhouse-connect
        """
        self._result = result
        self._row_index = 0

    def fetchone(self) -> tuple | None:
        """Fetch one row from the result."""
        if self._row_index >= self._result.row_count:
            return None
        row = self._result.result_rows[self._row_index]
        self._row_index += 1
        return row

    def fetchall(self) -> list[tuple]:
        """Fetch all remaining rows."""
        remaining = self._result.result_rows[self._row_index :]
        self._row_index = self._result.row_count
        return remaining

    def fetch_record_batch(self) -> Any:
        """Convert result to PyArrow RecordBatchReader."""
        import pyarrow as pa

        # Convert ClickHouse result to Arrow
        rows = self._result.result_rows
        if not rows:
            # Empty result
            schema = pa.schema([(name, pa.string()) for name in self._result.column_names])
            return pa.RecordBatchReader.from_batches(schema, [])

        # Build Arrow table from rows
        columns = {name: [row[i] for row in rows] for i, name in enumerate(self._result.column_names)}
        table = pa.table(columns)
        return pa.RecordBatchReader.from_batches(table.schema, table.to_batches())

    @property
    def description(self):
        """Get column descriptions."""
        return [(name, None) for name in self._result.column_names]


class ClickHouseAdapter(BaseDatabaseAdapter):
    """ClickHouse database adapter.

    Example:
        >>> adapter = ClickHouseAdapter(
        ...     host="localhost",
        ...     port=8123,
        ...     database="default",
        ...     user="default",
        ...     password=""
        ... )
        >>> result = adapter.execute("SELECT * FROM table")
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8123,
        database: str = "default",
        user: str | None = None,
        password: str | None = None,
        secure: bool = False,
        **kwargs,
    ):
        """Initialize ClickHouse adapter.

        Args:
            host: ClickHouse host
            port: ClickHouse HTTP port (default: 8123)
            database: Database name
            user: Username
            password: Password
            secure: Use HTTPS instead of HTTP
            **kwargs: Additional arguments passed to clickhouse_connect.get_client
        """
        try:
            import clickhouse_connect
        except ImportError as e:
            raise ImportError(
                "ClickHouse support requires clickhouse-connect. "
                "Install with: pip install sidemantic[clickhouse] or pip install clickhouse-connect"
            ) from e

        # Build connection params
        self.client = clickhouse_connect.get_client(
            host=host,
            port=port,
            database=database,
            username=user,
            password=password,
            secure=secure,
            **kwargs,
        )
        self.database = database

    def execute(self, sql: str) -> ClickHouseResult:
        """Execute SQL query."""
        result = self.client.query(sql)
        return ClickHouseResult(result)

    def executemany(self, sql: str, params: list) -> ClickHouseResult:
        """Execute SQL with multiple parameter sets.

        Note: ClickHouse doesn't have native executemany, so we run queries sequentially.
        """
        results = []
        for param_set in params:
            result = self.client.query(sql, parameters=param_set)
            results.append(ClickHouseResult(result))
        # Return last result for compatibility
        return results[-1] if results else ClickHouseResult(self.client.query("SELECT 1"))

    def fetchone(self, result: ClickHouseResult) -> tuple | None:
        """Fetch one row from result."""
        return result.fetchone()

    def fetch_record_batch(self, result: ClickHouseResult) -> Any:
        """Fetch result as PyArrow RecordBatchReader."""
        return result.fetch_record_batch()

    def get_tables(self) -> list[dict]:
        """List all tables in the database."""
        sql = """
            SELECT name as table_name, database as schema
            FROM system.tables
            WHERE database = %(database)s
            AND engine NOT LIKE '%View%'
        """
        result = self.client.query(sql, parameters={"database": self.database})
        return [{"table_name": row[0], "schema": row[1]} for row in result.result_rows]

    def get_columns(self, table_name: str, schema: str | None = None) -> list[dict]:
        """Get column information for a table."""
        schema = schema or self.database

        sql = """
            SELECT name as column_name, type as data_type
            FROM system.columns
            WHERE database = %(schema)s
            AND table = %(table)s
        """
        result = self.client.query(sql, parameters={"schema": schema, "table": table_name})
        return [{"column_name": row[0], "data_type": row[1]} for row in result.result_rows]

    def close(self) -> None:
        """Close the ClickHouse client."""
        self.client.close()

    @property
    def dialect(self) -> str:
        """Return SQL dialect."""
        return "clickhouse"

    @property
    def raw_connection(self) -> Any:
        """Return raw ClickHouse client."""
        return self.client

    @classmethod
    def from_url(cls, url: str) -> "ClickHouseAdapter":
        """Create adapter from connection URL.

        URL format: clickhouse://user:password@host:port/database
        or: clickhouse://host/database (default user/password)

        Args:
            url: Connection URL

        Returns:
            ClickHouseAdapter instance
        """
        if not url.startswith("clickhouse://"):
            raise ValueError(f"Invalid ClickHouse URL: {url}")

        parsed = urlparse(url)

        # Parse path: /database
        database = parsed.path.lstrip("/") if parsed.path else "default"

        # Parse query parameters
        params = {}
        if parsed.query:
            params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed.query).items()}

        # Check for secure parameter
        secure = params.pop("secure", "false").lower() in ("true", "1", "yes")

        return cls(
            host=parsed.hostname or "localhost",
            port=parsed.port or 8123,
            database=database,
            user=unquote(parsed.username) if parsed.username else "default",
            password=unquote(parsed.password) if parsed.password else "",
            secure=secure,
            **params,
        )
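
Finally, a quick sketch of from_url and the result wrapper; the URL uses placeholder credentials matching the docker-compose service, and query parameters other than secure are forwarded to clickhouse_connect.get_client.

    from sidemantic.db.clickhouse import ClickHouseAdapter

    adapter = ClickHouseAdapter.from_url("clickhouse://default:clickhouse@localhost:8123/default")

    result = adapter.execute("SELECT 1")
    print(adapter.fetchone(result))  # a single row such as (1,)

    adapter.close()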
