Skip to content

Latest commit

 

History

History
964 lines (746 loc) · 34.5 KB

File metadata and controls

964 lines (746 loc) · 34.5 KB

🔝 Retour au Sommaire

20bis.2.3 — Logical Decoding et Change Data Capture (CDC)

Introduction

Dans les chapitres précédents, nous avons exploré deux mécanismes pour propager les changements de données :

  • Event Store : Stocker explicitement les événements métier
  • NOTIFY/LISTEN : Notifications temps réel via triggers

Mais que faire si vous souhaitez capturer tous les changements d'une base existante, sans modifier les applications qui y écrivent ? Comment synchroniser une base PostgreSQL avec Elasticsearch, Kafka, ou un data warehouse, de manière fiable et en temps réel ?

C'est là qu'interviennent le Logical Decoding et le Change Data Capture (CDC). Ces technologies permettent de "lire" le journal de transactions de PostgreSQL et de transformer chaque modification en événement exploitable.

Ce chapitre vous apprendra à capturer les changements de votre base PostgreSQL et à les diffuser vers d'autres systèmes.


Qu'est-ce que le Change Data Capture ?

Définition

Le Change Data Capture (CDC) est une technique qui consiste à identifier et capturer les modifications apportées aux données d'une base de données, puis à livrer ces changements à un système cible en temps réel.

┌─────────────────────────────────────────────────────────────────────────┐
│                     CHANGE DATA CAPTURE (CDC)                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐          ┌──────────────┐          ┌──────────────┐   │
│  │ Application  │          │              │          │   Systèmes   │   │
│  │   Source     │          │     CDC      │          │    Cibles    │   │
│  │              │          │   Pipeline   │          │              │   │
│  │  INSERT ────►│          │              │          │ Elasticsearch│   │
│  │  UPDATE ────►│─────────►│  Capture ───►│─────────►│ Kafka        │   │
│  │  DELETE ────►│          │  Transform   │          │ Data Lake    │   │
│  │              │          │  Deliver     │          │ Analytics    │   │
│  └──────────────┘          └──────────────┘          └──────────────┘   │
│                                                                         │
│        Source                   CDC                      Sink           │
│       (PostgreSQL)           (Debezium)              (Consumers)        │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Les Approches Traditionnelles et Leurs Limites

Avant le CDC, plusieurs approches étaient utilisées pour synchroniser les données :

Approche Principe Limites
Polling Requêter périodiquement les changements Latence, charge sur la base, colonnes updated_at requises
Triggers applicatifs Écrire dans une table d'audit via triggers Overhead sur chaque écriture, couplage
Dual Write L'application écrit dans deux systèmes Risque d'incohérence, complexité applicative
ETL batch Export/import périodique complet Très haute latence (heures), coûteux en ressources

L'Avantage du CDC Basé sur les Logs

Le CDC moderne lit directement le journal de transactions (WAL) de la base de données :

Avantage Description
Non intrusif Aucune modification de l'application source
Temps réel Latence de l'ordre de la seconde
Complet Capture TOUTES les modifications (INSERT, UPDATE, DELETE)
Faible impact Lecture du WAL, pas de requêtes supplémentaires
Fiable Basé sur le journal de transactions, rien n'est perdu
Ordonné L'ordre des opérations est préservé

Le WAL : Le Cœur du Système

Qu'est-ce que le WAL ?

Le Write-Ahead Log (WAL) est le journal de transactions de PostgreSQL. Chaque modification de données est d'abord écrite dans le WAL avant d'être appliquée aux fichiers de données.

┌─────────────────────────────────────────────────────────────────────────┐
│                          PostgreSQL                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Transaction                                                           │
│       │                                                                 │
│       ▼                                                                 │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │                      WAL (Write-Ahead Log)                    │     │
│   │                                                               │     │
│   │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐      │     │
│   │  │ LSN │ │ LSN │ │ LSN │ │ LSN │ │ LSN │ │ LSN │ │ LSN │      │     │
│   │  │  1  │ │  2  │ │  3  │ │  4  │ │  5  │ │  6  │ │  7  │      │     │
│   │  └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘      │     │
│   │     │       │       │       │       │       │       │         │     │
│   │  INSERT  UPDATE  DELETE  INSERT  UPDATE  COMMIT  INSERT       │     │
│   │  user    order   item    order   user            product      │     │
│   │                                                               │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                              │                                          │
│                              ▼                                          │
│                    Fichiers de données                                  │
│                    (tables, index)                                      │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

LSN = Log Sequence Number (position dans le WAL)

Pourquoi le WAL est Idéal pour le CDC

Caractéristique Bénéfice pour le CDC
Séquentiel Ordre des événements garanti
Durable Aucune perte de données après commit
Complet Contient toutes les modifications
Performant Lecture séquentielle efficace
Indépendant Pas d'impact sur les requêtes applicatives

Logical Decoding : Transformer le WAL en Événements

Principe

Le Logical Decoding est la fonctionnalité PostgreSQL qui permet de décoder le contenu du WAL en changements logiques compréhensibles (lignes insérées, mises à jour, supprimées).

WAL Physique                    Logical Decoding                Événements
(bytes bruts)                   (décodage)                      (lisibles)

┌────────────────┐              ┌────────────────┐              ┌────────────────┐
│ 0x1A2B3C...    │              │                │              │ INSERT INTO    │
│ page 42        │     ───►     │   Plugin de    │     ───►     │ users (id, name│
│ offset 128     │              │   décodage     │              │ VALUES (1,     │
│ tuple data...  │              │   (wal2json,   │              │ 'Alice')       │
└────────────────┘              │   pgoutput)    │              └────────────────┘
                                └────────────────┘

Les Composants Clés

1. Slots de Réplication

Un slot de réplication est un pointeur qui suit la position de lecture dans le WAL. Il garantit que PostgreSQL conserve les segments WAL tant qu'ils n'ont pas été consommés.

-- Créer un slot de réplication logique
SELECT pg_create_logical_replication_slot(
    'my_slot',      -- Nom du slot
    'pgoutput'      -- Plugin de décodage
);

-- Lister les slots existants
SELECT slot_name, plugin, slot_type, active, restart_lsn  
FROM pg_replication_slots;  

-- Supprimer un slot
SELECT pg_drop_replication_slot('my_slot');

2. Plugins de Décodage

Les output plugins transforment les données WAL en format lisible :

Plugin Format Usage
pgoutput Protocole binaire PostgreSQL Réplication logique native, Debezium
wal2json JSON Intégration simple, debugging
test_decoding Texte simple Tests et démonstrations
decoderbufs Protocol Buffers Haute performance

3. Publications

Les publications définissent quelles tables sont incluses dans la réplication logique :

-- Publication pour toutes les tables
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- Publication pour des tables spécifiques
CREATE PUBLICATION orders_pub FOR TABLE orders, order_items;

-- Publication avec filtrage des opérations
CREATE PUBLICATION inserts_only FOR TABLE audit_log
    WITH (publish = 'insert');  -- Seulement les INSERT

Configuration de PostgreSQL pour le CDC

Étape 1 : Paramètres postgresql.conf

# Activer la réplication logique
wal_level = logical

# Nombre maximum de slots de réplication
max_replication_slots = 10

# Nombre maximum de processus WAL sender
max_wal_senders = 10

# Conserver le WAL pour le décodage (optionnel mais recommandé)
wal_keep_size = 1GB

Après modification, redémarrez PostgreSQL :

sudo systemctl restart postgresql

Étape 2 : Vérifier la Configuration

-- Vérifier wal_level
SHOW wal_level;
-- Doit retourner 'logical'

-- Vérifier les slots disponibles
SHOW max_replication_slots;

-- Vérifier les WAL senders
SHOW max_wal_senders;

Étape 3 : Configurer les Permissions

-- Créer un utilisateur dédié à la réplication
CREATE USER cdc_user WITH REPLICATION LOGIN PASSWORD 'secure_password';

-- Donner les droits de lecture sur les tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

-- Donner les droits sur le schéma
GRANT USAGE ON SCHEMA public TO cdc_user;

Dans pg_hba.conf, autoriser la connexion de réplication :

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    replication     cdc_user        10.0.0.0/8              scram-sha-256  
host    mydb            cdc_user        10.0.0.0/8              scram-sha-256  

Utilisation du Logical Decoding

Test avec test_decoding

Le plugin test_decoding est parfait pour comprendre le fonctionnement :

-- Créer un slot avec test_decoding
SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');

-- Créer une table de test
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(255)
);

-- Insérer des données
INSERT INTO customers (name, email) VALUES ('Alice', 'alice@example.com');  
INSERT INTO customers (name, email) VALUES ('Bob', 'bob@example.com');  
UPDATE customers SET email = 'alice.new@example.com' WHERE id = 1;  
DELETE FROM customers WHERE id = 2;  

-- Lire les changements du slot
SELECT * FROM pg_logical_slot_get_changes(
    'test_slot',    -- Nom du slot
    NULL,           -- LSN de départ (NULL = depuis le début)
    NULL            -- Nombre max de changements (NULL = tous)
);

Résultat :

    lsn     |   xid   |                          data
------------+---------+---------------------------------------------------------
 0/16B5D80  |     734 | BEGIN 734
 0/16B5D80  |     734 | table public.customers: INSERT: id[integer]:1 name[...
 0/16B5E48  |     734 | COMMIT 734
 0/16B5E80  |     735 | BEGIN 735
 0/16B5E80  |     735 | table public.customers: INSERT: id[integer]:2 name[...
 0/16B5F18  |     735 | COMMIT 735
 0/16B5F50  |     736 | BEGIN 736
 0/16B5F50  |     736 | table public.customers: UPDATE: id[integer]:1 name[...
 0/16B6018  |     736 | COMMIT 736
 0/16B6050  |     737 | BEGIN 737
 0/16B6050  |     737 | table public.customers: DELETE: id[integer]:2
 0/16B60B8  |     737 | COMMIT 737

Différence entre get_changes et peek_changes

-- peek_changes : Lit sans consommer (le pointeur ne bouge pas)
SELECT * FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL);
-- Appeler plusieurs fois retourne les mêmes données

-- get_changes : Lit ET consomme (le pointeur avance)
SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);
-- Appeler à nouveau retourne les NOUVEAUX changements uniquement

wal2json : Format JSON pour le CDC

Installation

# Debian/Ubuntu
sudo apt-get install postgresql-16-wal2json

# RHEL/CentOS
sudo yum install wal2json_16

# Ou compilation depuis les sources
git clone https://github.com/eulerto/wal2json.git  
cd wal2json  
make  
sudo make install  

Utilisation

-- Créer un slot avec wal2json
SELECT pg_create_logical_replication_slot('json_slot', 'wal2json');

-- Insérer des données
INSERT INTO customers (name, email) VALUES ('Charlie', 'charlie@example.com');  
UPDATE customers SET name = 'Charles' WHERE id = 3;  

-- Lire les changements en JSON
SELECT * FROM pg_logical_slot_get_changes(
    'json_slot',
    NULL,
    NULL,
    'include-timestamp', 'true',
    'include-lsn', 'true'
);

Résultat JSON :

{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "customers",
      "columnnames": ["id", "name", "email"],
      "columntypes": ["integer", "character varying(100)", "character varying(255)"],
      "columnvalues": [3, "Charlie", "charlie@example.com"],
      "timestamp": "2025-01-15 10:30:00.123456+00",
      "lsn": "0/16B6150"
    }
  ]
}

Options de Configuration wal2json

SELECT * FROM pg_logical_slot_get_changes(
    'json_slot',
    NULL,
    NULL,
    -- Options disponibles
    'include-timestamp', 'true',      -- Inclure le timestamp
    'include-lsn', 'true',            -- Inclure le LSN
    'include-xids', 'true',           -- Inclure l'ID de transaction
    'include-schemas', 'true',        -- Inclure le schéma
    'include-types', 'true',          -- Inclure les types de colonnes
    'include-type-oids', 'false',     -- Inclure les OIDs des types
    'include-not-null', 'true',       -- Inclure l'info NOT NULL
    'pretty-print', 'true',           -- JSON formaté
    'write-in-chunks', 'false',       -- Écriture par morceaux
    'filter-tables', 'public.logs',   -- Exclure certaines tables
    'add-tables', 'public.customers'  -- Inclure seulement certaines tables
);

Debezium : CDC pour la Production

Qu'est-ce que Debezium ?

Debezium est une plateforme CDC open-source qui capture les changements de bases de données et les diffuse vers Apache Kafka. C'est la solution de référence pour le CDC en production.

┌─────────────────────────────────────────────────────────────────────────┐
│                        Architecture Debezium                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────────────────┐ │
│  │              │     │              │     │                          │ │
│  │  PostgreSQL  │────►│   Debezium   │────►│      Apache Kafka        │ │
│  │              │     │  Connector   │     │                          │ │
│  │  (Source)    │     │              │     │  ┌─────────────────────┐ │ │
│  └──────────────┘     └──────────────┘     │  │ Topic: dbserver.    │ │ │
│        │                     │             │  │ public.customers    │ │ │
│        │                     │             │  └─────────────────────┘ │ │
│     WAL / Logical            │             │  ┌─────────────────────┐ │ │
│     Replication              │             │  │ Topic: dbserver.    │ │ │
│                              │             │  │ public.orders       │ │ │
│                              │             │  └─────────────────────┘ │ │
│                              │             └──────────────────────────┘ │
│                              │                         │                │
│                              │                         ▼                │
│                              │             ┌──────────────────────────┐ │
│                              │             │      Consumers           │ │
│                              │             │  - Elasticsearch         │ │
│                              │             │  - Data Warehouse        │ │
│                              │             │  - Microservices         │ │
│                              │             │  - Analytics             │ │
│                              │             └──────────────────────────┘ │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Format des Messages Debezium

Debezium produit des messages avec une structure riche :

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com"
    },
    "source": {
      "version": "2.4.0.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1705312200123,
      "snapshot": "false",
      "db": "mydb",
      "schema": "public",
      "table": "customers",
      "txId": 734,
      "lsn": 23456789,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1705312200456,
    "transaction": null
  }
}
Champ Description
before État de la ligne AVANT la modification (null pour INSERT)
after État de la ligne APRÈS la modification (null pour DELETE)
op Type d'opération : c (create), u (update), d (delete), r (read/snapshot)
source Métadonnées : base, table, LSN, timestamp, etc.
ts_ms Timestamp de capture par Debezium

Configuration Debezium pour PostgreSQL

docker-compose.yml

version: '3.8'  
services:  
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Enregistrer le Connecteur PostgreSQL

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "secure_password",
      "database.dbname": "mydb",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "dbz_publication",
      "table.include.list": "public.customers,public.orders",
      "column.exclude.list": "public.customers.password_hash",
      "decimal.handling.mode": "string",
      "time.precision.mode": "adaptive_time_microseconds"
    }
  }'

Options de Configuration Importantes

Option Description
topic.prefix Préfixe des topics Kafka (ex: dbserver1.public.customers)
plugin.name Plugin de décodage (pgoutput recommandé)
slot.name Nom du slot de réplication
table.include.list Tables à capturer (regex supporté)
table.exclude.list Tables à exclure
column.exclude.list Colonnes sensibles à exclure
snapshot.mode Mode de snapshot initial (initial, never, always)
tombstones.on.delete Générer des tombstones Kafka pour les DELETE

Consommer les Événements CDC

Python avec kafka-python

from kafka import KafkaConsumer  
import json  

def consume_cdc_events():
    consumer = KafkaConsumer(
        'dbserver1.public.customers',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='cdc-consumer-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    print("En écoute des changements...")

    for message in consumer:
        payload = message.value['payload']

        op = payload['op']
        table = payload['source']['table']

        if op == 'c':  # CREATE (INSERT)
            print(f"[INSERT] {table}: {payload['after']}")
            handle_insert(table, payload['after'])

        elif op == 'u':  # UPDATE
            print(f"[UPDATE] {table}:")
            print(f"  Before: {payload['before']}")
            print(f"  After:  {payload['after']}")
            handle_update(table, payload['before'], payload['after'])

        elif op == 'd':  # DELETE
            print(f"[DELETE] {table}: {payload['before']}")
            handle_delete(table, payload['before'])

        elif op == 'r':  # READ (snapshot)
            print(f"[SNAPSHOT] {table}: {payload['after']}")
            handle_snapshot(table, payload['after'])

def handle_insert(table, data):
    # Indexer dans Elasticsearch
    # Mettre à jour un cache
    # Notifier un service
    pass

def handle_update(table, before, after):
    # Comparer les changements
    changes = {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}
    print(f"  Changes: {changes}")

def handle_delete(table, data):
    # Supprimer de l'index
    # Invalider le cache
    pass

def handle_snapshot(table, data):
    # Chargement initial
    pass

if __name__ == "__main__":
    consume_cdc_events()

Synchronisation avec Elasticsearch

from kafka import KafkaConsumer  
from elasticsearch import Elasticsearch  
import json  

es = Elasticsearch(['http://localhost:9200'])

def sync_to_elasticsearch():
    consumer = KafkaConsumer(
        'dbserver1.public.products',
        bootstrap_servers=['localhost:9092'],
        group_id='es-sync-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    for message in consumer:
        payload = message.value['payload']
        op = payload['op']

        if op in ('c', 'u', 'r'):  # INSERT, UPDATE, SNAPSHOT
            doc = payload['after']
            doc_id = doc['id']

            es.index(
                index='products',
                id=doc_id,
                document=doc
            )
            print(f"Indexed product {doc_id}")

        elif op == 'd':  # DELETE
            doc_id = payload['before']['id']

            es.delete(
                index='products',
                id=doc_id,
                ignore=[404]
            )
            print(f"Deleted product {doc_id}")

if __name__ == "__main__":
    sync_to_elasticsearch()

Cas d'Usage du CDC

1. Synchronisation de Cache

PostgreSQL ──► Debezium ──► Kafka ──► Consumer ──► Redis Cache

Quand une donnée change dans PostgreSQL,  
le cache Redis est automatiquement invalidé ou mis à jour.  

2. Recherche Full-Text

PostgreSQL ──► Debezium ──► Kafka ──► Consumer ──► Elasticsearch

Les produits modifiés sont automatiquement réindexés  
pour la recherche.  

3. Data Warehouse / Analytics

PostgreSQL ──► Debezium ──► Kafka ──► Consumer ──► BigQuery/Snowflake

Les données opérationnelles sont répliquées en temps réel  
vers le data warehouse pour l'analytics.  

4. Microservices : Propagation d'Événements

Service A (PostgreSQL) ──► Debezium ──► Kafka ──► Service B, C, D

Les changements dans un service sont propagés aux autres  
sans couplage direct.  

5. Audit et Compliance

PostgreSQL ──► Debezium ──► Kafka ──► Consumer ──► Audit Store

Chaque modification est capturée et archivée  
pour la conformité réglementaire.  

Comparaison : NOTIFY/LISTEN vs CDC

Critère NOTIFY/LISTEN CDC (Debezium)
Infrastructure PostgreSQL seul Kafka + Connect + PostgreSQL
Complexité Simple Plus complexe
Persistance ❌ Non (fire-and-forget) ✅ Oui (Kafka)
Replay ❌ Non ✅ Oui
Garantie de livraison ❌ At-most-once ✅ At-least-once
Payload 8000 caractères max Illimité
Performance Très rapide Quelques secondes de latence
Modification du code Triggers requis Aucune modification
Multi-consommateurs Oui (même connexion) Oui (groupes Kafka)
Historique des données ❌ Non ✅ Before/After

Quand Utiliser Quoi ?

Scénario Recommandation
Dashboard temps réel interne NOTIFY/LISTEN
Invalidation de cache local NOTIFY/LISTEN
Synchronisation vers systèmes externes CDC
Événements critiques ne devant pas être perdus CDC
Audit complet avec historique CDC
Infrastructure simple, pas de Kafka NOTIFY/LISTEN
Replay des événements nécessaire CDC

Gestion des Slots de Réplication

Surveillance des Slots

-- État des slots de réplication
SELECT
    slot_name,
    plugin,
    slot_type,
    active,
    restart_lsn,
    confirmed_flush_lsn,
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    ) AS replication_lag
FROM pg_replication_slots;

Attention : Slots Non Consommés

Un slot qui n'est plus consommé empêche PostgreSQL de recycler le WAL, ce qui peut remplir le disque !

-- Trouver les slots problématiques (lag > 1GB)
SELECT
    slot_name,
    active,
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    ) AS lag
FROM pg_replication_slots  
WHERE pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) > 1073741824;  

-- Supprimer un slot abandonné (ATTENTION : perte de données CDC)
SELECT pg_drop_replication_slot('abandoned_slot');

Configuration de Sécurité

-- Limiter la rétention WAL par slot (PostgreSQL 13+)
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';  
SELECT pg_reload_conf();  

CDC Sans Kafka : Alternatives Légères

Si Kafka est trop lourd pour votre contexte, des alternatives existent :

1. pg_recvlogical (CLI PostgreSQL)

# Écouter les changements en continu
pg_recvlogical -d mydb \
    --slot=my_slot \
    --plugin=wal2json \
    --start \
    -f - \
    -o include-timestamp=true

2. Script Python Direct

import psycopg  
import json  

def consume_logical_changes():
    conn = psycopg.connect(
        "host=localhost dbname=mydb user=cdc_user password=secret"
    )
    conn.autocommit = True

    # Créer le slot si nécessaire
    with conn.cursor() as cur:
        cur.execute("""
            SELECT pg_create_logical_replication_slot(
                'python_slot', 'wal2json'
            )
        """)

    print("Écoute des changements...")

    while True:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT * FROM pg_logical_slot_get_changes(
                    'python_slot', NULL, NULL,
                    'include-timestamp', 'true'
                )
            """)

            for row in cur.fetchall():
                lsn, xid, data = row
                changes = json.loads(data)

                for change in changes.get('change', []):
                    print(f"[{change['kind'].upper()}] "
                          f"{change['schema']}.{change['table']}")
                    print(f"  Data: {change.get('columnvalues', change.get('oldkeys'))}")

        # Pause entre les polls
        time.sleep(1)

if __name__ == "__main__":
    consume_logical_changes()

3. Solutions Managées

Solution Description
AWS DMS Service managé AWS pour CDC
Fivetran SaaS pour synchronisation de données
Airbyte Open-source, alternative à Fivetran
Striim Plateforme CDC entreprise

Bonnes Pratiques

1. Toujours Configurer REPLICA IDENTITY

Pour capturer les valeurs BEFORE lors des UPDATE/DELETE :

-- FULL : Toutes les colonnes dans before (recommandé pour CDC)
ALTER TABLE customers REPLICA IDENTITY FULL;

-- DEFAULT : Seulement la clé primaire dans before
ALTER TABLE orders REPLICA IDENTITY DEFAULT;

-- USING INDEX : Utiliser un index unique
ALTER TABLE products REPLICA IDENTITY USING INDEX products_sku_key;

2. Exclure les Colonnes Sensibles

{
  "column.exclude.list": "public.users.password_hash,public.users.ssn"
}

3. Monitorer les Slots

-- Alerte si lag > 500MB
SELECT slot_name  
FROM pg_replication_slots  
WHERE pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) > 524288000;  

4. Gérer les Schémas Évolutifs

Debezium gère les changements de schéma, mais :

  • Testez les migrations en staging
  • Évitez de renommer des colonnes (préférez ajouter/supprimer)
  • Utilisez le Schema Registry pour la compatibilité

5. Idempotence des Consommateurs

Les messages peuvent être délivrés plusieurs fois (at-least-once) :

def handle_event(event):
    event_id = f"{event['source']['lsn']}_{event['source']['txId']}"

    # Vérifier si déjà traité
    if is_processed(event_id):
        return

    # Traiter l'événement
    process(event)

    # Marquer comme traité
    mark_processed(event_id)

Conclusion

Le Logical Decoding et le Change Data Capture transforment PostgreSQL en une source d'événements puissante. Sans modifier vos applications, vous pouvez :

  • Capturer chaque INSERT, UPDATE, DELETE en temps réel
  • Synchroniser des systèmes externes (Elasticsearch, caches, data warehouses)
  • Construire des architectures événementielles découplées
  • Garantir un audit complet de toutes les modifications

Debezium avec Kafka est la solution de production la plus robuste, offrant persistance, replay, et scalabilité. Pour des besoins plus simples, wal2json avec un script de consommation peut suffire.

Le CDC est le pont entre le monde transactionnel de PostgreSQL et le monde événementiel des architectures modernes.


Points Clés à Retenir

  • CDC : Capture des changements de données depuis le journal de transactions
  • WAL : Source de vérité pour toutes les modifications PostgreSQL
  • Logical Decoding : Transformation du WAL binaire en événements lisibles
  • Slots de réplication : Pointeurs qui garantissent la rétention du WAL
  • Plugins : pgoutput (natif), wal2json (JSON), test_decoding (test)
  • Debezium : Plateforme CDC de référence pour Kafka
  • REPLICA IDENTITY FULL : Nécessaire pour capturer l'état "before"
  • Surveillance : Monitorer les slots pour éviter de remplir le disque
  • Idempotence : Les consommateurs doivent gérer les doublons

⏭️ Debezium pour streaming d'événements