
Debezium and Change Data Capture: Stream MySQL and PostgreSQL to Kafka

Debezium reads MySQL binary logs and PostgreSQL WAL to stream every row-level change as a Kafka event in near real-time — replacing nightly batch ETL with sub-second data propagation.

JusDB Team
April 3, 2023

Last year, a fintech team running transaction reporting came to JusDB with a familiar complaint: their nightly batch ETL job was producing reports that were already eight hours stale by the time analysts opened them each morning. Every fraud detection rule was running against yesterday's data. Every customer balance displayed in the dashboard was a snapshot from midnight. Their data warehouse and application database were two different realities, and the gap was widening as transaction volume grew. Within three weeks, we replaced their nightly batch pipeline with Debezium Change Data Capture running against their MySQL instance. Report latency dropped from 8 hours to under 200 milliseconds. Fraud detection rules now run against data that is seconds old, not hours.

This guide covers exactly how to implement CDC with Debezium for both MySQL and PostgreSQL — from initial configuration through production hardening.

TL;DR
  • Debezium reads MySQL binary logs and PostgreSQL WAL to stream every row-level change as a Kafka event in near real-time — no polling, no full table scans.
  • MySQL requires binlog_format=ROW, binlog_row_image=FULL, a unique server-id, and a replication user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT grants.
  • PostgreSQL requires wal_level=logical, a logical replication slot, and a publication defining which tables to capture.
  • Every Debezium event carries a before state, an after state, and an op field (c=insert, u=update, d=delete, r=snapshot read).
  • Replication slot lag is the top production risk for PostgreSQL — if your Kafka Connect worker stops, WAL accumulates and can fill disk.
  • Debezium delivers at-least-once semantics; your consumers must be idempotent or use deduplication logic.

What is Change Data Capture (CDC)

Change Data Capture is the practice of tracking row-level changes in a database — inserts, updates, and deletes — and streaming those changes to downstream systems as they happen. The key distinction from traditional ETL is the source of truth: CDC reads the database's own internal transaction log rather than querying tables periodically.

Traditional polling-based ETL asks the database "what changed since I last checked?" by running queries like SELECT * FROM orders WHERE updated_at > :last_run. This approach requires an updated_at column on every table, misses hard deletes entirely, creates query load on the primary database, and is bounded by how frequently you can afford to poll. Polling every 5 minutes means 5-minute lag by definition.

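CDC reads the transaction log directly. The database already writes every change to that log for durability and replication, so CDC taps into an existing stream instead of querying your tables: it adds no polling queries or extra writes to application tables on the primary, and changes become visible in milliseconds rather than minutes. The main cost is on the logging side, since row-based binlogs with full row images (MySQL) and wal_level=logical (PostgreSQL) generate more log volume than minimal settings.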
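
To make the hard-delete point concrete, here is a toy Python simulation with no real database. The `table`, `change_log`, and `poll_changes` names are hypothetical stand-ins for an orders table, its transaction log, and an `updated_at` polling query. It shows a delete that the poller never sees but the log records.

```python
from dataclasses import dataclass


@dataclass
class Row:
    status: str
    updated_at: int  # logical clock standing in for an updated_at timestamp


table = {}        # hypothetical "orders" table: id -> Row
change_log = []   # append-only log of (op, id), like a binlog or WAL stream


def insert(row_id, status, ts):
    table[row_id] = Row(status, ts)
    change_log.append(("c", row_id))


def delete(row_id):
    del table[row_id]            # a hard delete leaves no row behind to query
    change_log.append(("d", row_id))


def poll_changes(since):
    """Polling ETL: SELECT id FROM orders WHERE updated_at > :since."""
    return sorted(rid for rid, row in table.items() if row.updated_at > since)


insert(1, "pending", ts=10)
insert(2, "pending", ts=11)
last_run = 11                    # the poller last ran after ts=11
delete(1)

print(poll_changes(last_run))    # [] : the poll cannot see the delete
print(change_log)                # [('c', 1), ('c', 2), ('d', 1)] : the log records it
```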

CDC Use Cases

The same CDC stream supports multiple downstream consumers simultaneously:

  • Real-time analytics: populate a data warehouse or OLAP system with changes as they occur, replacing nightly batch loads
  • Cache invalidation: invalidate Redis or Memcached entries the moment a row changes in the database
  • Microservice synchronization: keep a read model or search index (Elasticsearch, OpenSearch) consistent with the source of truth without polling APIs
  • Audit logs: produce an immutable, ordered record of every data change for compliance requirements
  • ETL and data lake ingestion: replace scheduled Spark or Glue jobs with continuous streaming into S3, BigQuery, or Snowflake

How Debezium Works

Debezium Architecture

Debezium is not a standalone server. It runs as a set of Kafka Connect source connectors deployed inside a Kafka Connect worker cluster. Each connector is responsible for one database instance. The connector reads the database transaction log, converts each change event into a JSON (or Avro) Kafka message, and publishes it to a Kafka topic. Downstream consumers — Kafka Streams applications, Flink jobs, Spark Structured Streaming, or simple consumer groups — subscribe to those topics and process changes at their own pace.

The flow is: Database transaction log → Debezium Kafka Connect source connector → Kafka topic → downstream consumers. Kafka acts as the durable buffer between the database and all downstream systems. If a consumer is slow or temporarily offline, Kafka retains the events and the consumer catches up when it restarts — without holding up replication on the database side (for MySQL; PostgreSQL has a caveat covered later).

Reading MySQL Binlog

Debezium's MySQL connector acts as a MySQL replica. It registers with the MySQL primary using a configured server-id, requests the binary log stream from a specified binlog position, and reads each binlog event. With binlog_format=ROW and binlog_row_image=FULL, MySQL writes the complete before and after state of every changed row to the binlog. Debezium reads these row events and converts them to Kafka messages.

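The connector tracks its current binlog position (file name and position, plus the GTID set when GTID mode is enabled) as a source offset, which Kafka Connect stores in its offsets topic (offset.storage.topic in distributed mode). If the connector restarts, it resumes from the last committed offset rather than replaying from the beginning of the binlog. The separate schema history topic records DDL, not positions; it is covered under Handling Schema Changes.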
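
The resume logic relies on binlog coordinates being totally ordered: first by the numeric suffix of the file name, then by byte position within the file. A minimal sketch of that ordering follows. `BinlogPosition` and `parse_position` are hypothetical helpers, and Debezium's real offsets carry more fields (such as GTID sets) than this.

```python
import re
from typing import NamedTuple


class BinlogPosition(NamedTuple):
    """A MySQL binlog coordinate, ordered by file sequence number, then byte position."""
    file_seq: int
    pos: int


def parse_position(file_name: str, pos: int) -> BinlogPosition:
    # Binlog files are named <basename>.<sequence>, e.g. mysql-bin.000042
    match = re.fullmatch(r".+\.(\d+)", file_name)
    if match is None:
        raise ValueError(f"unexpected binlog file name: {file_name!r}")
    return BinlogPosition(int(match.group(1)), pos)


stored = parse_position("mysql-bin.000042", 1398271)   # last committed offset
next_event = parse_position("mysql-bin.000043", 4)     # an event in the next file
print(next_event > stored)  # True: it sorts after the stored offset
```

Because `NamedTuple` compares field by field, a later file always sorts after an earlier one regardless of the byte position.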

Reading PostgreSQL WAL (Logical Replication Slots)

PostgreSQL uses a different mechanism. Instead of registering as a physical replica, Debezium uses PostgreSQL's logical decoding infrastructure. You create a logical replication slot and a publication, and Debezium consumes the decoded WAL stream from that slot using the pgoutput or decoderbufs output plugin.

PostgreSQL WAL decoding happens on the server side: PostgreSQL converts internal WAL records into a logical row-change format before sending them to Debezium. This means Debezium sees clean before/after row data without needing to understand PostgreSQL's internal storage format.

Warning

As long as a replication slot exists, PostgreSQL retains all WAL from the slot's restart_lsn onward, whether or not Debezium is currently connected and consuming. If your Kafka Connect worker stops, restarts slowly, or falls behind, PostgreSQL accumulates WAL and can fill your disk. Monitor pg_replication_slots continuously in production.


Setting Up Debezium for MySQL

Prerequisites: MySQL Binary Log Configuration

Debezium requires row-based binary logging with full row images. Add or confirm these settings in my.cnf or my.ini under the [mysqld] section:

ini
[mysqld]
server-id                  = 1          # Must be unique across all MySQL instances in the topology
log_bin                    = mysql-bin  # Enable binary logging (on by default since MySQL 8.0)
binlog_format              = ROW        # Required: row-based format captures full row changes
binlog_row_image           = FULL       # Required: captures complete before and after row state
binlog_expire_logs_seconds = 604800     # Retain 7 days of binlogs for connector recovery (MySQL 8.0+)
# MySQL 5.7 uses expire_logs_days = 7 instead; that variable is deprecated in 8.0

Restart MySQL after making these changes. Confirm binary logging is active:

sql
SHOW VARIABLES LIKE 'log_bin';
-- +---------------+-------+
-- | Variable_name | Value |
-- +---------------+-------+
-- | log_bin       | ON    |
-- +---------------+-------+

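SHOW VARIABLES LIKE 'binlog_format';
-- +---------------+-------+
-- | Variable_name | Value |
-- +---------------+-------+
-- | binlog_format | ROW   |
-- +---------------+-------+

SHOW VARIABLES LIKE 'binlog_row_image';
-- +------------------+-------+
-- | Variable_name    | Value |
-- +------------------+-------+
-- | binlog_row_image | FULL  |
-- +------------------+-------+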
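
If you run these checks from a script or monitoring job, a small validator keeps the expectations in one place. This sketch assumes you have already fetched the SHOW VARIABLES rows into a dict with whatever MySQL client you use; `check_mysql_cdc_settings` is a hypothetical helper, not part of Debezium.

```python
REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_mysql_cdc_settings(variables: dict) -> list:
    """Return a list of problems; an empty list means the binlog settings look CDC-ready."""
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    # A source with server_id 0 refuses connections from replicas, including Debezium.
    if int(variables.get("server_id", 0)) == 0:
        problems.append("server_id must be set to a non-zero, unique value")
    return problems


rows = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "MINIMAL", "server_id": "1"}
print(check_mysql_cdc_settings(rows))  # ['binlog_row_image is MINIMAL, expected FULL']
```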

Create a MySQL Replication User

sql
CREATE USER 'debezium'@'%' IDENTIFIED BY 'your-secure-password';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';

FLUSH PRIVILEGES;

Important

All five grants are required. SELECT lets the connector read existing rows during the initial snapshot. RELOAD permits the FLUSH statements the snapshot uses to take a consistent read lock. SHOW DATABASES is required for schema discovery. REPLICATION SLAVE allows the connector to register as a replica and read the binlog stream, and REPLICATION CLIENT allows it to query binlog status (for example, SHOW BINARY LOGS). Missing any grant will cause the connector to fail at startup or during the snapshot phase.
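
To catch a missing grant before the connector fails, you can compare the output of SHOW GRANTS FOR 'debezium'@'%' against the five required privileges. A sketch, assuming the global grants appear on one line in the `GRANT ... ON *.* TO ...` form MySQL prints; `missing_privileges` is hypothetical and deliberately ignores database-level grants.

```python
import re

REQUIRED_PRIVILEGES = {
    "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT",
}


def missing_privileges(grant_lines: list) -> set:
    """Compare SHOW GRANTS output lines against the privileges Debezium needs."""
    granted = set()
    for line in grant_lines:
        # Only global grants (ON *.*) count; database- or table-level grants are ignored.
        match = re.match(r"GRANT (.+?) ON \*\.\* TO ", line)
        if match is None:
            continue
        privileges = {p.strip().upper() for p in match.group(1).split(",")}
        if "ALL PRIVILEGES" in privileges:
            return set()
        granted |= privileges
    return REQUIRED_PRIVILEGES - granted


print(missing_privileges(["GRANT SELECT, REPLICATION SLAVE ON *.* TO `debezium`@`%`"]))
```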

MySQL Connector Configuration

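Deploy the connector by POSTing this JSON payload to the Kafka Connect REST API. Note that decimal.handling.mode=double is convenient for consumers but can lose precision on DECIMAL columns such as monetary amounts; use precise or string where exact values matter: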

json
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "your-mysql-host.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "your-secure-password",
    "database.server.id": "184054",

    "topic.prefix": "mycompany",
    "database.include.list": "orders,inventory",
    "table.include.list": "orders.transactions,orders.customers,inventory.products",

    "schema.history.internal.kafka.bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mycompany",

    "include.schema.changes": "true",
    "snapshot.mode": "initial",

    "decimal.handling.mode": "double",
    "time.precision.mode": "connect",
    "converters": "boolean",
    "boolean.type": "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"
  }
}
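
Two mistakes that are easy to make in a configuration like this are mixing Debezium 1.x and 2.x property names (2.x renamed `database.history.*` to `schema.history.internal.*`) and listing a converter name in `converters` without a matching `<name>.type` property. A hypothetical pre-deployment check for those, plus include and exclude lists that Debezium does not allow together, might look like this:

```python
def check_connector_config(config: dict) -> list:
    """Return human-readable problems found in a Debezium 2.x connector "config" object."""
    problems = []

    # Every converter name listed in "converters" needs a matching "<name>.type" property.
    names = [n.strip() for n in config.get("converters", "").split(",") if n.strip()]
    for name in names:
        if f"{name}.type" not in config:
            problems.append(f"converter '{name}' has no '{name}.type' property")

    # Debezium 2.x renamed database.history.* to schema.history.internal.*.
    legacy_prefix = "database.history."
    for key in config:
        if key.startswith(legacy_prefix):
            new_key = "schema.history.internal." + key[len(legacy_prefix):]
            problems.append(f"'{key}' is a 1.x property name; use '{new_key}'")

    # Include and exclude lists at the same level are mutually exclusive.
    for level in ("database", "table", "column"):
        if f"{level}.include.list" in config and f"{level}.exclude.list" in config:
            problems.append(f"set {level}.include.list or {level}.exclude.list, not both")

    return problems


print(check_connector_config({"topic.prefix": "mycompany",
                              "database.history.kafka.topic": "schema-changes.mycompany"}))
```

In practice you would pass it `json.load(f)["config"]` for the file you are about to POST.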

Deploying with Kafka Connect REST API

bash
# Deploy the connector
curl -X POST \
  -H "Content-Type: application/json" \
  -d @mysql-connector.json \
  http://kafka-connect:8083/connectors

# Check connector status
curl http://kafka-connect:8083/connectors/mysql-source-connector/status

# List all connectors
curl http://kafka-connect:8083/connectors

# Pause the connector (stops consuming but retains offset)
curl -X PUT http://kafka-connect:8083/connectors/mysql-source-connector/pause

# Resume the connector
curl -X PUT http://kafka-connect:8083/connectors/mysql-source-connector/resume
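
The same calls can be scripted with only the Python standard library. This sketch just builds the requests, so it runs without a live worker; the `kafka-connect:8083` address is carried over from the curl examples, and the helper names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://kafka-connect:8083"  # the worker used in the curl examples


def create_connector_request(connector: dict) -> urllib.request.Request:
    """Equivalent of: curl -X POST -H "Content-Type: application/json" -d @file .../connectors"""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def lifecycle_request(name: str, action: str) -> urllib.request.Request:
    """Equivalent of: curl -X PUT .../connectors/<name>/<pause|resume>"""
    if action not in ("pause", "resume"):
        raise ValueError(f"unsupported action: {action!r}")
    return urllib.request.Request(f"{CONNECT_URL}/connectors/{name}/{action}", method="PUT")


def status_request(name: str) -> urllib.request.Request:
    """Equivalent of: curl .../connectors/<name>/status (a GET)"""
    return urllib.request.Request(f"{CONNECT_URL}/connectors/{name}/status")


# Sending requires a reachable worker, for example:
# with urllib.request.urlopen(status_request("mysql-source-connector")) as resp:
#     print(json.load(resp)["connector"]["state"])
```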

Verifying Events in Kafka

Debezium writes each table's change events to its own Kafka topic, named {topic.prefix}.{database}.{table} by default. For the configuration above, expect topics like mycompany.orders.transactions, mycompany.orders.customers, and mycompany.inventory.products.

bash
# List topics created by Debezium
kafka-topics.sh --list \
  --bootstrap-server kafka-broker-1:9092 | grep mycompany

# Consume events from the transactions topic
kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-1:9092 \
  --topic mycompany.orders.transactions \
  --from-beginning \
  --max-messages 5
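
The topic naming convention is mechanical enough to compute up front, for example to pre-create topics or configure consumer subscriptions. This sketch assumes the default topic naming (no routing transforms) and include-list entries that are literal names rather than regular expressions; the helpers are hypothetical.

```python
def debezium_topic(prefix: str, namespace: str, table: str) -> str:
    """Default topic name: <topic.prefix>.<database or schema>.<table>."""
    return f"{prefix}.{namespace}.{table}"


def topics_for(prefix: str, table_include_list: str) -> list:
    # Assumes each entry is a literal "<database>.<table>" (MySQL) or "<schema>.<table>"
    # (PostgreSQL) name; real include-list entries may also be regular expressions.
    topics = []
    for entry in table_include_list.split(","):
        namespace, table = entry.strip().split(".", 1)
        topics.append(debezium_topic(prefix, namespace, table))
    return topics


print(topics_for("mycompany", "orders.transactions,orders.customers,inventory.products"))
# ['mycompany.orders.transactions', 'mycompany.orders.customers', 'mycompany.inventory.products']
```
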
Tip

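During the initial snapshot, Debezium emits one event per existing row with "op": "r" (read). These can number in the millions for large tables. GET /connectors/{name}/status reports only the connector and task states (RUNNING, PAUSED, FAILED, and so on), not snapshot progress or lag. To confirm the connector has finished the snapshot and switched to streaming, check Debezium's snapshot and streaming JMX metrics (for example SnapshotCompleted and MilliSecondsBehindSource) or the connector log before declaring the setup complete.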


Setting Up Debezium for PostgreSQL

Prerequisites: WAL Configuration and Replication Slot

Confirm or set wal_level=logical in postgresql.conf. This requires a PostgreSQL restart:

bash
# Check the current WAL level (checking does not require a restart)
psql -c "SHOW wal_level;"
#  wal_level
# -----------
#  replica

Set wal_level in postgresql.conf, then restart PostgreSQL:

ini
wal_level = logical

After the restart, create the publication, the replication slot, and the Debezium user:

sql
-- Create a publication for the tables you want to capture
-- Using FOR ALL TABLES captures everything; scope it down in production
CREATE PUBLICATION debezium_pub FOR TABLE
  public.orders,
  public.customers,
  public.products;

-- Create the logical replication slot using pgoutput (built-in, no plugin install needed)
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');

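-- Create a dedicated Debezium user; the REPLICATION attribute is what allows it to stream from the slot
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'your-secure-password';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

-- Optional (PostgreSQL 14+): read access to all tables in all schemas, including tables created later
GRANT pg_read_all_data TO debezium;

-- Heartbeat table written by heartbeat.action.query in the connector configuration
CREATE TABLE public.debezium_heartbeat (id int PRIMARY KEY DEFAULT 1, ts timestamptz NOT NULL);
GRANT SELECT, INSERT, UPDATE ON public.debezium_heartbeat TO debezium;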

PostgreSQL Connector Configuration

json
{
  "name": "postgresql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "your-postgres-host.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "your-secure-password",
    "database.dbname": "myapp",

    "topic.prefix": "mycompany",
    "table.include.list": "public.orders,public.customers,public.products",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "snapshot.mode": "initial",
    "publication.autocreate.mode": "disabled",

    "decimal.handling.mode": "double",
    "hstore.handling.mode": "json",
    "interval.handling.mode": "numeric",

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
  }
}

Monitoring Replication Slot Lag

Run this query regularly — automate it as a Prometheus metric or a CloudWatch alarm:

sql
SELECT
    slot_name,
    plugin,
    slot_type,
    active,
    active_pid,
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    ) AS replication_lag_pretty,   -- human-readable size string
    pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes_raw,
    (pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) / 1024 / 1024)::bigint AS lag_mb
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
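
pg_wal_lsn_diff works on LSNs, which PostgreSQL prints as two hexadecimal halves of a 64-bit byte position (`high/low`). If your monitoring agent collects `pg_current_wal_lsn()` and `restart_lsn` as text, it can compute the same difference client-side; a minimal sketch with hypothetical helper names:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def wal_lsn_diff(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between two LSNs, the same quantity pg_wal_lsn_diff() returns."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)


lag = wal_lsn_diff("1/0", "0/FF000000")
print(lag, lag // 1024 // 1024)  # 16777216 16
```
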
Warning

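If active is false and lag_bytes_raw keeps growing, nothing is consuming the slot and PostgreSQL keeps accumulating WAL. Alert at around 5 GB of lag and define a hard limit based on your free disk space: if the slot crosses it, drop the slot with pg_drop_replication_slot('debezium_slot') and accept that changes in the gap are lost and the connector will need a fresh snapshot. On PostgreSQL 13 and later, max_slot_wal_keep_size enforces such a cap automatically by invalidating the slot. Left unchecked, an inactive slot can fill the WAL volume and take the database server down.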
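
The alerting rule in this warning can be written down as a small decision function that a monitoring job evaluates on each sample of the query above. This is a sketch: the 5 GB alert level comes from the text, the hard limit must be chosen from your own disk capacity, and `classify_slot` is a hypothetical name.

```python
GIB = 1024 ** 3


def classify_slot(active: bool, lag_bytes: int, previous_lag_bytes: int,
                  hard_limit_bytes: int, alert_bytes: int = 5 * GIB) -> str:
    """Classify one sample of the debezium_slot query.

    hard_limit_bytes has no default on purpose: derive it from the free space
    on your WAL volume.
    """
    if lag_bytes >= hard_limit_bytes:
        return "critical"   # past the disk safety threshold
    if lag_bytes >= alert_bytes:
        return "alert"      # the 5 GB alert level
    if not active and lag_bytes > previous_lag_bytes:
        return "alert"      # nothing is consuming the slot and retained WAL is growing
    return "ok"


print(classify_slot(active=False, lag_bytes=2 * GIB, previous_lag_bytes=1 * GIB,
                    hard_limit_bytes=100 * GIB))  # alert
```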


Understanding Debezium Event Structure

Every Debezium change event follows the same envelope structure regardless of the source database. Understanding this structure is essential for writing correct consumers.

json
{
  "schema": { "...": "omitted for brevity" },
  "payload": {
    "before": {
      "id": 1042,
      "customer_id": 7,
      "amount": 149.99,
      "status": "pending"
    },
    "after": {
      "id": 1042,
      "customer_id": 7,
      "amount": 149.99,
      "status": "completed"
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "mysql",
      "name": "mycompany",
      "ts_ms": 1708560000000,
      "db": "orders",
      "table": "transactions",
      "server_id": 184054,
      "gtid": null,
      "file": "mysql-bin.000042",
      "pos": 1398271,
      "row": 0
    },
    "op": "u",
    "ts_ms": 1708560000123,
    "transaction": null
  }
}

The op field defines the change type:

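  • c (create, INSERT): before is null, after has the new row
  • u (update, UPDATE): after has the new row; before has the old row on MySQL, but on PostgreSQL it is null unless the table uses REPLICA IDENTITY FULL
  • d (delete, DELETE): before has the deleted row (on PostgreSQL with the default replica identity, only its primary-key columns), after is null
  • r (read, snapshot): emitted during the initial snapshot phase; before is null, after has the row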
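
A consumer that mirrors a table can branch on op directly. The sketch keeps a Python dict as a stand-in for the target table and assumes a hypothetical primary-key column named `id`. It also skips tombstones: by default Debezium follows each delete with a record that has the same key and a null value, so Kafka log compaction can discard the key. Deletes read only the key from before, which also covers PostgreSQL tables whose delete events carry just the primary-key columns.

```python
PK = "id"  # hypothetical primary-key column of the captured table


def apply_change(table: dict, value) -> None:
    """Apply one decoded Debezium record value to a dict acting as the target table."""
    if value is None:
        return  # tombstone following a delete: the row is already gone
    # With the JSON converter's schemas enabled, the envelope sits under "payload".
    payload = value.get("payload", value)
    op = payload["op"]
    if op in ("c", "r", "u"):
        row = payload["after"]
        table[row[PK]] = row                      # upsert: replaying is harmless
    elif op == "d":
        table.pop(payload["before"][PK], None)    # tolerate an already-deleted row
    else:
        raise ValueError(f"unhandled op {op!r}")  # e.g. 't' (truncate) on PostgreSQL


orders = {}
apply_change(orders, {"payload": {"op": "c", "before": None,
                                  "after": {"id": 1042, "status": "pending"}}})
apply_change(orders, {"payload": {"op": "u", "before": {"id": 1042, "status": "pending"},
                                  "after": {"id": 1042, "status": "completed"}}})
print(orders)  # {1042: {'id': 1042, 'status': 'completed'}}
apply_change(orders, {"payload": {"op": "d", "before": {"id": 1042, "status": "completed"},
                                  "after": None}})
apply_change(orders, None)  # tombstone
print(orders)  # {}
```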

The source.ts_ms field is the timestamp of the original database transaction, not the time Debezium processed the event. Use this for event-time processing in Flink or Kafka Streams — do not use ts_ms at the top level (that is when Debezium created the Kafka record).
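
Both timestamps are epoch milliseconds, so their difference gives a rough per-event propagation delay, and source.ts_ms converts directly to an event-time timestamp. A sketch using the values from the example event; it assumes the database and Kafka Connect clocks are reasonably synchronized, since the two timestamps come from different machines. The helper names are hypothetical.

```python
from datetime import datetime, timezone


def propagation_delay_ms(payload: dict) -> int:
    """Milliseconds between the database change (source.ts_ms) and Debezium processing it (ts_ms)."""
    return payload["ts_ms"] - payload["source"]["ts_ms"]


def event_time(payload: dict) -> datetime:
    """Event time for windowing: when the change happened in the source database."""
    return datetime.fromtimestamp(payload["source"]["ts_ms"] / 1000, tz=timezone.utc)


payload = {"ts_ms": 1708560000123, "source": {"ts_ms": 1708560000000}}
print(propagation_delay_ms(payload))  # 123
print(event_time(payload))            # 2024-02-22 00:00:00+00:00
```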


Handling Schema Changes

Schema evolution is one of the harder operational challenges in CDC pipelines. When a developer adds a column to a table, Debezium must update the schema it uses to serialize events — without breaking existing consumers reading the old schema.

Debezium handles MySQL DDL changes (ALTER TABLE, CREATE TABLE, DROP TABLE) by recording the complete schema history in a dedicated Kafka topic (schema.history.internal.kafka.topic in Debezium 2.x, database.history.kafka.topic in 1.x). When the connector restarts, it replays this history to reconstruct the correct schema for each binlog position. This means the history topic must never be deleted or truncated while the connector is in use: give it a single partition and unlimited retention, and do not enable log compaction on it.

Tip

Use Confluent Schema Registry (or Apicurio Registry) with Avro serialization for production CDC pipelines. Schema Registry stores versioned schemas and enforces compatibility rules (backward, forward, or full compatibility). This prevents consumers from breaking when producers add new fields, and makes schema evolution visible and auditable.

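PostgreSQL logical decoding does not emit DDL events. With pgoutput, Debezium learns about column changes from the relation metadata that PostgreSQL sends along with the next change to an altered table, so adding or dropping a column does not require recreating the replication slot. Adding a new table to the capture set does need manual intervention when publication.autocreate.mode is disabled: add it to the publication (ALTER PUBLICATION debezium_pub ADD TABLE public.your_new_table), add it to table.include.list, and trigger a snapshot of it if consumers need its existing rows.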


Production Considerations

Snapshot Mode Selection

The snapshot.mode setting controls what happens when the connector starts for the first time (or when its stored offset is lost):

  • initial (default): performs a consistent snapshot of all included tables, then streams changes. Use this for a clean start with historical data.
  • schema_only: captures the schema but not existing rows, then streams new changes only. Use when downstream systems already have historical data and you only need changes going forward.
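  • never: skips the snapshot and streams from the stored offset. With no stored offset, the MySQL connector reads from the beginning of the binlog, so the binlog must still hold every change you need; the PostgreSQL connector starts from the point at which the replication slot was created.
  • exported (PostgreSQL only): bases the snapshot on the point in time at which the replication slot was created. Newer Debezium releases deprecate this mode in favor of initial, so check the documentation for your version.
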
Important

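If the binlog position stored in the connector's offsets has already been purged from MySQL (for example, because binlog_expire_logs_seconds or expire_logs_days rotated the file out), the connector cannot resume streaming and fails at startup; the underlying MySQL error typically reads Could not find first log file name in binary log index file. Setting snapshot.mode=when_needed makes the connector take a new snapshot in that situation instead of failing. Either way, keep binlog retention longer than the longest period the connector could be stopped.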
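
A pre-flight check along these lines can warn you before a restart fails. It assumes you have read the connector's stored binlog file name (for example from the Kafka Connect offsets topic) and the Log_name column of SHOW BINARY LOGS; `check_resume` and `binlog_seq` are hypothetical helpers.

```python
def binlog_seq(file_name: str) -> int:
    # mysql-bin.000042 -> 42
    return int(file_name.rsplit(".", 1)[1])


def check_resume(offset_file: str, available_files: list) -> str:
    """available_files: the Log_name column from SHOW BINARY LOGS."""
    if offset_file in available_files:
        return "ok"
    oldest = min(available_files, key=binlog_seq)
    return f"{offset_file} has been purged (oldest available: {oldest}); a new snapshot is required"


print(check_resume("mysql-bin.000038", ["mysql-bin.000040", "mysql-bin.000041"]))
```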

Offset Management and At-Least-Once Delivery

Kafka Connect periodically commits the connector's source offsets (every offset.flush.interval.ms, 60 seconds by default) for events that Kafka has acknowledged. If the connector crashes after writing events but before the next offset commit, it re-emits those events after restart. This is at-least-once delivery: events may be duplicated, but they are not silently lost.

Downstream consumers must handle duplicates. Common strategies include using the event's primary key and source position as a deduplication key in the destination system, or using upsert semantics (INSERT ... ON CONFLICT DO UPDATE in PostgreSQL, MERGE in Spark) so that replaying the same event is idempotent.
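
As a sketch of the deduplication approach for MySQL sources, the sink remembers the last applied binlog coordinate per primary key and ignores any event at or before it. Assumptions: a single MySQL source without failover (binlog coordinates are server-specific), and an in-memory dict where a real sink would store rows and positions in the same transaction. `IdempotentSink` is hypothetical.

```python
from typing import Dict, Tuple

Position = Tuple[int, int, int]  # (binlog file sequence, event position, row within event)


def source_position(source: dict) -> Position:
    """Order MySQL change events by the binlog coordinates in their source block."""
    return (int(source["file"].rsplit(".", 1)[1]), source["pos"], source.get("row", 0))


class IdempotentSink:
    """Applies each key's changes at most once, skipping replays after a connector restart."""

    def __init__(self) -> None:
        self.rows: Dict[object, dict] = {}
        self.last_applied: Dict[object, Position] = {}

    def apply(self, key, payload: dict) -> bool:
        position = source_position(payload["source"])
        if key in self.last_applied and position <= self.last_applied[key]:
            return False  # duplicate (or older) delivery: already reflected in self.rows
        self.last_applied[key] = position
        if payload["op"] == "d":
            self.rows.pop(key, None)
        else:
            self.rows[key] = payload["after"]
        return True


sink = IdempotentSink()
event = {"op": "u", "after": {"id": 1042, "status": "completed"},
         "source": {"file": "mysql-bin.000042", "pos": 1398271, "row": 0}}
print(sink.apply(1042, event))  # True: first delivery
print(sink.apply(1042, event))  # False: redelivery after a restart is skipped
```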

Filtering Tables and Columns

json
{
  "table.include.list": "orders.transactions,orders.customers",

  "column.exclude.list": "orders.customers.credit_card_number,orders.customers.ssn",

  "column.mask.hash.SHA-256.with.salt.CzQma0OczjhHbP": "orders.customers.email"
}

Use table.include.list to name the tables to capture, or table.exclude.list (for example "orders.temp_import,orders.audit_log") to capture everything except the listed tables; Debezium does not allow both on the same connector. Use column.exclude.list to drop sensitive columns from the CDC stream entirely. Use column.mask.hash to replace PII fields with a consistent SHA-256 hash, which is useful for downstream analytics that need to join on a user identifier without exposing raw email addresses or phone numbers.
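
The masking works because a salted hash is deterministic: the same email always yields the same token, so joins survive while the raw value does not. The sketch illustrates that idea with Python's hashlib; it is not Debezium's implementation and will not reproduce the exact tokens Debezium emits. Treat the salt as a secret, since low-entropy values such as email addresses can be brute-forced once it leaks. `pseudonymize` is a hypothetical helper.

```python
import hashlib

SALT = "CzQma0OczjhHbP"  # the salt embedded in the column.mask.hash property name


def pseudonymize(value: str, salt: str = SALT) -> str:
    """Deterministic salted SHA-256 token: equal inputs give equal tokens, so joins still work."""
    return hashlib.sha256((salt + value).encode("utf-8")).hexdigest()


a = pseudonymize("alice@example.com")
print(a == pseudonymize("alice@example.com"))  # True: stable across events and tables
print(a == pseudonymize("bob@example.com"))    # False
```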


Debezium vs Alternatives

| Tool | Source support | Output targets | Notes |
|---|---|---|---|
| Debezium | MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Cassandra | Kafka (primary), Pulsar, Kinesis, HTTP | Most mature CDC ecosystem; runs on Kafka Connect; strong schema change handling; active community |
| Maxwell | MySQL only | Kafka, Kinesis, SQS, Redis, stdout | Simpler to deploy than Debezium for MySQL-only shops; JSON output only; less active maintenance |
| Canal | MySQL only | Custom TCP, Kafka, RocketMQ | Alibaba open source; popular in the Chinese tech ecosystem; documentation primarily in Chinese |
| AWS DMS (CDC mode) | MySQL, PostgreSQL, Oracle, SQL Server, and more | S3, Kinesis, Kafka (including Amazon MSK), Redshift | Fully managed; no Kafka required; higher latency (~seconds); limited transformation capabilities; per-hour pricing |

For teams already running Kafka, Debezium is the clear choice: it integrates natively with Kafka Connect, handles schema changes reliably, and supports the widest range of source databases. Maxwell is a viable alternative for MySQL-only pipelines where Kafka Connect infrastructure is too heavy. AWS DMS is the right pick when you need a managed solution with zero infrastructure and can accept slightly higher latency and cost.


Key Takeaways
  • CDC reads database transaction logs directly instead of polling, delivering sub-second change propagation without adding polling queries or writes against application tables on the primary.
  • MySQL requires binlog_format=ROW, binlog_row_image=FULL, a unique server-id, and a replication user with five grants: SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT.
  • PostgreSQL requires wal_level=logical, a named replication slot, and a publication; the slot will accumulate WAL indefinitely if Debezium stops consuming — monitor pg_replication_slots lag as a first-class production alert.
  • Every Debezium event carries before and after payloads plus an op field; build consumers around op to handle inserts (c), updates (u), deletes (d), and snapshot reads (r) correctly.
  • Debezium delivers at-least-once semantics — design all downstream consumers to be idempotent using upsert semantics or explicit deduplication on the event's source position.
  • Use column.exclude.list or column.mask.hash to strip or pseudonymize PII before it reaches Kafka; retrofitting data governance after the fact is far more expensive than enforcing it at the connector layer.

Working with JusDB on Debezium and CDC

JusDB designs and operates CDC pipelines using Debezium for teams moving from batch ETL to real-time data streaming. We configure MySQL binlog and PostgreSQL logical replication, tune Kafka Connect workers, and monitor replication slot health — so your downstream systems are always current.
