Skip to content

Commit 2186788

Browse files
feat: add Neo4j lineage exporter for dbt manifest metadata
- Add Neo4jLineageExporter class to export dbt nodes and dependencies to Neo4j - Supports models, sources, seeds and snapshots as DbtNode graph nodes - Creates FEEDS_INTO relationships for downstream impact analysis - Add Neo4jConfig dataclass with environment variable support - Add 8 unit tests, all passing - Add README with usage examples and Cypher query examples
1 parent edf8d50 commit 2186788

6 files changed

Lines changed: 348 additions & 0 deletions

File tree

README_neo4j.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Elementary Neo4j Lineage Exporter
2+
3+
## Overview
4+
5+
This utility exports dbt lineage data from Elementary's dbt manifest into Neo4j
6+
as a property graph. It enables downstream impact analysis, root cause detection,
7+
and lineage visualization directly in Neo4j.
8+
9+
## Motivation
10+
11+
Elementary already captures rich lineage metadata via dbt artifacts. This exporter
12+
makes that lineage available in Neo4j, enabling graph traversal queries for:
13+
14+
- **Impact analysis** — which models are affected if a source schema changes?
15+
- **Root cause detection** — trace data quality issues upstream
16+
- **Lineage visualization** — explore your dbt DAG as a graph
17+
18+
## Graph Model
19+
20+
**Nodes** — each dbt model, source, seed, and snapshot becomes a `DbtNode`:
21+
- `unique_id` — dbt unique identifier (primary key)
22+
- `name` — model/source name
23+
- `resource_type` — model, source, seed, snapshot
24+
- `schema` — target schema in your warehouse
25+
- `database` — target database
26+
- `package_name` — dbt package
27+
- `description` — model documentation
28+
29+
**Relationships** — `(upstream)-[:FEEDS_INTO]->(downstream)`
30+
31+
## Installation
32+
33+
```bash
34+
pip install neo4j
35+
```
36+
37+
## Usage
38+
39+
```python
40+
from elementary_neo4j.neo4j_config import Neo4jConfig
41+
from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter
42+
43+
config = Neo4jConfig(
44+
uri="bolt://localhost:7687",
45+
username="neo4j",
46+
password="your-password"
47+
)
48+
49+
exporter = Neo4jLineageExporter(config)
50+
result = exporter.export("path/to/manifest.json")
51+
print(result)
52+
# {"nodes_exported": 42, "dependencies_exported": 67}
53+
exporter.close()
54+
```
55+
56+
## Environment Variables
57+
58+
```bash
59+
export NEO4J_URI=bolt://localhost:7687
60+
export NEO4J_USERNAME=neo4j
61+
export NEO4J_PASSWORD=your-password
62+
export NEO4J_DATABASE=neo4j
63+
```
64+
65+
Then use:
66+
```python
67+
config = Neo4jConfig.from_env()
68+
```
69+
70+
## Example Neo4j Query
71+
72+
Find all models impacted by a source change:
73+
```cypher
74+
MATCH (source:DbtNode {name: "raw_customers"})-[:FEEDS_INTO*]->(impacted)
75+
RETURN impacted.name, impacted.resource_type
76+
```
77+
78+
## Running Tests
79+
80+
```bash
81+
PYTHONPATH=. python -m pytest tests/test_neo4j_exporter.py -v --ignore=integration_tests
82+
```

elementary_neo4j/__init__.py

Whitespace-only changes.

elementary_neo4j/neo4j_config.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
5+
@dataclass
class Neo4jConfig:
    """Connection settings used to reach a Neo4j instance."""

    uri: str
    username: str
    password: str
    database: Optional[str] = "neo4j"

    @classmethod
    def from_env(cls) -> "Neo4jConfig":
        """Build a config from the ``NEO4J_*`` environment variables.

        Any variable that is not set falls back to the default listed in the
        table below.
        """
        import os

        # (constructor field, environment variable, fallback value)
        env_table = (
            ("uri", "NEO4J_URI", "bolt://localhost:7687"),
            ("username", "NEO4J_USERNAME", "neo4j"),
            ("password", "NEO4J_PASSWORD", ""),
            ("database", "NEO4J_DATABASE", "neo4j"),
        )
        settings = {
            field_name: os.environ.get(variable, fallback)
            for field_name, variable, fallback in env_table
        }
        return cls(**settings)

elementary_neo4j/neo4j_exporter.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import json
2+
import logging
3+
from pathlib import Path
4+
from typing import Any, Dict, List, Optional
5+
6+
from neo4j import GraphDatabase
7+
8+
from elementary_neo4j.neo4j_config import Neo4jConfig
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class Neo4jLineageExporter:
    """Export dbt lineage from a dbt ``manifest.json`` into Neo4j.

    Models, seeds, snapshots and sources become ``DbtNode`` nodes keyed by
    their dbt ``unique_id``. Each dependency of an exported model, seed or
    snapshot becomes an ``(upstream)-[:FEEDS_INTO]->(downstream)``
    relationship.

    A Neo4j driver is created on construction. Call :meth:`close` when done,
    or use the instance as a context manager so the driver is closed even if
    the export raises.
    """

    # Resource types under the manifest's "nodes" key that become DbtNode
    # graph nodes. Sources are read from the separate "sources" key.
    _EXPORTED_NODE_TYPES = ("model", "seed", "snapshot")

    _MERGE_NODE_QUERY = """
        MERGE (n:DbtNode {unique_id: $unique_id})
        SET n.name = $name,
            n.resource_type = $resource_type,
            n.schema = $schema,
            n.database = $database,
            n.package_name = $package_name,
            n.description = $description
    """

    _MERGE_DEPENDENCY_QUERY = """
        MATCH (a:DbtNode {unique_id: $from_id})
        MATCH (b:DbtNode {unique_id: $to_id})
        MERGE (a)-[:FEEDS_INTO]->(b)
    """

    def __init__(self, config: "Neo4jConfig"):
        """Create the driver for ``config.uri`` using basic authentication."""
        self.config = config
        self.driver = GraphDatabase.driver(
            config.uri,
            auth=(config.username, config.password),
        )

    def __enter__(self) -> "Neo4jLineageExporter":
        """Return the exporter itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the driver on exit; exceptions are not suppressed."""
        self.close()

    def close(self) -> None:
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def load_manifest(self, manifest_path: str) -> Dict[str, Any]:
        """Load and parse a dbt ``manifest.json``.

        Args:
            manifest_path: Path to the manifest file.

        Returns:
            The parsed manifest.

        Raises:
            FileNotFoundError: If ``manifest_path`` does not exist.
        """
        path = Path(manifest_path)
        if not path.exists():
            raise FileNotFoundError(f"Manifest not found: {manifest_path}")
        # JSON is UTF-8; do not rely on the platform's default encoding.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _node_properties(
        unique_id: str, entry: Dict[str, Any], resource_type: Optional[str]
    ) -> Dict[str, Any]:
        """Build the DbtNode property dict for one manifest entry."""
        return {
            "unique_id": unique_id,
            "name": entry.get("name"),
            "resource_type": resource_type,
            "schema": entry.get("schema"),
            "database": entry.get("database"),
            "package_name": entry.get("package_name"),
            "description": entry.get("description", ""),
        }

    def extract_nodes(self, manifest: Dict[str, Any]) -> List[Dict]:
        """Extract model, seed, snapshot and source nodes from the manifest.

        Entries under ``"nodes"`` with any other resource type are skipped.
        Every entry under ``"sources"`` is exported with ``resource_type``
        set to ``"source"``.

        Returns:
            One property dict per exported node, ready for :meth:`export_nodes`.
        """
        nodes = []
        for unique_id, node in (manifest.get("nodes") or {}).items():
            resource_type = node.get("resource_type")
            if resource_type in self._EXPORTED_NODE_TYPES:
                nodes.append(
                    self._node_properties(unique_id, node, resource_type)
                )
        for unique_id, source in (manifest.get("sources") or {}).items():
            nodes.append(self._node_properties(unique_id, source, "source"))
        return nodes

    def extract_dependencies(self, manifest: Dict[str, Any]) -> List[Dict]:
        """Extract upstream -> downstream edges between exported nodes.

        Only dependencies of models, seeds and snapshots are returned. Other
        entries under ``"nodes"`` are never written as ``DbtNode`` by this
        exporter, so their edges could not be created and would only inflate
        the reported count.

        Returns:
            Dicts with ``from_id`` (upstream) and ``to_id`` (downstream).
        """
        dependencies = []
        for unique_id, node in (manifest.get("nodes") or {}).items():
            if node.get("resource_type") not in self._EXPORTED_NODE_TYPES:
                continue
            # Tolerate a missing or null "depends_on" / "nodes" entry.
            upstream_ids = (node.get("depends_on") or {}).get("nodes") or []
            for upstream_id in upstream_ids:
                dependencies.append({
                    "from_id": upstream_id,
                    "to_id": unique_id,
                })
        return dependencies

    def export_nodes(self, nodes: List[Dict]):
        """Upsert each node dict as a ``DbtNode``, merged on ``unique_id``."""
        with self.driver.session(database=self.config.database) as session:
            for node in nodes:
                # Pass the parameters as a dict rather than **kwargs so that
                # property names can never collide with run()'s own arguments.
                session.run(self._MERGE_NODE_QUERY, node)
        logger.info("Exported %d nodes to Neo4j", len(nodes))

    def export_dependencies(self, dependencies: List[Dict]):
        """Merge a ``FEEDS_INTO`` relationship for each dependency dict.

        Both endpoints are looked up with ``MATCH``, so a pair whose nodes
        are not already present in the graph creates nothing.
        """
        with self.driver.session(database=self.config.database) as session:
            for dep in dependencies:
                session.run(self._MERGE_DEPENDENCY_QUERY, dep)
        logger.info("Exported %d dependencies to Neo4j", len(dependencies))

    def export(self, manifest_path: str) -> Dict[str, int]:
        """Run the full pipeline: load the manifest, then write nodes and edges.

        Nodes are written first so the relationship step can match them.

        Returns:
            ``{"nodes_exported": ..., "dependencies_exported": ...}`` with the
            number of node and dependency records extracted and submitted.
        """
        logger.info("Loading manifest from %s", manifest_path)
        manifest = self.load_manifest(manifest_path)
        nodes = self.extract_nodes(manifest)
        dependencies = self.extract_dependencies(manifest)
        self.export_nodes(nodes)
        self.export_dependencies(dependencies)
        logger.info("Neo4j lineage export complete")
        return {
            "nodes_exported": len(nodes),
            "dependencies_exported": len(dependencies),
        }

tests/__init__.py

Whitespace-only changes.

tests/test_neo4j_exporter.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import json
2+
import pytest
3+
from unittest.mock import MagicMock, patch, mock_open
4+
from elementary_neo4j.neo4j_exporter import Neo4jLineageExporter
5+
from elementary_neo4j.neo4j_config import Neo4jConfig
6+
7+
8+
@pytest.fixture
def config():
    """Neo4j settings for a local instance; no connection is made here."""
    settings = {
        "uri": "bolt://localhost:7687",
        "username": "neo4j",
        "password": "test",
        "database": "neo4j",
    }
    return Neo4jConfig(**settings)
16+
17+
18+
@pytest.fixture
def exporter(config):
    """Exporter built while the driver factory is patched, so its driver is a mock."""
    driver_patch = patch("elementary_neo4j.neo4j_exporter.GraphDatabase.driver")
    with driver_patch:
        instance = Neo4jLineageExporter(config)
    return instance
22+
23+
24+
@pytest.fixture
def sample_manifest():
    """Minimal manifest: raw_customers (source) -> dim_customers -> fct_orders."""
    dim_customers = {
        "name": "dim_customers",
        "resource_type": "model",
        "schema": "analytics",
        "database": "snowflake_db",
        "package_name": "my_project",
        "description": "Customer dimension table",
        "depends_on": {"nodes": ["source.my_project.raw_customers"]},
    }
    fct_orders = {
        "name": "fct_orders",
        "resource_type": "model",
        "schema": "analytics",
        "database": "snowflake_db",
        "package_name": "my_project",
        "description": "Orders fact table",
        "depends_on": {"nodes": ["model.my_project.dim_customers"]},
    }
    raw_customers = {
        "name": "raw_customers",
        "resource_type": "source",
        "schema": "raw",
        "database": "snowflake_db",
        "package_name": "my_project",
        "description": "Raw customers source",
    }
    return {
        "nodes": {
            "model.my_project.dim_customers": dim_customers,
            "model.my_project.fct_orders": fct_orders,
        },
        "sources": {
            "source.my_project.raw_customers": raw_customers,
        },
    }
62+
63+
64+
def test_extract_nodes_returns_models_and_sources(exporter, sample_manifest):
    """All three manifest entries are extracted, covering models and sources."""
    extracted = exporter.extract_nodes(sample_manifest)
    assert len(extracted) == 3
    seen_types = {entry["resource_type"] for entry in extracted}
    assert "model" in seen_types
    assert "source" in seen_types
70+
71+
72+
def test_extract_nodes_contains_correct_fields(exporter, sample_manifest):
    """The dim_customers node carries its schema, database and description."""
    matching = [
        entry
        for entry in exporter.extract_nodes(sample_manifest)
        if entry["name"] == "dim_customers"
    ]
    dim_customers = matching[0]
    assert dim_customers["schema"] == "analytics"
    assert dim_customers["database"] == "snowflake_db"
    assert dim_customers["description"] == "Customer dimension table"
78+
79+
80+
def test_extract_dependencies_correct_count(exporter, sample_manifest):
    """The sample manifest yields one edge per model, two in total."""
    edges = exporter.extract_dependencies(sample_manifest)
    edge_count = len(edges)
    assert edge_count == 2
83+
84+
85+
def test_extract_dependencies_correct_direction(exporter, sample_manifest):
    """Edges point from the upstream source to the downstream model."""
    downstream = "model.my_project.dim_customers"
    upstream_ids = [
        edge["from_id"]
        for edge in exporter.extract_dependencies(sample_manifest)
        if edge["to_id"] == downstream
    ]
    assert upstream_ids[0] == "source.my_project.raw_customers"
92+
93+
94+
def test_load_manifest_file_not_found(exporter):
    """Loading a path that does not exist raises FileNotFoundError."""
    missing_path = "nonexistent/path/manifest.json"
    with pytest.raises(FileNotFoundError):
        exporter.load_manifest(missing_path)
97+
98+
99+
def test_load_manifest_reads_correctly(exporter, sample_manifest):
    """A manifest served through a mocked ``open`` is parsed back into a dict."""
    serialized = json.dumps(sample_manifest)
    fake_open = patch("builtins.open", mock_open(read_data=serialized))
    path_exists = patch("pathlib.Path.exists", return_value=True)
    with fake_open, path_exists:
        loaded = exporter.load_manifest("fake/manifest.json")
    for top_level_key in ("nodes", "sources"):
        assert top_level_key in loaded
106+
107+
108+
def test_export_nodes_calls_session(exporter, sample_manifest):
    """export_nodes issues exactly one ``session.run`` per node."""
    extracted = exporter.extract_nodes(sample_manifest)
    fake_session = MagicMock()
    # Make ``with driver.session(...) as session`` yield the fake session.
    session_context = exporter.driver.session.return_value
    session_context.__enter__ = MagicMock(return_value=fake_session)
    session_context.__exit__ = MagicMock(return_value=False)
    exporter.export_nodes(extracted)
    assert fake_session.run.call_count == len(extracted)
119+
120+
121+
def test_export_returns_correct_counts(exporter, sample_manifest):
    """export() reports the extracted node and dependency counts."""
    stub_load = patch.object(
        exporter, "load_manifest", return_value=sample_manifest
    )
    stub_nodes = patch.object(exporter, "export_nodes")
    stub_dependencies = patch.object(exporter, "export_dependencies")
    with stub_load, stub_nodes, stub_dependencies:
        summary = exporter.export("fake/manifest.json")
    assert summary["nodes_exported"] == 3
    assert summary["dependencies_exported"] == 2

0 commit comments

Comments
 (0)