
Commit 51a8ef9

Add Spark SQL adapter with PyHive and Docker Thrift server
- Add SparkAdapter using PyHive for Spark Thrift server connections
- Add Spark to docker-compose (apache/spark:3.5.0 with Thrift server)
- Add 11 Spark integration tests (10 passing, 1 skipped)
- Update SemanticLayer to recognize spark:// URLs
- Add Spark to symmetric aggregation (shares xxhash64 with Databricks)
- Add PyHive with pure-sasl to optional dependencies (no C deps needed)
- Databricks adapter remains separate for real Databricks SQL warehouses
- Tests at parity with other databases: basic metrics, dimensions, joins, filters, ORDER BY, LIMIT, symmetric aggregates, 3-way joins
1 parent 43eaee6 commit 51a8ef9

8 files changed

Lines changed: 670 additions & 282 deletions


docker-compose.yml

Lines changed: 20 additions & 0 deletions
@@ -23,6 +23,26 @@ services:
     command: ["--project=test-project", "--dataset=test_dataset"]

   # Snowflake tests use fakesnow (Python library) for mocking, no Docker service needed
+  # Databricks tests require real Databricks workspace - no emulator available
+
+  spark:
+    image: apache/spark:3.5.0
+    platform: linux/amd64
+    ports:
+      - "10000:10000"  # Thrift server
+      - "4040:4040"    # Spark UI
+    environment:
+      SPARK_NO_DAEMONIZE: "true"
+    command: >
+      /bin/bash -c "
+      /opt/spark/sbin/start-thriftserver.sh &&
+      tail -f /opt/spark/logs/*
+      "
+    healthcheck:
+      test: ["CMD", "nc", "-z", "localhost", "10000"]
+      interval: 5s
+      timeout: 5s
+      retries: 20

   clickhouse:
     image: clickhouse/clickhouse-server:latest

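Once the healthcheck passes, the Thrift server can be smoke-tested from the host with PyHive. A minimal sketch, assuming the port mapping above and an otherwise default Spark configuration:

# Connectivity check against the dockerized Spark Thrift server.
# Host/port mirror the compose mapping above ("10000:10000").
from pyhive import hive

conn = hive.connect(host="localhost", port=10000, database="default")
cursor = conn.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchone())  # expect (1,)
conn.close()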
pyproject.toml

Lines changed: 7 additions & 0 deletions
@@ -56,6 +56,13 @@ databricks = [
     "databricks-sql-connector>=2.0.0",
     "pyarrow>=14.0.0",  # For Arrow support
 ]
+spark = [
+    "PyHive>=0.7.0",
+    "thrift>=0.16.0",
+    "thrift_sasl>=0.4.3",
+    "pure-sasl>=0.6.2",  # Pure Python SASL for PyHive (no C deps)
+    "pyarrow>=14.0.0",  # For Arrow support
+]

 [build-system]
 requires = ["hatchling"]

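After pip install 'sidemantic[spark]', the pure-Python Thrift stack should resolve without any C toolchain. A quick, hypothetical sanity check (the strings below are the import names of the pinned packages):

# Verify the optional Spark dependencies are importable.
# "puresasl" is the import name of the pure-sasl distribution.
import importlib.util

for mod in ("pyhive", "thrift", "thrift_sasl", "puresasl", "pyarrow"):
    print(mod, "OK" if importlib.util.find_spec(mod) else "MISSING")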
sidemantic/core/semantic_layer.py

Lines changed: 7 additions & 1 deletion
@@ -35,6 +35,7 @@ def __init__(
             - snowflake://user:password@account/database/schema
             - clickhouse://user:password@host:port/database
             - databricks://token@server-hostname/http-path
+            - spark://host:port/database
         dialect: SQL dialect for query generation (optional, inferred from adapter)
         auto_register: Set as current layer for auto-registration (default: True)
         use_preaggregations: Enable automatic pre-aggregation routing (default: False)
@@ -82,10 +83,15 @@ def __init__(

                 self.adapter = DatabricksAdapter.from_url(connection)
                 self.dialect = dialect or "databricks"
+            elif connection.startswith("spark://"):
+                from sidemantic.db.spark import SparkAdapter
+
+                self.adapter = SparkAdapter.from_url(connection)
+                self.dialect = dialect or "spark"
             else:
                 raise ValueError(
                     f"Unsupported connection URL: {connection}. "
-                    "Supported: duckdb:///, postgres://, bigquery://, snowflake://, clickhouse://, databricks://, or BaseDatabaseAdapter instance"
+                    "Supported: duckdb:///, postgres://, bigquery://, snowflake://, clickhouse://, databricks://, spark://, or BaseDatabaseAdapter instance"
                 )
         else:
             raise TypeError(f"connection must be a string URL or BaseDatabaseAdapter instance, got {type(connection)}")

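With this branch in place, a spark:// URL dispatches to SparkAdapter the same way the other schemes do. A minimal usage sketch, assuming SemanticLayer is importable from the package root:

# spark://host:port/database, per the docstring above.
from sidemantic import SemanticLayer

layer = SemanticLayer("spark://localhost:10000/default")
assert layer.dialect == "spark"  # inferred when no dialect is passed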
sidemantic/core/symmetric_aggregate.py

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@ def build_symmetric_aggregate_sql(
         primary_key: The primary key field to use for deduplication
         agg_type: Type of aggregation (sum, avg, count, count_distinct)
         model_alias: Optional table/CTE alias to prefix columns
-        dialect: SQL dialect (duckdb, bigquery, postgres, snowflake, clickhouse, databricks)
+        dialect: SQL dialect (duckdb, bigquery, postgres, snowflake, clickhouse, databricks, spark)

     Returns:
         SQL expression using symmetric aggregates
@@ -69,7 +69,7 @@ def hash_func(col):
             return f"halfMD5(CAST({col} AS String))"

         multiplier = "1048576"  # 2^20 as literal
-    elif dialect == "databricks":
+    elif dialect in ("databricks", "spark"):
         # Databricks/Spark SQL xxhash64 returns bigint
         def hash_func(col):
             return f"xxhash64(CAST({col} AS STRING))"

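For the spark dialect the hash function is now identical to the Databricks one. An illustrative sketch of how that hash feeds the symmetric-sum pattern; the exact SQL sidemantic emits may differ:

# Same hash for Databricks and Spark: both expose xxhash64 over strings.
def hash_func(col: str) -> str:
    return f"xxhash64(CAST({col} AS STRING))"

# Classic symmetric-aggregate trick: key each measure value to its row's
# primary-key hash so fan-out duplicates collapse under DISTINCT.
pk_hash = hash_func("orders.order_id")
print(f"SUM(DISTINCT {pk_hash} + amount) - SUM(DISTINCT {pk_hash})")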
sidemantic/db/spark.py

Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
"""Spark SQL database adapter using PyHive."""

from typing import Any
from urllib.parse import parse_qs, unquote, urlparse

from sidemantic.db.base import BaseDatabaseAdapter


class SparkResult:
    """Wrapper for PyHive cursor to match DuckDB result API."""

    def __init__(self, cursor):
        """Initialize Spark result wrapper.

        Args:
            cursor: PyHive cursor object
        """
        self.cursor = cursor
        self._description = cursor.description

    def fetchone(self) -> tuple | None:
        """Fetch one row from the result."""
        return self.cursor.fetchone()

    def fetchall(self) -> list[tuple]:
        """Fetch all remaining rows."""
        return self.cursor.fetchall()

    def fetch_record_batch(self) -> Any:
        """Convert result to PyArrow RecordBatchReader."""
        import pyarrow as pa

        rows = self.cursor.fetchall()
        if not rows:
            # Empty result
            schema = pa.schema([(desc[0], pa.string()) for desc in self._description])
            return pa.RecordBatchReader.from_batches(schema, [])

        # Build Arrow table from rows
        columns = {desc[0]: [row[i] for row in rows] for i, desc in enumerate(self._description)}
        table = pa.table(columns)
        return pa.RecordBatchReader.from_batches(table.schema, table.to_batches())

    @property
    def description(self):
        """Get column descriptions."""
        return self._description


class SparkAdapter(BaseDatabaseAdapter):
    """Spark SQL database adapter using PyHive for Thrift server connections.

    Example:
        >>> adapter = SparkAdapter(
        ...     host="localhost",
        ...     port=10000,
        ...     database="default"
        ... )
        >>> result = adapter.execute("SELECT * FROM table")
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 10000,
        database: str = "default",
        username: str | None = None,
        **kwargs,
    ):
        """Initialize Spark adapter.

        Args:
            host: Spark Thrift server hostname
            port: Thrift server port (default: 10000)
            database: Database name (default: "default")
            username: Username (optional)
            **kwargs: Additional arguments passed to pyhive.hive.connect
        """
        try:
            from pyhive import hive
        except ImportError as e:
            raise ImportError(
                "Spark support requires PyHive. "
                "Install with: pip install sidemantic[spark] or pip install 'PyHive[hive]'"
            ) from e

        # Build connection params
        conn_params = {
            "host": host,
            "port": port,
            "database": database,
        }

        if username:
            conn_params["username"] = username

        # Merge with additional kwargs
        conn_params.update(kwargs)

        self.conn = hive.connect(**conn_params)
        self.database = database

    def execute(self, sql: str) -> SparkResult:
        """Execute SQL query."""
        cursor = self.conn.cursor()
        cursor.execute(sql)
        return SparkResult(cursor)

    def executemany(self, sql: str, params: list) -> SparkResult:
        """Execute SQL with multiple parameter sets."""
        cursor = self.conn.cursor()
        cursor.executemany(sql, params)
        return SparkResult(cursor)

    def fetchone(self, result: SparkResult) -> tuple | None:
        """Fetch one row from result."""
        return result.fetchone()

    def fetch_record_batch(self, result: SparkResult) -> Any:
        """Fetch result as PyArrow RecordBatchReader."""
        return result.fetch_record_batch()

    def get_tables(self) -> list[dict]:
        """List all tables in the database."""
        sql = f"SHOW TABLES IN {self.database}"
        result = self.execute(sql)
        rows = result.fetchall()
        return [{"table_name": row[1], "schema": row[0]} for row in rows]

    def get_columns(self, table_name: str, schema: str | None = None) -> list[dict]:
        """Get column information for a table."""
        schema = schema or self.database
        table_ref = f"{schema}.{table_name}" if schema else table_name

        sql = f"DESCRIBE {table_ref}"
        result = self.execute(sql)
        rows = result.fetchall()
        return [{"column_name": row[0], "data_type": row[1]} for row in rows]

    def close(self) -> None:
        """Close the Spark connection."""
        self.conn.close()

    @property
    def dialect(self) -> str:
        """Return SQL dialect."""
        return "spark"

    @property
    def raw_connection(self) -> Any:
        """Return raw PyHive connection."""
        return self.conn

    @classmethod
    def from_url(cls, url: str) -> "SparkAdapter":
        """Create adapter from connection URL.

        URL format: spark://host:port/database
        Example: spark://localhost:10000/default

        Args:
            url: Connection URL

        Returns:
            SparkAdapter instance
        """
        if not url.startswith("spark://"):
            raise ValueError(f"Invalid Spark URL: {url}")

        parsed = urlparse(url)

        # Parse hostname and port
        host = parsed.hostname or "localhost"
        port = parsed.port or 10000

        # Parse database from path
        database = parsed.path.lstrip("/") if parsed.path else "default"

        # Parse username
        username = unquote(parsed.username) if parsed.username else None

        # Parse query parameters
        params = {}
        if parsed.query:
            params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed.query).items()}

        return cls(
            host=host,
            port=port,
            database=database,
            username=username,
            **params,
        )

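Exercising the adapter end to end against the docker-compose service, as a sketch (query targets are placeholders):

from sidemantic.db.spark import SparkAdapter

# URL form matches from_url's documented format.
adapter = SparkAdapter.from_url("spark://localhost:10000/default")

result = adapter.execute("SHOW DATABASES")
print(result.fetchall())

# Arrow path: rows are materialized into a RecordBatchReader.
reader = adapter.execute("SELECT 1 AS x").fetch_record_batch()
print(reader.read_all())

adapter.close()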