MySQL Binlog to Kafka: Building a CDC Pipeline

Build a MySQL CDC pipeline using binlog and Kafka — Debezium, Maxwell, or Canal — trade-offs and production setup

JusDB Team
December 7, 2022
TL;DR
  • MySQL CDC via binlog requires binlog_format=ROW and binlog_row_image=FULL — anything else gives you incomplete change events.
  • Three mature tools exist: Debezium (full ecosystem, schema registry), Maxwell (simple JSON, no registry), and Canal (Alibaba-origin, Java-based, Chinese-first docs).
  • GTID-based offset tracking is more reliable than file+position for failover scenarios.
  • On RDS MySQL, binlog access is restricted: you cannot use SUPER, and binlog retention must be set explicitly with the mysql.rds_set_configuration stored procedure.
  • Monitor binlog position lag to catch consumer drift before it causes data loss.

Your MySQL database is the system of record. Kafka is your streaming backbone. The gap between them — getting every insert, update, and delete into Kafka in real time — is where CDC pipelines live, and where teams routinely spend weeks fighting configuration problems they didn't know existed. Binlog-based CDC is the right approach: it's low-latency, non-invasive to the application, and captures every row-level change. But getting it right requires understanding MySQL internals, replication permissions, offset tracking semantics, and the operational trade-offs between the three major open-source tools. This post gives you the full picture.


Architecture Overview

A MySQL binlog-to-Kafka CDC pipeline works by impersonating a MySQL replica. The CDC connector connects to MySQL using the replication protocol, reads the binary log stream, decodes each change event, and publishes it to a Kafka topic. From MySQL's perspective, the connector is just another replica — it sends a COM_BINLOG_DUMP command and streams events.

The high-level data flow looks like this:

text
MySQL Primary
     │
     │  binlog stream (replication protocol)
     ▼
CDC Connector (Debezium / Maxwell / Canal)
     │
     │  serialized change events (JSON / Avro)
     ▼
Kafka Topic(s)
     │
     ├── Stream Processor (Flink, Spark, KSQL)
     ├── Data Warehouse Sink (Snowflake, BigQuery, ClickHouse)
     └── Search Index Sink (Elasticsearch, OpenSearch)

The connector maintains an offset — either a GTID set or a binlog file+position — that tracks how far it has read. On restart, it resumes from that offset. Before streaming begins, most connectors perform an initial snapshot: a consistent read of existing table data, usually via REPEATABLE READ isolation or a table-level lock, after which streaming takes over.
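The resume behavior is easier to reason about with a small simulation. This is a sketch, not how any of the three connectors is implemented: the `Connector` class, the `flush_every` parameter, and the integer offsets are hypothetical stand-ins for a real connector, its offset flush interval, and GTID or file+position coordinates.

```python
from dataclasses import dataclass, field


@dataclass
class Connector:
    """Toy connector: publishes events and persists its offset every N events."""
    flush_every: int
    committed_offset: int = 0                      # survives a crash
    published: list = field(default_factory=list)  # stands in for a Kafka topic

    def run(self, binlog, crash_after=None):
        """Read from the committed offset; optionally crash after N events."""
        position = self.committed_offset
        handled = 0
        while position < len(binlog):
            if crash_after is not None and handled == crash_after:
                return  # crash: progress since the last flush is forgotten
            self.published.append(binlog[position])
            position += 1
            handled += 1
            if handled % self.flush_every == 0:
                self.committed_offset = position
        self.committed_offset = position


binlog = [f"event-{i}" for i in range(10)]
connector = Connector(flush_every=4)
connector.run(binlog, crash_after=6)   # publishes events 0-5, offset saved at 4
connector.run(binlog)                  # resumes at 4, so events 4 and 5 repeat

duplicates = len(connector.published) - len(set(connector.published))
print(duplicates)  # 2
```

No event is lost, but two are delivered twice. That is the at-least-once behavior to design for: downstream consumers should apply events idempotently, for example as upserts keyed on the primary key.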

MySQL Prerequisites

Before you write a single line of connector configuration, MySQL must be configured correctly. Enabling binary logging (log_bin) requires a restart, and changing binlog_format at runtime only affects sessions opened afterwards, so plan for a restart or failover.

Binary Log Format

ini
# my.cnf / my.ini
[mysqld]
server-id          = 1          # must be unique across all servers in the topology
log_bin            = mysql-bin  # enables binary logging
binlog_format      = ROW        # REQUIRED — statement and mixed are unreliable for CDC
binlog_row_image   = FULL       # REQUIRED — captures before and after images of every row
expire_logs_days   = 7          # retain 7 days of binlog (MySQL 8.0+: use binlog_expire_logs_seconds = 604800)
Warning

binlog_format=STATEMENT logs SQL statements, not row images. Non-deterministic functions like NOW() or UUID() will produce different values when replayed. binlog_format=MIXED falls back to row format for some statements but is still unreliable for CDC — a connector cannot know which events were logged in which format without parsing each one. Always use ROW.

Warning

binlog_row_image=MINIMAL logs only the columns needed to identify the row in the before-image and only the changed columns in the after-image, never the full row. This breaks any downstream system that needs the complete previous row state (audit logs, conflict detection, UPSERT logic). Use FULL.
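A pre-flight check can catch a misconfigured server before the connector starts. The sketch below is a minimal example, assuming you have already fetched the relevant rows of SHOW GLOBAL VARIABLES into a dict; the function name `check_cdc_settings` and the `REQUIRED` table are illustrative, not part of any tool.

```python
# Settings this article treats as mandatory for binlog-based CDC.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_cdc_settings(variables):
    """Return a list of problems; an empty list means the server looks ready.

    `variables` maps variable names to values, e.g. built from the rows of
    SHOW GLOBAL VARIABLES.
    """
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    if int(variables.get("server_id", 0)) == 0:
        problems.append("server_id is 0; set a unique non-zero value")
    return problems


print(check_cdc_settings({
    "log_bin": "ON",
    "binlog_format": "MIXED",
    "binlog_row_image": "FULL",
    "server_id": "1",
}))
# ['binlog_format is MIXED, expected ROW']
```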

Replication User Permissions

Create a dedicated MySQL user for the CDC connector. Do not reuse application credentials.

sql
-- Create the replication user
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'strong_password_here';

-- Core replication permissions
GRANT REPLICATION SLAVE  ON *.* TO 'cdc_user'@'%';
GRANT REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';

-- Required for initial snapshot and schema introspection
GRANT SELECT ON *.* TO 'cdc_user'@'%';

-- Required by Debezium for FLUSH TABLES WITH READ LOCK during snapshot
-- (skip this on RDS; use snapshot.mode=schema_only or snapshot.mode=never)
GRANT RELOAD  ON *.* TO 'cdc_user'@'%';

-- Lets Debezium list database names with SHOW DATABASES during snapshots
GRANT SHOW DATABASES ON *.* TO 'cdc_user'@'%';

FLUSH PRIVILEGES;

REPLICATION SLAVE allows the connector to register as a replica and receive binlog events. REPLICATION CLIENT allows it to execute SHOW MASTER STATUS and SHOW BINARY LOGS — required for offset tracking. SELECT is needed for the initial snapshot query.

Tip

On Amazon RDS MySQL, the SUPER privilege is unavailable. Grant RELOAD only if you are on a self-managed MySQL instance. For RDS, set snapshot.mode=schema_only in Debezium or use Maxwell's --replication_host without requesting a lock. Also set binlog retention explicitly by calling the mysql.rds_set_configuration stored procedure with 'binlog retention hours'. By default, RDS purges binlogs as soon as it no longer needs them, which can be before your connector catches up after a restart.
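To verify the grants before starting a connector, you can parse the output of SHOW GRANTS FOR 'cdc_user'@'%'. The following is a rough sketch under simple assumptions: it only looks at global (ON *.*) grants, and the helper name `missing_global_privileges` is made up for this example.

```python
import re

# The three privileges every tool in this article needs at the global level.
REQUIRED_GLOBAL = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}

# Matches only global grants such as: GRANT SELECT, RELOAD ON *.* TO ...
_GLOBAL_GRANT = re.compile(r"^GRANT (?P<privs>.+?) ON \*\.\* TO ", re.IGNORECASE)


def missing_global_privileges(grant_lines, required=REQUIRED_GLOBAL):
    """Return the required privileges absent from SHOW GRANTS output lines."""
    granted = set()
    for line in grant_lines:
        match = _GLOBAL_GRANT.match(line.strip())
        if not match:
            continue  # database- or table-level grant; ignored in this sketch
        privs = {p.strip().upper() for p in match.group("privs").split(",")}
        if "ALL PRIVILEGES" in privs:
            return set()
        granted |= privs
    return set(required) - granted


grants = [
    "GRANT SELECT, REPLICATION SLAVE ON *.* TO `cdc_user`@`%`",
    "GRANT SELECT ON `orders`.* TO `cdc_user`@`%`",
]
print(sorted(missing_global_privileges(grants)))
# ['REPLICATION CLIENT']
```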

Debezium MySQL Connector

Debezium is the most full-featured option. It integrates with Kafka Connect, supports Avro + Confluent Schema Registry, and emits structured change events with before/after payloads, source metadata, and transaction boundaries.
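On the consumer side, the before/after envelope maps naturally onto upserts and deletes. The sketch below assumes the event value has already been deserialized to a dict without the schema wrapper, and relies on the envelope fields `op`, `before`, and `after`; the function name `to_sink_operation` and the sample rows are illustrative.

```python
def to_sink_operation(envelope):
    """Map a Debezium change-event value to ('upsert' | 'delete', row)."""
    op = envelope["op"]
    if op in ("c", "r", "u"):   # create, snapshot read, update
        return "upsert", envelope["after"]
    if op == "d":               # delete: only the before-image is populated
        return "delete", envelope["before"]
    raise ValueError(f"unsupported op: {op!r}")


update_event = {
    "op": "u",
    "before": {"id": 9001, "quantity": 3},
    "after": {"id": 9001, "quantity": 4},
}
delete_event = {"op": "d", "before": {"id": 9001, "quantity": 4}, "after": None}

print(to_sink_operation(update_event))  # ('upsert', {'id': 9001, 'quantity': 4})
print(to_sink_operation(delete_event))  # ('delete', {'id': 9001, 'quantity': 4})
```

Treating snapshot reads (`r`) as upserts means the initial snapshot and the live stream can share one code path in the sink.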

Connector Configuration

json
{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "mysql.internal",
    "database.port": "3306",
    "database.user": "cdc_user",
    "database.password": "strong_password_here",
    "database.server.id": "223344",
    "database.server.name": "prod_mysql",

    "database.include.list": "orders,inventory",
    "table.include.list": "orders.order_items,inventory.products",

    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.prod_mysql",

    "gtid.source.enabled": "true",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "include.schema.changes": "true",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect"
  }
}

Key parameters to understand:

  • database.server.id: Must be a unique integer across all servers and connectors in your MySQL topology. Conflicts cause silent replication failures.
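  • gtid.source.enabled: Intended to make GTID tracking explicit when your MySQL cluster uses GTIDs (recommended). Debezium tracks position using the GTID set instead of file+position whenever the server runs with gtid_mode=ON, which survives failovers and primary switches cleanly. The documented GTID properties are gtid.source.includes and gtid.source.excludes, so verify this key against the documentation for your connector version.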
  • snapshot.mode: initial performs a full table scan on first run. schema_only skips data and starts streaming from the current binlog position — useful on RDS or when you only need future changes.
  • snapshot.locking.mode: minimal holds the global read lock only long enough to capture the binlog position, then releases it before reading table data. Use this in production to minimize lock contention.
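  • database.history.kafka.topic: Debezium stores the full DDL history in this topic. It must be a single-partition topic with infinite retention, and it must not be compacted, because the connector replays the whole history in order on restart. Never delete it. (Debezium 2.x renames this property to schema.history.internal.kafka.topic.)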
Warning

Do not set tasks.max above 1 for the MySQL connector. The MySQL binlog is a single ordered stream, so the connector always runs exactly one task and a higher value buys no parallelism. Parallelism happens downstream in Kafka consumers.
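Connectors are registered by POSTing the JSON document to the Kafka Connect REST API at /connectors. The sketch below builds that request with the standard library and adds a guard for the tasks.max rule. The worker URL `http://connect:8083` and the helper name `build_register_request` are assumptions for illustration, and the config shown is truncated.

```python
import json
import urllib.request


def build_register_request(connect_url, name, config):
    """Build (but do not send) the POST that registers a connector."""
    if str(config.get("tasks.max", "1")) != "1":
        raise ValueError("the MySQL connector runs a single task; keep tasks.max at 1")
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql.internal",
}
request = build_register_request("http://connect:8083/", "mysql-cdc-connector", config)
print(request.get_method(), request.full_url)
# POST http://connect:8083/connectors
```

Passing the result to `urllib.request.urlopen(request)` would submit it. Keeping the build step separate makes the payload easy to inspect or unit-test without a running Connect worker.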

Maxwell

Maxwell is a simpler alternative to Debezium, maintained by Zendesk. It connects directly to MySQL, reads the binlog, and publishes JSON change events to Kafka. There is no Kafka Connect dependency, no schema registry requirement, and no Avro serialization — events are plain JSON from the start.

bash
# Run Maxwell with Kafka output
maxwell --user=cdc_user \
        --password=strong_password_here \
        --host=mysql.internal \
        --port=3306 \
        --producer=kafka \
        --kafka.bootstrap.servers=kafka:9092 \
        --kafka_topic=maxwell \
        --filter='exclude: *.*, include: orders.*, include: inventory.products' \
        --replica_server_id=223345 \
        --gtid_mode=true

A Maxwell change event looks like this:

json
{
  "database": "orders",
  "table": "order_items",
  "type": "insert",
  "ts": 1708000000,
  "xid": 12345,
  "commit": true,
  "data": {
    "id": 9001,
    "order_id": 500,
    "product_id": 42,
    "quantity": 3,
    "price": "29.99"
  }
}

For update events, Maxwell includes an old field containing the previous values of the columns that changed, while data carries the row as it is after the change. Consumers can rebuild the full before-image by overlaying old on data, which only works when binlog_row_image=FULL is set (another reason to enforce it). Maxwell stores its offset state in a MySQL table (maxwell.positions) rather than Kafka, which simplifies operations but adds a dependency on the Maxwell metadata database.
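A consumer can turn these events into current table state with very little code. The following sketch assumes events shaped like the example above, a single-column primary key named `id` that is never updated, and full row images in `data`; the function names are hypothetical.

```python
def before_image(event):
    """Rebuild the pre-update row by overlaying `old` on `data`."""
    row = dict(event["data"])
    row.update(event.get("old", {}))
    return row


def apply_event(table, event, key="id"):
    """Apply one Maxwell event to `table`, a dict keyed by primary key."""
    row = event["data"]
    if event["type"] in ("insert", "update"):
        table[row[key]] = row
    elif event["type"] == "delete":
        table.pop(row[key], None)
    return table


events = [
    {"type": "insert", "data": {"id": 9001, "quantity": 3, "price": "29.99"}},
    {"type": "update", "data": {"id": 9001, "quantity": 4, "price": "29.99"},
     "old": {"quantity": 3}},
]
table = {}
for event in events:
    apply_event(table, event)

print(table[9001]["quantity"])             # 4
print(before_image(events[1])["quantity"])  # 3
```

Because inserts and updates are both written as keyed upserts, replaying a duplicate event leaves the table unchanged.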

Tip

Maxwell is a strong choice when your consumers expect simple JSON and you want to avoid the operational overhead of Kafka Connect and Schema Registry. It is less suitable if you need Avro serialization, tight Kafka Connect ecosystem integration, or support for multiple source connectors managed through a unified REST API.

Canal

Canal is an open-source MySQL binlog parser from Alibaba, written in Java. It was built to support Alibaba's internal data synchronization needs and has been in production at massive scale. Canal exposes a client-server architecture: a Canal server connects to MySQL and parses the binlog; Canal clients subscribe to change events over a TCP protocol or via a Kafka adapter.

properties
# canal.properties — Canal server configuration
canal.serverMode = kafka
kafka.bootstrap.servers = kafka:9092
kafka.zookeeper.address = zookeeper:2181

# instance.properties — per-instance MySQL configuration
canal.instance.master.address = mysql.internal:3306
canal.instance.dbUsername = cdc_user
canal.instance.dbPassword = strong_password_here
canal.instance.connectionCharset = UTF-8
canal.instance.tsdb.enable = true
canal.instance.gtidon = true
canal.mq.topic = canal-orders
canal.mq.partition = 0

Canal's primary documentation is in Chinese, which can be a barrier for non-Chinese-speaking teams. The community and issue tracker are active but predominantly Chinese-language. For teams on Alibaba Cloud or within China's tech ecosystem, Canal is a natural fit. For everyone else, Debezium or Maxwell is easier to operate.

Tool Comparison

| Dimension | Debezium | Maxwell | Canal |
| --- | --- | --- | --- |
| Deployment model | Kafka Connect plugin | Standalone JVM process | Client-server JVM |
| Output format | Avro (recommended) or JSON | JSON only | JSON or FlatMessage |
| Schema Registry | Native support | Not supported | Not supported |
| Offset storage | Kafka Connect offsets topic | MySQL table | ZooKeeper or local file |
| GTID support | Yes | Yes | Yes |
| RDS MySQL compatible | Yes (with config adjustments) | Yes | Yes |
| Community / docs language | English, very active | English, moderate | Primarily Chinese |
| Complexity | High (Kafka Connect ecosystem) | Low | Medium |
| Before-image support | Yes (full before/after) | Partial (changed columns only by default) | Yes |

Production Hardening

GTID vs File+Position Offset Tracking

MySQL supports two ways to track replication position. File+position (mysql-bin.000042:8192) is simpler but fragile: each server names and fills its own binlog files, so after a primary failover the saved file name and offset no longer point at the same transaction on the new primary. GTID (Global Transaction Identifier) solves this by assigning each committed transaction a globally unique ID. After failover, the connector can present its GTID set to the new primary and resume from the correct transaction regardless of which binlog file it lives in.

sql
-- Enable GTID mode on MySQL via my.cnf (takes effect on restart;
-- MySQL 5.7.6+ can also switch gtid_mode online in stages)
-- Add to my.cnf:
-- gtid_mode = ON
-- enforce_gtid_consistency = ON

-- Verify GTID mode is active
SHOW VARIABLES LIKE 'gtid_mode';
-- Expected: gtid_mode | ON

-- Check current executed GTID set
SHOW MASTER STATUS\G
-- Executed_Gtid_Set: 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23
Tip

Always enable GTID mode on new MySQL deployments used for CDC. Retrofitting GTID onto an existing non-GTID cluster is possible but requires a careful migration procedure. The operational benefit during failovers — zero manual offset recalculation — is worth the upfront effort.
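GTID sets also make lag easy to express in transactions rather than bytes. The sketch below parses the textual format shown in Executed_Gtid_Set and counts the difference. It assumes untagged GTIDs and that the connector's set is a subset of the primary's; the function names are illustrative.

```python
def parse_gtid_set(text):
    """Parse 'uuid:1-5:7,uuid2:1-3' into {uuid: [(start, end), ...]}."""
    result = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        uuid, *intervals = part.split(":")
        ranges = result.setdefault(uuid.lower(), [])
        for interval in intervals:
            start, _, end = interval.partition("-")
            ranges.append((int(start), int(end or start)))
    return result


def transaction_count(gtid_set):
    """Total number of transactions covered by a parsed GTID set."""
    return sum(end - start + 1
               for ranges in gtid_set.values()
               for start, end in ranges)


def transactions_behind(primary_set, connector_set):
    """Transactions on the primary that the connector has not yet read."""
    return (transaction_count(parse_gtid_set(primary_set))
            - transaction_count(parse_gtid_set(connector_set)))


primary = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23"
connector = "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-20"
print(transactions_behind(primary, connector))  # 3
```

Inside MySQL itself, the built-in GTID_SUBTRACT() function computes the exact set difference and is the safer choice when the subset assumption might not hold.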

Binlog Retention

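Binlog retention defaults vary: MySQL 8.0 keeps binary logs for 30 days, MySQL 5.7 never expires them unless expire_logs_days is set, and RDS purges them as soon as possible unless told otherwise. Set the window deliberately. If your CDC connector is down for longer than the retention window, it will try to resume from a binlog position that no longer exists, and you will need to re-snapshot.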

sql
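-- Self-managed MySQL 5.7: set retention to 14 days
SET GLOBAL expire_logs_days = 14;
-- MySQL 8.0+ (expire_logs_days is deprecated there; set only one of the two):
SET GLOBAL binlog_expire_logs_seconds = 1209600;

-- Persist in my.cnf:
-- expire_logs_days = 14                   (5.7)
-- binlog_expire_logs_seconds = 1209600    (8.0+)

-- RDS MySQL: retention is not a my.cnf or parameter group setting.
-- Call the RDS stored procedure instead; RDS for MySQL caps the value at 168 hours (7 days):
CALL mysql.rds_set_configuration('binlog retention hours', 168);
-- Verify the current value:
CALL mysql.rds_show_configuration;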
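
Longer retention costs disk, so it helps to do the arithmetic before picking a window. This is a back-of-the-envelope sketch: the function names and the 2 MiB/s write rate are made-up inputs, and real binlog volume should be measured from your own servers.

```python
def retention_settings(retention_days):
    """Convert a retention window in days to the units MySQL and RDS expect."""
    return {
        "binlog_expire_logs_seconds": retention_days * 24 * 60 * 60,
        "retention_hours": retention_days * 24,
    }


def binlog_disk_estimate_gib(write_rate_mib_per_s, retention_days):
    """Estimate disk consumed by binlogs over the whole retention window."""
    seconds = retention_days * 24 * 60 * 60
    return write_rate_mib_per_s * seconds / 1024


print(retention_settings(7))
# {'binlog_expire_logs_seconds': 604800, 'retention_hours': 168}
print(binlog_disk_estimate_gib(2, 7))
# 1181.25
```

At a sustained 2 MiB/s of binlog writes, a 7-day window needs a little over 1 TiB of headroom, which is worth knowing before the disk fills up.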

Monitoring Binlog Position Lag

The most important CDC health metric is how far behind the connector is from the current binlog position. A growing lag means events are accumulating faster than the connector can process — this leads to delayed data and, if binlogs rotate before the connector reads them, irrecoverable data loss.

sql
-- Check current binlog position on the primary
SHOW MASTER STATUS;
-- File: mysql-bin.000042, Position: 209715200

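-- Check what your connector last committed
-- For Debezium: read the Kafka Connect offsets topic (the worker's offset.storage.topic,
-- often named connect-offsets), not __consumer_offsets
-- kafka-console-consumer --topic connect-offsets --from-beginning --property print.key=true ...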

-- For Maxwell: query the positions table
SELECT * FROM maxwell.positions;

-- For replication-based lag (if connector is registered as a replica):
SHOW SLAVE HOSTS;
SHOW PROCESSLIST;
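
When you track file+position rather than GTIDs, lag in bytes can be computed from the file list that SHOW BINARY LOGS returns. The following is a sketch with made-up file sizes; `binlog_lag_bytes` is a hypothetical helper, and positions are (file name, byte offset) pairs.

```python
def binlog_lag_bytes(binary_logs, connector_pos, primary_pos):
    """Bytes of binlog between the connector's position and the primary's.

    binary_logs: ordered list of (file_name, file_size), as from SHOW BINARY LOGS.
    """
    names = [name for name, _ in binary_logs]
    sizes = dict(binary_logs)
    c_file, c_offset = connector_pos
    p_file, p_offset = primary_pos
    if c_file not in sizes:
        raise LookupError(f"{c_file} has been purged; a re-snapshot is needed")
    c_index, p_index = names.index(c_file), names.index(p_file)
    if c_index == p_index:
        return p_offset - c_offset
    rest_of_current_file = sizes[c_file] - c_offset
    whole_files_between = sum(sizes[n] for n in names[c_index + 1:p_index])
    return rest_of_current_file + whole_files_between + p_offset


logs = [("mysql-bin.000040", 1000), ("mysql-bin.000041", 1000), ("mysql-bin.000042", 500)]
print(binlog_lag_bytes(logs, ("mysql-bin.000040", 400), ("mysql-bin.000042", 500)))
# 2100
```

The LookupError branch is the case that matters most: it fires when the connector's file has already rotated out of retention, which is exactly the unrecoverable situation described above.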

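Expose Debezium's JMX metrics to Prometheus using the JMX Exporter agent. The key metric is the streaming MBean attribute MilliSecondsBehindSource. Its Prometheus name depends on your JMX Exporter rules, and the examples here assume it is exported as debezium_mysql_connector_metrics_millisecondsbehindmaster. Alert when it exceeds your SLA threshold (commonly 60 seconds for near-real-time pipelines, 5 minutes for analytics).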

yaml
# Prometheus alert rule for Debezium lag
groups:
  - name: cdc_alerts
    rules:
      - alert: CDCReplicationLagHigh
        expr: debezium_mysql_connector_metrics_millisecondsbehindmaster > 60000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CDC replication lag exceeds 60 seconds"
          description: "Connector {{ $labels.connector }} is {{ $value }}ms behind MySQL primary."

RDS MySQL Specific Considerations

Amazon RDS restricts several MySQL privileges that CDC connectors expect. Here is a checklist for RDS deployments:

  • Do not grant SUPER — it is unavailable on RDS. Configure connectors to operate without it.
  • Set binlog retention with CALL mysql.rds_set_configuration('binlog retention hours', N), not in the parameter group. The default is NULL, which means RDS purges binlogs as soon as possible.
  • RDS Multi-AZ failovers create a new binlog stream. GTID mode is essential for automatic recovery.
  • Use snapshot.mode=schema_only in Debezium to avoid locking issues on large RDS instances during the initial snapshot.
  • For Aurora MySQL, binlog is not enabled by default. Enable it by setting binlog_format=ROW in the DB cluster parameter group, and tune the aurora_binlog_replication_max_yield_seconds parameter to reduce lag on Aurora replicas.
Warning

Aurora MySQL Serverless v1 does not support binlog-based replication at all. If you are running Aurora Serverless v1, you cannot use any of the binlog CDC tools described here. Aurora Serverless v2 does support binlog replication with explicit configuration.

Key Takeaways
  • Set binlog_format=ROW and binlog_row_image=FULL in MySQL before connecting any CDC tool — these are non-negotiable for reliable change capture.
  • Create a dedicated replication user with REPLICATION SLAVE, REPLICATION CLIENT, and SELECT — never reuse application credentials.
  • Use GTID-based offset tracking over file+position for resilience during primary failovers and topology changes.
  • Debezium is the right choice for production Kafka Connect environments with schema evolution requirements; Maxwell wins for simplicity; Canal fits Alibaba Cloud and Chinese-ecosystem teams.
  • Set binlog retention (mysql.rds_set_configuration on RDS, expire_logs_days or binlog_expire_logs_seconds on self-managed MySQL) to at least 7 days, so that connector downtime shorter than the retention window does not require a full re-snapshot.
  • Monitor milliseconds-behind-master and alert before lag grows large enough to cause binlog rotation loss.
  • On RDS, avoid SUPER grants and use snapshot.mode=schema_only to sidestep locking restrictions.

Build CDC Pipelines Faster with JusDB

Configuring MySQL binlog CDC correctly is one part of the puzzle. The other part is having a destination that can absorb a high-volume, continuously updated stream from Kafka and make it queryable in real time. JusDB is built for exactly this workload — high-throughput ingestion from Kafka, efficient columnar storage, and fast analytical queries over live data without manual compaction or index maintenance.

If you are building a pipeline from MySQL binlog through Kafka to an analytical layer, JusDB handles the sink side with native Kafka consumer integration, schema-on-write enforcement, and sub-second query latency over freshly landed data. No intermediate staging tables, no batch ETL jobs, no query performance degradation as your dataset grows.

Explore the JusDB documentation to see how to connect your Kafka CDC topics to a JusDB table and start querying change data in real time.