Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions .github/workflows/auditlog-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,36 @@ on:
paths:
- 'auditlog/**'
- '.github/workflows/auditlog-workflow.yml'
- 'scripts/run-example-test.py'
pull_request:
paths:
- 'auditlog/**'
- '.github/workflows/auditlog-workflow.yml'
- 'scripts/run-example-test.py'

jobs:
build:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up JDK 8
- uses: actions/checkout@v4

- name: Set up Java
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
- name: Cache local Maven repository
uses: actions/cache@v4
cache: 'maven'

- name: Build auditlog services
run: mvn clean package -DskipTests -f auditlog/pom.xml

- name: Set up Python
uses: actions/setup-python@v5
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('auditlog/**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Check changes in [auditlog] example
run: cd auditlog && mvn clean package -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
python-version: '3.11'

- name: Install Python dependencies
run: pip install pyyaml requests

- name: Run auditlog test
run: python scripts/run-example-test.py auditlog
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class AuditData {

@JsonFormat(shape = JsonFormat.Shape.NUMBER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class SourceData {

private String version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TransactionData {

@JsonProperty("transaction_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TransactionEvent {

private TransactionData before;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.debezium.demos.auditing.admin;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class VegetableData {

private Long id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class VegetableEvent {

private VegetableData before;
Expand Down
128 changes: 128 additions & 0 deletions auditlog/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Integration-test specification for the auditlog example.
# Consumed by scripts/run-example-test.py; each entry under `steps:` is
# executed in order, and `cleanup:` always runs afterwards.
name: auditlog
description: Tests Debezium PostgreSQL connector with audit log enrichment via Kafka Streams
env_file: ../.env
compose_file: docker-compose.yaml

cleanup:
  type: docker_compose_down
  volumes: true

steps:
  - name: Build all images
    type: docker_compose_build

  - name: Start all services
    type: docker_compose_up
    detach: true

  - name: Wait for Kafka Connect REST API to be ready
    type: http_wait
    url: http://localhost:8083/connectors
    timeout_seconds: 120
    interval_seconds: 5

  - name: Register Debezium PostgreSQL connector
    type: http_put
    url: http://localhost:8083/connectors/inventory-connector/config
    body_file: register-postgres.json
    expected_status: [200, 201]

  - name: Wait for connector to be RUNNING
    type: http_wait
    url: http://localhost:8083/connectors/inventory-connector/status
    timeout_seconds: 60
    interval_seconds: 5
    expected_json_path: connector.state
    expected_value: RUNNING

  - name: Wait for vegetables-service to be ready
    type: http_wait
    url: http://localhost:8080/vegetables
    timeout_seconds: 120
    interval_seconds: 5
    # 405 is accepted: the endpoint may reject GET but being up is enough here.
    expected_status: [200, 405]

  - name: Wait for admin-service to be ready
    type: http_wait
    url: http://localhost:8085/vegetables
    timeout_seconds: 60
    interval_seconds: 5
    expected_status: 200

  - name: Create a new vegetable record (farmerbob)
    type: http_post
    url: http://localhost:8080/vegetables
    headers:
      # Long-lived demo JWT for user "farmerbob" (exp year 2099-range) —
      # demo credential only, not a real secret.
      Authorization: "Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q"
      Date: "Mon, 20 Apr 2026 12:00:00 GMT"
    body:
      name: "Cucumber"
      description: "Fresh and crunchy"
    expected_status: [200, 201, 204]

  - name: Verify enriched message appears in Kafka
    type: kafka_consume
    topic: dbserver1.inventory.vegetable.enriched
    timeout_seconds: 60
    expected_content: "Cucumber"

  - name: Verify metadata enrichment (user farmerbob)
    type: kafka_consume
    topic: dbserver1.inventory.vegetable.enriched
    timeout_seconds: 30
    expected_content: "farmerbob"

  # Insert a row directly in the database, bypassing the service layer, so no
  # audit metadata is attached — this is the "missing event" the admin
  # service must later resolve.
  - name: Administrate missing events - Create a record bypassing the service
    type: docker_compose_exec
    service: vegetables-db
    command: "psql postgresql://postgresuser:postgrespw@vegetables-db:5432/vegetablesdb -c \"insert into inventory.vegetable (id, description, name) values (nextval('inventory.vegetables_id_seq'), 'Tasty!', 'Banana');\""

  - name: Verify raw anomalous event appears in Kafka
    type: kafka_consume
    # NOTE(review): no `service:` override here — the runner hard-codes the
    # Kafka service name and would silently ignore one.
    topic: dbserver1.inventory.vegetable
    expected_content: "Banana"
    timeout_seconds: 60

  - name: Wait for missing event to appear in admin service
    type: http_wait
    url: http://localhost:8085/vegetables
    timeout_seconds: 120
    interval_seconds: 5
    expected_status: 200
    expected_json_path: 0.id
    # "{ANY}" matches any present value; the id is captured for later steps.
    expected_value: "{ANY}"
    capture_json:
      MISSING_UUID: "0.id"

  - name: Get list of tasks to provide missing data
    type: http_wait
    url: http://localhost:8085/vegetables/${MISSING_UUID}/tasks
    timeout_seconds: 30
    interval_seconds: 5
    expected_status: 200
    expected_json_path: "{FIRST_KEY}"
    expected_value: "{ANY}"
    capture_json:
      TASK_UUID: "{FIRST_KEY}"

  - name: Provide missing metadata to the admin service
    type: http_post
    url: http://localhost:8085/vegetables/${MISSING_UUID}/auditData/${TASK_UUID}
    expected_status: [200, 201]
    body:
      audit:
        usecase: "CREATE VEGETABLE"
        user_name: "farmerjohn"

  - name: Verify enriched message appears in Kafka (from admin resolution)
    type: kafka_consume
    topic: dbserver1.inventory.vegetable.enriched
    timeout_seconds: 60
    expected_content: "Banana"

  - name: Verify metadata enrichment (user farmerjohn)
    type: kafka_consume
    topic: dbserver1.inventory.vegetable.enriched
    timeout_seconds: 30
    expected_content: "farmerjohn"
3 changes: 3 additions & 0 deletions postgres-failover-slots/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ services:
environment:
PGUSER: replicator
PGPASSWORD: zufsob-kuvtum-bImxa6
PGDATABASE: inventorydb
command: |
bash -c "
until pg_basebackup --pgdata=/var/lib/postgresql/data -R --slot=replication_slot --host=postgres_primary --port=5432
do
echo 'Waiting for primary to connect...'
rm -rf /var/lib/postgresql/data/*
sleep 1s
done
echo 'Backup done, starting replica...'
Expand Down Expand Up @@ -98,6 +100,7 @@ services:
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- OFFSET_FLUSH_INTERVAL_MS=1000
# For testing newer connector versions, unpack the connector archive into this
# directory and uncomment the lines below
# volumes:
Expand Down
3 changes: 2 additions & 1 deletion postgres-failover-slots/inventory-source.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
"topic.prefix" : "dbserver1",
"schema.include.list" : "inventory",
"plugin.name" : "pgoutput",
"slot.failover" : "true",
"snapshot.mode" : "when_needed"
}
45 changes: 28 additions & 17 deletions postgres-failover-slots/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ steps:
timeout_seconds: 30
expected_content: Sarah

# --- Failover sequence ---

- name: Simulate failure - stop primary database
type: docker_compose_stop
service: postgres_primary
Expand All @@ -63,7 +65,7 @@ steps:

- name: Wait for promotion to complete
type: wait
seconds: 5
seconds: 10

- name: Override DB_HOST to point pgbouncer at new primary
type: env_override
Expand All @@ -76,31 +78,40 @@ steps:
detach: true

- name: Wait for pgbouncer to become available
type: wait
seconds: 10

# --- Recover connector on new primary ---

- name: Delete old connector
type: http_delete
url: http://localhost:8083/connectors/inventory-source
expected_status: [200, 204, 404]

- name: Drop stale replication slot on promoted replica
type: docker_compose_exec
service: postgres_replica
command: "psql -U user -d inventorydb -c \"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = 'debezium';\""

- name: Wait for connector cleanup
type: wait
seconds: 5

- name: Wait for connector task to reconnect after failover
- name: Re-register connector on new primary
type: http_put
url: http://localhost:8083/connectors/inventory-source/config
body_file: inventory-source.json
expected_status: [200, 201]

- name: Wait for connector to be RUNNING on new primary
type: http_wait
url: http://localhost:8083/connectors/inventory-source/status
timeout_seconds: 120
timeout_seconds: 60
interval_seconds: 5
expected_json_path: tasks.0.state
expected_value: RUNNING

- name: Wait for connector to fully establish streaming connection
type: wait
seconds: 10

- name: Verify connector is streaming - make test change
type: docker_compose_exec
service: postgres_replica
command: "psql -U user -d inventorydb -c \"UPDATE inventory.customers SET email = 'test@failover.com' WHERE id = 1001;\""

- name: Verify connector is streaming - check test change appears in Kafka
type: kafka_consume
topic: dbserver1.inventory.customers
timeout_seconds: 30
expected_content: test@failover.com
# --- Verify post-failover streaming ---

- name: Make change on new primary (promoted replica)
type: docker_compose_exec
Expand Down
Loading
Loading