Skip to content

Latest commit

 

History

History
1076 lines (865 loc) · 39.6 KB

File metadata and controls

1076 lines (865 loc) · 39.6 KB

🔝 Retour au Sommaire

20bis.2.4 — Debezium pour Streaming d'Événements

Introduction

Dans le chapitre précédent, nous avons découvert les concepts du Change Data Capture (CDC) et du Logical Decoding. Nous avons brièvement présenté Debezium comme solution de référence. Ce chapitre approfondit Debezium : son architecture, sa configuration avancée, les transformations de messages, le monitoring, et les patterns de production.

Debezium est une plateforme open-source de Change Data Capture distribuée. Elle capture les modifications de bases de données en temps réel et les diffuse sous forme de flux d'événements vers Apache Kafka. C'est aujourd'hui le standard de facto pour le CDC dans les architectures événementielles.

À la fin de ce chapitre, vous saurez déployer, configurer et opérer Debezium en production avec PostgreSQL.


Pourquoi Debezium ?

Les Défis du CDC en Production

Implémenter le CDC "à la main" pose plusieurs défis :

Défi Complexité
Gestion des connexions au WAL Reconnexion, timeouts, heartbeats
Snapshots initiaux Capturer l'état existant avant le streaming
Évolution des schémas Gérer les ALTER TABLE sans interruption
Garanties de livraison At-least-once, ordering, exactly-once
Monitoring Métriques, alertes, lag tracking
Scalabilité Répartition de charge, haute disponibilité

Ce que Debezium Apporte

Debezium résout tous ces problèmes avec une solution éprouvée :

┌─────────────────────────────────────────────────────────────────────────┐
│                      Fonctionnalités Debezium                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ✅ Connecteurs pour 10+ bases de données                               │
│  ✅ Snapshots automatiques et incrémentaux                              │
│  ✅ Gestion des schémas avec Schema Registry                            │
│  ✅ Transformations de messages (SMT)                                   │
│  ✅ Métriques JMX/Prometheus                                            │
│  ✅ Tolérance aux pannes et reprise automatique                         │
│  ✅ Intégration native avec Kafka Connect                               │
│  ✅ Documentation exhaustive et communauté active                       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Architecture de Debezium

Vue d'Ensemble

Debezium s'exécute comme un ensemble de connecteurs au sein de Kafka Connect, la plateforme d'intégration d'Apache Kafka.

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                      Kafka Connect Cluster                       │  │
│  │                                                                  │  │
│  │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │  │
│  │   │  Worker 1   │  │  Worker 2   │  │  Worker 3   │              │  │
│  │   │             │  │             │  │             │              │  │
│  │   │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │              │  │
│  │   │ │Debezium │ │  │ │Debezium │ │  │ │  Other  │ │              │  │
│  │   │ │Postgres │ │  │ │ MySQL   │ │  │ │Connector│ │              │  │
│  │   │ │Connector│ │  │ │Connector│ │  │ │         │ │              │  │
│  │   │ └────┬────┘ │  │ └────┬────┘ │  │ └─────────┘ │              │  │
│  │   └──────┼──────┘  └──────┼──────┘  └─────────────┘              │  │
│  │          │                │                                      │  │
│  └──────────┼────────────────┼──────────────────────────────────────┘  │
│             │                │                                         │
│             ▼                ▼                                         │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                        Apache Kafka                              │  │
│  │                                                                  │  │
│  │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │  │
│  │   │   Topic:    │  │   Topic:    │  │   Topic:    │              │  │
│  │   │  postgres.  │  │   mysql.    │  │   schema    │              │  │
│  │   │   public.   │  │   mydb.     │  │   changes   │              │  │
│  │   │  customers  │  │   orders    │  │             │              │  │
│  │   └─────────────┘  └─────────────┘  └─────────────┘              │  │
│  │                                                                  │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                        │
│        ▲                          ▲                                    │
│        │                          │                                    │
│   Logical                    Binlog                                    │
│   Replication                Replication                               │
│        │                          │                                    │
│  ┌─────┴─────┐              ┌─────┴─────┐                              │
│  │PostgreSQL │              │   MySQL   │                              │
│  └───────────┘              └───────────┘                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Les Composants

Composant Rôle
Kafka Connect Framework d'intégration qui héberge les connecteurs
Debezium Connector Plugin qui capture les changements d'une base spécifique
Source Task Thread qui lit le WAL et produit des messages
Kafka Bus de messages qui stocke et distribue les événements
Schema Registry Stocke les schémas Avro/JSON (optionnel mais recommandé)

Modes de Déploiement

Mode Standalone (Développement)

Un seul worker Kafka Connect :

# Lancer Connect en mode standalone
connect-standalone.sh \
    connect-standalone.properties \
    postgres-connector.properties

Mode Distribué (Production)

Cluster de workers pour la haute disponibilité :

# Chaque worker rejoint le cluster
connect-distributed.sh connect-distributed.properties

Le cluster distribue automatiquement les tâches entre les workers. Si un worker tombe, ses tâches sont redistribuées.


Installation Complète

Prérequis

  • PostgreSQL 10+ avec wal_level = logical
  • Apache Kafka 2.x ou 3.x
  • Java 11+
  • Docker (optionnel mais recommandé)

Option 1 : Avec Docker Compose (Recommandé)

Créez un fichier docker-compose.yml complet :

version: '3.8'

services:
  # ZooKeeper pour Kafka
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: echo srvr | nc zookeeper 2181 || exit 1
      interval: 10s
      timeout: 5s
      retries: 5

  # Apache Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    healthcheck:
      test: kafka-broker-api-versions --bootstrap-server kafka:29092
      interval: 10s
      timeout: 5s
      retries: 5

  # Schema Registry (optionnel mais recommandé)
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  # PostgreSQL source
  postgres:
    image: postgres:16
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: sourcedb
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=10"
      - "-c"
      - "max_wal_senders=10"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  # Debezium Connect
  connect:
    image: debezium/connect:2.4
    hostname: connect
    container_name: connect
    depends_on:
      kafka:
        condition: service_healthy
      postgres:
        condition: service_started
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: debezium-connect-cluster
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # Pour Avro avec Schema Registry :
      # KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      # KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      # VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    healthcheck:
      test: curl -f http://localhost:8083/connectors || exit 1
      interval: 10s
      timeout: 5s
      retries: 10

  # Interface Web pour Kafka (optionnel)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
      - connect
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

volumes:
  postgres_data:

Script d'initialisation init.sql :

-- Créer un utilisateur dédié au CDC
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'dbz_password';

-- Base de données exemple
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    total NUMERIC(10,2) NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INTEGER REFERENCES orders(id),
    product_name VARCHAR(255),
    quantity INTEGER,
    unit_price NUMERIC(10,2)
);

-- Configurer REPLICA IDENTITY pour capturer les valeurs "before"
ALTER TABLE customers REPLICA IDENTITY FULL;  
ALTER TABLE orders REPLICA IDENTITY FULL;  
ALTER TABLE order_items REPLICA IDENTITY FULL;  

-- Donner les droits à debezium
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;  
GRANT USAGE ON SCHEMA public TO debezium;  

-- Créer la publication pour Debezium
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- Insérer des données initiales
INSERT INTO customers (name, email) VALUES
    ('Alice Martin', 'alice@example.com'),
    ('Bob Johnson', 'bob@example.com'),
    ('Charlie Brown', 'charlie@example.com');

INSERT INTO orders (customer_id, total, status) VALUES
    (1, 150.00, 'completed'),
    (1, 75.50, 'pending'),
    (2, 200.00, 'shipped');

Démarrer l'environnement :

docker-compose up -d

# Vérifier que tout est démarré
docker-compose ps

# Voir les logs de Connect
docker-compose logs -f connect

Option 2 : Installation Manuelle

# Télécharger Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz  
tar -xzf kafka_2.13-3.6.0.tgz  
cd kafka_2.13-3.6.0  

# Télécharger le connecteur Debezium PostgreSQL
mkdir -p plugins/debezium-postgres  
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.4.0.Final/debezium-connector-postgres-2.4.0.Final-plugin.tar.gz  
tar -xzf debezium-connector-postgres-2.4.0.Final-plugin.tar.gz -C plugins/debezium-postgres  

# Configurer Kafka Connect
cat >> config/connect-distributed.properties << EOF  
plugin.path=/path/to/kafka/plugins  
EOF  

# Démarrer ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Démarrer Kafka
bin/kafka-server-start.sh config/server.properties &

# Démarrer Kafka Connect
bin/connect-distributed.sh config/connect-distributed.properties &

Configuration du Connecteur PostgreSQL

Enregistrement via l'API REST

Debezium se configure via l'API REST de Kafka Connect :

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz_password",
      "database.dbname": "sourcedb",

      "topic.prefix": "dbserver1",

      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "dbz_publication",

      "table.include.list": "public.customers,public.orders,public.order_items",

      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",

      "snapshot.mode": "initial",

      "heartbeat.interval.ms": "10000",
      "heartbeat.action.query": "UPDATE public.heartbeat SET ts = NOW()",

      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite"
    }
  }'

Vérifier l'État du Connecteur

# Liste des connecteurs
curl http://localhost:8083/connectors

# État d'un connecteur
curl http://localhost:8083/connectors/postgres-source-connector/status | jq

# Configuration actuelle
curl http://localhost:8083/connectors/postgres-source-connector/config | jq

Réponse typique d'un connecteur sain :

{
  "name": "postgres-source-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "source"
}

Paramètres de Configuration Essentiels

Connexion à la Base

Paramètre Description
database.hostname Hôte PostgreSQL
database.port Port (défaut: 5432)
database.user Utilisateur avec droits REPLICATION
database.password Mot de passe
database.dbname Nom de la base de données

Réplication Logique

Paramètre Description
plugin.name pgoutput (recommandé), wal2json, decoderbufs
slot.name Nom du slot de réplication
publication.name Nom de la publication PostgreSQL
publication.autocreate.mode all_tables, filtered, disabled

Sélection des Tables

Paramètre Description
table.include.list Tables à inclure (regex)
table.exclude.list Tables à exclure
column.include.list Colonnes à inclure
column.exclude.list Colonnes à exclure (données sensibles)
schema.include.list Schémas à inclure

Topics Kafka

Paramètre Description
topic.prefix Préfixe des topics (ex: dbserver1)
topic.naming.strategy Stratégie de nommage des topics
topic.creation.enable Créer les topics automatiquement

Modes de Snapshot

Le snapshot est la phase initiale où Debezium capture l'état actuel des tables avant de passer au streaming continu.

Les Différents Modes

┌─────────────────────────────────────────────────────────────────────────┐
│                         Modes de Snapshot                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  initial          ════════════╦══════════════════════════════════►      │
│                   ║ Snapshot  ║         Streaming continu               │
│                   ╚═══════════╝                                         │
│                                                                         │
│  always           ════════════╦══════════════════════════════════►      │
│  (à chaque        ║ Snapshot  ║         Streaming continu               │
│   redémarrage)    ╚═══════════╝                                         │
│                                                                         │
│  never            ══════════════════════════════════════════════►       │
│                              Streaming uniquement                       │
│                              (depuis le LSN actuel)                     │
│                                                                         │
│  initial_only     ════════════╗                                         │
│                   ║ Snapshot  ║  (Arrêt après snapshot)                 │
│                   ╚═══════════╝                                         │
│                                                                         │
│  when_needed      ═══════╦════╦═════════════════════════════════►       │
│                   (Snap  ║    ║  Streaming                              │
│                   si pas ║    ║                                         │
│                   d'offset)   ╝                                         │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Mode Comportement
initial Snapshot au premier démarrage, puis streaming
always Snapshot à chaque redémarrage du connecteur
never Pas de snapshot, streaming depuis la position actuelle
initial_only Snapshot uniquement, pas de streaming (migration batch)
when_needed Snapshot si pas d'offset stocké ou slot invalide
exported Utilise une transaction d'export pour le snapshot
custom Classe personnalisée pour le snapshot

Configuration du Snapshot

{
  "snapshot.mode": "initial",
  "snapshot.include.collection.list": "public.customers,public.orders",
  "snapshot.select.statement.overrides": "public.large_table",
  "snapshot.select.statement.overrides.public.large_table": "SELECT * FROM public.large_table WHERE created_at > '2024-01-01'",
  "snapshot.max.threads": "4",
  "snapshot.fetch.size": "10000",
  "snapshot.lock.timeout.ms": "10000"
}

Snapshot Incrémental (Ad-hoc)

Depuis Debezium 1.6, vous pouvez déclencher un snapshot à la demande via des signaux :

-- Créer la table de signaux
CREATE TABLE debezium_signal (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
);

-- Configurer le connecteur pour écouter les signaux
-- "signal.data.collection": "public.debezium_signal"

-- Déclencher un snapshot incrémental
INSERT INTO debezium_signal (id, type, data)  
VALUES (  
    'ad-hoc-1',
    'execute-snapshot',
    '{"data-collections": ["public.customers"], "type": "incremental"}'
);

Transformations de Messages (SMT)

Les Single Message Transforms (SMT) permettent de modifier les messages avant leur envoi à Kafka.

Transformations Courantes

1. ExtractNewRecordState (Unwrap)

Simplifie la structure du message en extrayant uniquement l'état "after" :

// AVANT (structure Debezium complète)
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": { "id": 1, "name": "Alice" },
    "source": { ... },
    "op": "c",
    "ts_ms": 1234567890
  }
}

// APRÈS (avec ExtractNewRecordState)
{
  "id": 1,
  "name": "Alice"
}

Configuration :

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,source.ts_ms"
}

2. Filtrage par Contenu

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after.status == 'active'"
}

3. Routage vers Topics Différents

Router les messages vers différents topics selon le contenu :

{
  "transforms": "route",
  "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.route.topic.regex": "(.*)customers(.*)",
  "transforms.route.topic.replacement": "$1users$2"
}

4. Masquage de Données Sensibles

{
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone,ssn",
  "transforms.mask.replacement": "***MASKED***"
}

5. Ajout de Métadonnées

{
  "transforms": "addMetadata",
  "transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addMetadata.static.field": "environment",
  "transforms.addMetadata.static.value": "production"
}

Chaîner Plusieurs Transformations

{
  "transforms": "unwrap,filter,route,addField",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.status != 'deleted'",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "dbserver1.public.(.*)",
  "transforms.route.replacement": "prod.$1",
  "transforms.addField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addField.timestamp.field": "processed_at"
}

Gestion des Schémas

Le Problème de l'Évolution des Schémas

Quand vous modifiez la structure d'une table (ALTER TABLE), les consommateurs doivent pouvoir gérer les anciens ET nouveaux formats de messages.

Jour 1: { "id": 1, "name": "Alice" }  
Jour 2: ALTER TABLE ADD COLUMN email VARCHAR(255)  
Jour 2: { "id": 2, "name": "Bob", "email": "bob@example.com" }  

Le consommateur doit gérer les deux formats !

Schema Registry

Le Schema Registry (Confluent ou Apicurio) stocke les schémas et garantit la compatibilité :

┌─────────────────────────────────────────────────────────────────────────┐
│                                                                         │
│  Debezium ──────► Schema Registry ◄────── Consommateurs                 │
│     │                   │                        │                      │
│     │ Enregistre        │ Stocke les             │ Récupère             │
│     │ le schéma         │ schémas                │ le schéma            │
│     ▼                   │                        ▼                      │
│  Message Kafka          │                 Désérialise                   │
│  (données + ID schéma)  │                 correctement                  │
│                         │                                               │
└─────────────────────────────────────────────────────────────────────────┘

Configuration avec Avro

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.schemas.enable": "true"
}

Modes de Compatibilité

Mode Règle
BACKWARD Nouveau schéma peut lire anciennes données
FORWARD Ancien schéma peut lire nouvelles données
FULL Backward + Forward
NONE Pas de vérification
# Configurer la compatibilité pour un sujet
curl -X PUT http://localhost:8081/config/dbserver1.public.customers-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

Monitoring et Opérations

Métriques JMX

Debezium expose des métriques via JMX :

# Activer JMX dans le connecteur
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9010 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"

Métriques Importantes

Métrique Description
MilliSecondsBehindSource Latence par rapport à la source
TotalNumberOfEventsSeen Nombre total d'événements capturés
NumberOfEventsFiltered Événements filtrés
QueueTotalCapacity Capacité de la queue interne
QueueRemainingCapacity Capacité restante
LastEvent Timestamp du dernier événement
SnapshotCompleted Snapshot terminé (true/false)
SnapshotDurationInSeconds Durée du snapshot

Prometheus et Grafana

Utilisez JMX Exporter pour exposer les métriques à Prometheus :

# prometheus-jmx-config.yml
rules:
  - pattern: "debezium.postgres<type=connector-metrics, context=(.+), server=(.+)><>(.+): (.+)"
    name: "debezium_postgres_$3"
    labels:
      context: "$1"
      server: "$2"
    type: GAUGE

  - pattern: "debezium.postgres<type=connector-metrics, context=streaming, server=(.+)><>MilliSecondsBehindSource"
    name: "debezium_streaming_lag_ms"
    labels:
      server: "$1"
    type: GAUGE

Dashboard Grafana

Métriques clés à surveiller :

┌─────────────────────────────────────────────────────────────────────────┐
│                     Dashboard Debezium                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐       │
│  │   Lag (ms)       │  │  Events/sec      │  │  Queue Usage     │       │
│  │                  │  │                  │  │                  │       │
│  │     245 ms       │  │     1,234        │  │     45%          │       │
│  │     ▂▃▅▆▇        │  │     ▃▅▆▇▅▃       │  │     ▃▃▃▃▅▆       │       │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘       │
│                                                                         │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐       │
│  │   Errors         │  │  Snapshot Status │  │  Slot Lag        │       │
│  │                  │  │                  │  │                  │       │
│  │       0          │  │   Completed ✓    │  │     12 MB        │       │
│  │     ▁▁▁▁▁        │  │                  │  │     ▂▂▂▃▃▂       │       │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Alertes Recommandées

# Prometheus alerting rules
groups:
  - name: debezium
    rules:
      - alert: DebeziumHighLag
        expr: debezium_streaming_lag_ms > 60000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Debezium lag > 60 seconds"

      - alert: DebeziumConnectorDown
        expr: debezium_connector_status != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Debezium connector is not running"

      - alert: DebeziumSlotLagHigh
        expr: pg_replication_slots_lag_bytes > 1073741824
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL replication slot lag > 1GB"

Opérations Courantes

Mettre à Jour la Configuration

# Mettre à jour un connecteur
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.include.list": "public.customers,public.orders,public.new_table",
    ... autres paramètres ...
  }'

Pause et Reprise

# Mettre en pause
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/pause

# Reprendre
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/resume

Redémarrer un Connecteur

# Redémarrer le connecteur
curl -X POST http://localhost:8083/connectors/postgres-source-connector/restart

# Redémarrer une tâche spécifique
curl -X POST http://localhost:8083/connectors/postgres-source-connector/tasks/0/restart

Supprimer un Connecteur

# Supprimer le connecteur
curl -X DELETE http://localhost:8083/connectors/postgres-source-connector

# IMPORTANT : Nettoyer aussi le slot PostgreSQL
psql -c "SELECT pg_drop_replication_slot('debezium_slot');"

Patterns de Production

Pattern 1 : Outbox Pattern avec Debezium

Le pattern Outbox garantit la cohérence entre les modifications de base et la publication d'événements :

-- Table Outbox
CREATE TABLE outbox (
    id UUID PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- La transaction métier écrit dans outbox
BEGIN;
    INSERT INTO orders (customer_id, total) VALUES (1, 100.00);
    INSERT INTO outbox (id, aggregate_type, aggregate_id, type, payload)
    VALUES (
        gen_random_uuid(),
        'Order',
        '12345',
        'OrderCreated',
        '{"order_id": "12345", "customer_id": 1, "total": 100.00}'
    );
COMMIT;

Configuration Debezium pour Outbox :

{
  "table.include.list": "public.outbox",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType",
  "transforms.outbox.route.by.field": "aggregate_type",
  "transforms.outbox.route.topic.replacement": "events.${routedByValue}"
}

Pattern 2 : Multi-Tenant

{
  "transforms": "addTenant,route",
  "transforms.addTenant.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTenant.static.field": "tenant_id",
  "transforms.addTenant.static.value": "tenant_123",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "dbserver1.public.(.*)",
  "transforms.route.replacement": "tenant_123.$1"
}

Pattern 3 : Réplication Cross-Region

┌─────────────────┐         ┌─────────────────┐
│   Region EU     │         │   Region US     │
│                 │         │                 │
│  PostgreSQL ────┼── CDC ──┼──► PostgreSQL   │
│  (Primary)      │  Kafka  │    (Replica)    │
│                 │         │                 │
└─────────────────┘         └─────────────────┘

Troubleshooting

Problème : Le Connecteur ne Démarre Pas

# Vérifier les logs
docker-compose logs connect | grep -i error

# Causes courantes :
# - pg_hba.conf n'autorise pas la connexion
# - wal_level != logical
# - Utilisateur sans droits REPLICATION

Problème : Lag Croissant

-- Vérifier le lag du slot
SELECT
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots  
WHERE slot_name = 'debezium_slot';  

-- Solutions :
-- - Augmenter les ressources du worker Connect
-- - Réduire le nombre de tables capturées
-- - Optimiser les transformations

Problème : Slot Perdu (Slot Already Exists)

-- Le slot existe mais le connecteur ne peut pas s'y connecter
-- Supprimer et recréer

SELECT pg_drop_replication_slot('debezium_slot');

-- Puis redémarrer le connecteur

Problème : Messages Dupliqués

Debezium garantit at-least-once delivery. Les consommateurs doivent être idempotents :

def process_event(event):
    event_id = f"{event['source']['lsn']}_{event['source']['txId']}"

    if redis.exists(f"processed:{event_id}"):
        return  # Déjà traité

    # Traiter l'événement
    do_work(event)

    # Marquer comme traité (avec TTL)
    redis.setex(f"processed:{event_id}", 86400, "1")

Conclusion

Debezium transforme PostgreSQL en source d'événements robuste pour les architectures modernes. Ses points forts :

  • Fiabilité : Basé sur le WAL, garantie at-least-once
  • Flexibilité : SMT pour transformer les messages à la volée
  • Observabilité : Métriques complètes pour le monitoring
  • Écosystème : Intégration native avec Kafka et Schema Registry
  • Communauté : Open-source actif avec support Red Hat

En production, Debezium permet de construire des pipelines CDC qui synchronisent des téraoctets de données avec une latence de l'ordre de la seconde, sans modifier les applications sources.


Points Clés à Retenir

  • Debezium : Plateforme CDC open-source basée sur Kafka Connect
  • Modes de snapshot : initial, never, always selon le cas d'usage
  • SMT : Transformations pour simplifier, filtrer ou enrichir les messages
  • Schema Registry : Gestion de l'évolution des schémas
  • Monitoring : Métriques JMX, alertes sur le lag et les erreurs
  • Outbox Pattern : Garantir la cohérence transactionnelle des événements
  • Idempotence : Les consommateurs doivent gérer les doublons
  • Opérations : API REST pour gérer les connecteurs (pause, restart, update)

⏭️ PostgreSQL en architecture serverless