Skip to content

Commit b436a1f

Browse files
committed
debezium/dbz#1810 Add automated E2E tests for the auditlog example.
Signed-off-by: Mohnish <kmohnishm@gmail.com>
1 parent a35c428 commit b436a1f

3 files changed

Lines changed: 118 additions & 21 deletions

File tree

.github/workflows/auditlog-workflow.yml

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,36 @@ on:
55
paths:
66
- 'auditlog/**'
77
- '.github/workflows/auditlog-workflow.yml'
8+
- 'scripts/run-example-test.py'
89
pull_request:
910
paths:
1011
- 'auditlog/**'
1112
- '.github/workflows/auditlog-workflow.yml'
13+
- 'scripts/run-example-test.py'
1214

1315
jobs:
14-
build:
16+
test:
1517
runs-on: ubuntu-latest
1618
steps:
17-
- uses: actions/checkout@v6
18-
- name: Set up JDK 8
19+
- uses: actions/checkout@v4
20+
21+
- name: Set up Java
1922
uses: actions/setup-java@v4
2023
with:
2124
java-version: '8'
2225
distribution: 'temurin'
23-
- name: Cache local Maven repository
24-
uses: actions/cache@v4
26+
cache: 'maven'
27+
28+
- name: Build auditlog services
29+
run: mvn clean package -DskipTests -f auditlog/pom.xml
30+
31+
- name: Set up Python
32+
uses: actions/setup-python@v5
2533
with:
26-
path: ~/.m2/repository
27-
key: ${{ runner.os }}-maven-${{ hashFiles('auditlog/**/pom.xml') }}
28-
restore-keys: |
29-
${{ runner.os }}-maven-
30-
- name: Check changes in [auditlog] example
31-
run: cd auditlog && mvn clean package -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
34+
python-version: '3.11'
35+
36+
- name: Install Python dependencies
37+
run: pip install pyyaml requests
38+
39+
- name: Run auditlog test
40+
run: python scripts/run-example-test.py auditlog

auditlog/test.yaml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
name: auditlog
2+
description: Tests Debezium PostgreSQL connector with audit log enrichment via Kafka Streams
3+
env_file: ../.env
4+
compose_file: docker-compose.yaml
5+
6+
cleanup:
7+
type: docker_compose_down
8+
volumes: true
9+
10+
steps:
11+
- name: Start all services
12+
type: docker_compose_up
13+
detach: true
14+
15+
- name: Wait for Kafka Connect REST API to be ready
16+
type: http_wait
17+
url: http://localhost:8083/connectors
18+
timeout_seconds: 120
19+
interval_seconds: 5
20+
21+
- name: Register Debezium PostgreSQL connector
22+
type: http_put
23+
url: http://localhost:8083/connectors/inventory-connector/config
24+
body_file: register-postgres.json
25+
expected_status: [200, 201]
26+
27+
- name: Wait for connector to be RUNNING
28+
type: http_wait
29+
url: http://localhost:8083/connectors/inventory-connector/status
30+
timeout_seconds: 60
31+
interval_seconds: 5
32+
expected_json_path: connector.state
33+
expected_value: RUNNING
34+
35+
- name: Wait for vegetables-service to be ready
36+
type: http_wait
37+
url: http://localhost:8080/vegetables
38+
timeout_seconds: 120
39+
interval_seconds: 5
40+
expected_status: [200, 405]
41+
42+
- name: Create a new vegetable record (farmerbob)
43+
type: http_post
44+
url: http://localhost:8080/vegetables
45+
headers:
46+
Authorization: "Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q"
47+
Date: "Mon, 20 Apr 2026 12:00:00 GMT"
48+
body:
49+
name: "Cucumber"
50+
description: "Fresh and crunchy"
51+
expected_status: [200, 201, 204]
52+
53+
- name: Verify enriched message appears in Kafka
54+
type: kafka_consume
55+
topic: dbserver1.inventory.vegetable.enriched
56+
timeout_seconds: 60
57+
expected_content: "Cucumber"
58+
59+
- name: Verify metadata enrichment (user farmerbob)
60+
type: kafka_consume
61+
topic: dbserver1.inventory.vegetable.enriched
62+
timeout_seconds: 30
63+
expected_content: "farmerbob"

scripts/run-example-test.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,41 +90,64 @@ def step_docker_compose_exec(step, config):
9090
run_cmd(cmd)
9191

9292

93-
def step_http_put(step, config):
93+
def _http_request(method, step, config):
9494
url = step["url"]
95-
body_file = step["body_file"]
95+
body_file = step.get("body_file")
96+
body = step.get("body")
97+
headers = step.get("headers", {})
9698
expected_statuses = step.get("expected_status", [200, 201])
99+
97100
if isinstance(expected_statuses, int):
98101
expected_statuses = [expected_statuses]
99102

100-
with open(body_file) as f:
101-
body = json.load(f)
103+
if body_file:
104+
with open(body_file) as f:
105+
body = json.load(f)
102106

103-
print(f" PUT {url}")
104-
resp = requests.put(url, json=body, timeout=30)
107+
print(f" {method} {url}")
108+
resp = requests.request(method, url, json=body, headers=headers, timeout=30)
109+
105110
if resp.status_code not in expected_statuses:
106111
raise RuntimeError(
107-
f"PUT {url} returned {resp.status_code}, expected one of {expected_statuses}.\n"
112+
f"{method} {url} returned {resp.status_code}, expected one of {expected_statuses}.\n"
108113
f"Response: {resp.text}"
109114
)
110115
print(f" -> {resp.status_code}")
111116

112117

118+
def step_http_put(step, config):
119+
_http_request("PUT", step, config)
120+
121+
122+
def step_http_post(step, config):
123+
_http_request("POST", step, config)
124+
125+
113126
def step_http_wait(step, config):
114127
url = step["url"]
115128
timeout = step.get("timeout_seconds", 60)
116129
interval = step.get("interval_seconds", 5)
117130
json_path = step.get("expected_json_path")
118131
expected_value = step.get("expected_value")
132+
headers = step.get("headers", {})
133+
expected_statuses = step.get("expected_status")
134+
if isinstance(expected_statuses, int):
135+
expected_statuses = [expected_statuses]
119136

120137
deadline = time.time() + timeout
121138
print(f" Polling {url} (timeout={timeout}s)")
122139

123140
while True:
124141
try:
125-
resp = requests.get(url, timeout=10)
126-
# Only accept 2xx status codes as success
127-
if 200 <= resp.status_code < 300:
142+
resp = requests.get(url, headers=headers, timeout=10)
143+
144+
is_success = False
145+
if expected_statuses is not None:
146+
is_success = resp.status_code in expected_statuses
147+
else:
148+
is_success = 200 <= resp.status_code < 300
149+
150+
if is_success:
128151
if json_path and expected_value:
129152
data = resp.json()
130153
# resolve simple dot-separated path
@@ -199,13 +222,15 @@ def step_wait(step, config):
199222
"docker_compose_stop": step_docker_compose_stop,
200223
"docker_compose_exec": step_docker_compose_exec,
201224
"http_put": step_http_put,
225+
"http_post": step_http_post,
202226
"http_wait": step_http_wait,
203227
"kafka_consume": step_kafka_consume,
204228
"env_override": step_env_override,
205229
"wait": step_wait,
206230
}
207231

208232

233+
209234
# ---------------------------------------------------------------------------
210235
# Main
211236
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)