Your analytics pipeline is always one batch job away from being stale — and in production, "stale" translates directly to wrong decisions. Apache Flink CDC bridges the gap between your operational databases and your downstream systems by turning every INSERT, UPDATE, and DELETE into a first-class streaming event, with sub-second latency and no polling overhead. Unlike bolt-on solutions that hand off change events to a separate processing layer, Flink CDC integrates capture directly into the Flink runtime, giving you stateful joins, windowed aggregations, and exactly-once delivery without stitching together three different tools. If you are running MySQL or PostgreSQL at scale and feeding a data lake, a real-time OLAP store, or a Kafka topic, this guide walks you through the full production setup from connector configuration to Kubernetes deployment.
- Flink CDC embeds Debezium-based change capture directly inside the Flink runtime, eliminating a separate CDC broker tier.
- Both MySQL and PostgreSQL sources run an initial full snapshot, parallelized through incremental snapshot mode with configurable parallelism, and then switch to streaming changes from the log.
- Exactly-once semantics come from Flink checkpointing: with a transactional sink, task manager restarts produce no duplicates.
- CDC streams can be joined statefully across sources and written to Kafka, Apache Iceberg, or ClickHouse sinks.
- The Flink Kubernetes Operator handles deployment, scaling, and savepoint management in production clusters.
What Is Flink CDC?
Flink CDC (Change Data Capture) is a set of source connectors for Apache Flink that read the database's change log (the binlog for MySQL, the write-ahead log, or WAL, for PostgreSQL) and emit row-level change events as a continuous Flink DataStream or SQL table. The connectors are built on top of Debezium's battle-tested log parsing logic, but unlike a standalone Debezium deployment that writes to Kafka, Flink CDC runs the capture logic inside a Flink task manager. This means your change events enter the Flink execution graph directly and can be processed, enriched, windowed, and routed without a Kafka intermediary, unless you explicitly want Kafka as a sink.
The practical consequence of this architecture is significant. With a traditional Debezium-to-Kafka pipeline, exactly-once semantics require careful Kafka producer configuration, consumer group management, and idempotent consumers on every downstream system. With Flink CDC, exactly-once is enforced by Flink's checkpointing mechanism across the entire graph, from the binlog offset to the final sink transaction, as a single atomic guarantee. The one requirement is that the sink takes part in checkpointing, either through checkpoint-aligned transactions (as the Kafka and Iceberg sinks do) or through idempotent upserts keyed on the primary key.
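To make the checkpoint-aligned guarantee concrete, here is a toy, single-process model in plain Java. It is not Flink code, and the class and method names are hypothetical. The source offset and the sink's open transaction advance together; a completed checkpoint stores the offset and commits the transaction as one step (real Flink splits this into a pre-commit at snapshot time and a commit once the checkpoint completes); a failure discards the open transaction and rewinds to the stored offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of checkpoint-aligned exactly-once delivery (hypothetical names, not Flink APIs).
final class ExactlyOnceModel {
    private final List<String> log;                           // stands in for the binlog/WAL
    private final List<String> committed = new ArrayList<>(); // what downstream readers can see
    private List<String> pending = new ArrayList<>();         // the sink's open transaction
    private int position = 0;                                 // live read offset
    private int checkpointedOffset = 0;                       // offset stored in the last checkpoint

    ExactlyOnceModel(List<String> log) {
        this.log = log;
    }

    /** Reads up to n events and writes them into the open, uncommitted transaction. */
    void process(int n) {
        for (int i = 0; i < n && position < log.size(); i++) {
            pending.add(log.get(position++));
        }
    }

    /** A completed checkpoint persists the offset and commits the transaction together. */
    void checkpoint() {
        checkpointedOffset = position;
        committed.addAll(pending);
        pending = new ArrayList<>();
    }

    /** A failure aborts the open transaction and rewinds the source to the checkpointed offset. */
    void crashAndRecover() {
        pending = new ArrayList<>();
        position = checkpointedOffset;
    }

    List<String> committed() {
        return committed;
    }
}
```

Run process(2), checkpoint(), process(2), crashAndRecover(), process(3), checkpoint() over the log a..e: events c and d are read twice, but the committed output is a, b, c, d, e exactly once, because the first attempt at c and d never committed.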
Flink CDC vs. Debezium: Where the Processing Layer Lives
Debezium's role is change event production. It reads the log, structures the event, and writes it to Kafka. Everything downstream — filtering, joining, aggregating — is the responsibility of a separate consumer application. Flink CDC collapses this boundary: capture, processing, and delivery are a single streaming job. You lose the decoupling benefit of Kafka as an intermediate buffer, but you gain lower end-to-end latency, unified operator state, and a single failure domain to reason about. For teams that already operate Flink for stream processing, adding CDC sources is additive rather than architectural — you are not introducing a new system.
Architecture Overview
A production Flink CDC pipeline consists of four layers:
- Source layer — one or more CDC source operators reading binlog or WAL, each running the initial full snapshot followed by ongoing incremental streaming.
- Processing layer — stateful Flink operators performing joins, deduplication, enrichment, or windowed aggregations over the CDC event stream.
- Sink layer — connectors writing to Kafka topics, Apache Iceberg tables, ClickHouse, or any other Flink-supported sink with transactional guarantees.
- Coordination layer — the Flink JobManager plus the Kubernetes Operator managing checkpoints, savepoints, scaling, and restart strategies.
The binlog offset (MySQL) or LSN (PostgreSQL) is stored as part of the Flink checkpoint. On recovery, the source operator resumes exactly where it left off — no data loss, no re-snapshot of the full table.
MySQL binary logging must be enabled with binlog_format=ROW and binlog_row_image=FULL before you connect a Flink CDC source. The connector validates these settings when the job starts and fails if they are wrong, so check them before deploying with SHOW VARIABLES LIKE 'binlog_format'; and SHOW VARIABLES LIKE 'binlog_row_image';.
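If you want that preflight check automated, a small helper can compare the variables MySQL reports against the values the connector needs. This is a sketch: the class name is hypothetical, and it assumes you have already fetched the rows of SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image') into a map, for example over JDBC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical preflight helper: input is Variable_name -> Value from SHOW VARIABLES.
final class BinlogPreflight {
    /** Returns one human-readable problem per misconfigured or missing variable. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        check(vars, "log_bin", "ON", out);           // binary logging enabled at all
        check(vars, "binlog_format", "ROW", out);     // row-level events, not statements
        check(vars, "binlog_row_image", "FULL", out); // full before/after images
        return out;
    }

    private static void check(Map<String, String> vars, String name, String expected, List<String> out) {
        String actual = vars.get(name);
        if (actual == null) {
            out.add(name + " is missing");
        } else if (!actual.equalsIgnoreCase(expected)) {
            out.add(name + "=" + actual + " (expected " + expected + ")");
        }
    }
}
```

An empty result means the server is ready; anything else is worth fixing before the first deployment rather than discovering it from a failed job.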
Project Setup and Dependencies
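Add the Flink CDC connectors to your Maven project, using versions that match your Flink runtime. Starting with Flink CDC 3.1, the Maven groupId moved from com.ververica to org.apache.flink and the Java packages from com.ververica.cdc to org.apache.flink.cdc, so code written against 2.x or 3.0 needs updated coordinates and imports: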
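<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.18</artifactId>
    <version>1.5.0</version>
</dependency>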
If you prefer Flink SQL over the DataStream API, you do not need to add the connectors to your project. Instead, drop the SQL fat JARs (flink-sql-connector-mysql-cdc and flink-sql-connector-postgres-cdc) into $FLINK_HOME/lib/, and the SQL client picks them up automatically through Flink's factory discovery mechanism.
MySQL CDC Source
The MySQL CDC source uses Debezium's binlog reader, wrapped either as a Flink SourceFunction (legacy) or as the newer Source interface, which supports parallelism. The incremental snapshot mode introduced in Flink CDC 2.0 lets multiple subtasks snapshot different table chunks concurrently, dramatically reducing the time to finish the initial load before the source switches to streaming mode.
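Under the hood, the incremental snapshot follows a watermark-based scheme similar to Netflix's DBLog: for each chunk the reader records the current log position (low watermark), SELECTs the chunk, records the position again (high watermark), and replays the log events between the two positions that touch keys in that chunk. The plain-Java sketch below models only that reconciliation step; the names are hypothetical and it is not the connector's code. Because binlog_row_image=FULL gives every event a complete after-image, replaying the window in order on top of the SELECT result converges to the chunk's state at the high watermark, whichever point in the window the SELECT actually observed.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of per-chunk reconciliation (hypothetical names; real offsets are binlog positions).
final class ChunkReconciler {
    // A change event: log position, primary key, and full row after-image (null means DELETE).
    record LogEvent(long position, long key, String afterImage) {}

    static Map<Long, String> reconcile(Map<Long, String> chunkSelect,
                                       long chunkStart, long chunkEndExclusive,
                                       List<LogEvent> log,
                                       long lowWatermark, long highWatermark) {
        // The whole chunk is buffered in memory while it is reconciled
        Map<Long, String> rows = new TreeMap<>(chunkSelect);
        for (LogEvent e : log) { // log is assumed to be in position order
            boolean inWindow = e.position() > lowWatermark && e.position() <= highWatermark;
            boolean inChunk = e.key() >= chunkStart && e.key() < chunkEndExclusive;
            if (!inWindow || !inChunk) {
                continue; // before the SELECT, after it (left to the streaming phase), or another chunk
            }
            if (e.afterImage() == null) {
                rows.remove(e.key());
            } else {
                rows.put(e.key(), e.afterImage());
            }
        }
        return rows; // the chunk's state as of the high watermark
    }
}
```

Events after the high watermark are left for the streaming phase, which skips anything a finished chunk already covered.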
// Java DataStream API: MySQL CDC with incremental snapshot (Flink CDC 3.1+ package names)
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MysqlCdcJob {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("mysql.prod.internal")
                .port(3306)
                .databaseList("orders_db")
                .tableList("orders_db.orders", "orders_db.order_items")
                .username("flink_cdc")
                .password(System.getenv("MYSQL_CDC_PASSWORD"))
                // Full snapshot, then stream the binlog; StartupOptions.latest() skips the snapshot
                .startupOptions(StartupOptions.initial())
                // Rows per snapshot chunk (split); reader parallelism is set on the source below
                .splitSize(8096)
                // Rows fetched per JDBC round trip while reading a chunk
                .fetchSize(1024)
                // Emits each change event as a Debezium-formatted JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints store the binlog offset; required for exactly-once recovery
        env.enableCheckpointing(30_000); // 30 seconds
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(4) // Four readers share snapshot chunks; one reads the binlog afterward
                .print()
                .setParallelism(1);
        env.execute("MySQL CDC Pipeline");
    }
}
The splitSize parameter controls how many rows each table chunk holds during the parallel snapshot phase. Larger chunks mean fewer splits to schedule, but each reader buffers a whole chunk in memory while reconciling it with concurrent binlog changes, so memory pressure per task rises. Start with 8096 rows per chunk (the connector default) and tune based on your row width and task manager heap.
The MySQL CDC source requires a dedicated replication user with SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges. Do not reuse an application user: if the application user's password rotates, the CDC job can no longer reconnect, and if it stays down long enough for the server to purge the binlog files past its saved position, the pipeline must re-snapshot.
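To see how splitSize turns into work units, here is a simplified chunk plan for a dense, evenly distributed numeric primary key. It is a sketch under that assumption, with a hypothetical class name: the real connector samples the key distribution and uses unbounded first and last chunks, which this ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified fixed-size chunking over a dense numeric primary key (hypothetical helper).
final class ChunkPlanner {
    /** Returns [start, end) key ranges that together cover [minKey, maxKey]. */
    static List<long[]> plan(long minKey, long maxKey, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> chunks = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            long end = Math.min(start + splitSize, maxKey + 1); // exclusive upper bound
            chunks.add(new long[] {start, end});
            start = end;
        }
        return chunks;
    }
}
```

For a table with keys 1 to 20,000, splitSize 8096 yields three chunks ([1, 8097), [8097, 16193), [16193, 20001)), so with the source parallelism of 4 used in the MySQL example one snapshot reader has nothing to do.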
PostgreSQL CDC Source
PostgreSQL CDC reads from a logical replication slot using the pgoutput or decoderbufs decoding plugin. As with MySQL, the streaming phase runs on a single reader because the WAL is one sequential log, but with the incremental snapshot source the snapshot phase can be parallelized in the same way.
// Java DataStream API: PostgreSQL CDC with the incremental snapshot source (Flink CDC 3.1+ package names)
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PostgresCdcJob {
    public static void main(String[] args) throws Exception {
        JdbcIncrementalSource<String> postgresSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("postgres.prod.internal")
                        .port(5432)
                        .database("inventory")
                        .schemaList("public")
                        .tableList("public.products", "public.inventory_levels")
                        .username("flink_cdc")
                        .password(System.getenv("PG_CDC_PASSWORD"))
                        .slotName("flink_cdc_slot") // Must be unique per pipeline
                        .decodingPluginName("pgoutput") // Built into PostgreSQL 10+
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The consumed LSN is confirmed to the replication slot when a checkpoint completes
        env.enableCheckpointing(30_000);
        env.fromSource(postgresSource, WatermarkStrategy.noWatermarks(), "Postgres CDC Source")
                .setParallelism(1) // WAL streaming uses one reader; raise this only to parallelize the snapshot
                .print();
        env.execute("Postgres CDC Pipeline");
    }
}
PostgreSQL replication slots retain WAL segments until the consumer confirms them. If your Flink job is stopped for an extended period without dropping the slot, the PostgreSQL server can run out of disk space. Set max_slot_wal_keep_size in postgresql.conf (PostgreSQL 13+) as a safety limit, keeping in mind that a slot that exceeds it is invalidated and the pipeline must then restart from a new slot with an initial snapshot. If the job has been offline for more than a few hours, consider dropping and recreating the slot with an initial snapshot yourself.
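To turn "a few hours" into a number for your own server, divide the slot limit by your WAL generation rate. Both inputs are assumptions you measure yourself: sample pg_current_wal_lsn() twice and take pg_wal_lsn_diff of the two values for the rate, and query pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) from pg_replication_slots to see how much WAL a slot already holds. The class name below is hypothetical.

```java
// Back-of-the-envelope estimate of how long a stopped job can stay down (hypothetical helper).
final class SlotRetentionEstimate {
    /**
     * @param walBytesPerHour     measured WAL generation rate on the primary
     * @param maxSlotWalKeepBytes max_slot_wal_keep_size converted to bytes
     * @return hours until the slot's retained WAL exceeds the limit
     */
    static double hoursUntilLimit(long walBytesPerHour, long maxSlotWalKeepBytes) {
        if (walBytesPerHour <= 0) {
            return Double.POSITIVE_INFINITY; // no write traffic, so the limit is never reached
        }
        return (double) maxSlotWalKeepBytes / walBytesPerHour;
    }
}
```

At 2 GiB of WAL per hour, a 50 GiB max_slot_wal_keep_size buys roughly 25 hours before the slot is invalidated.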
Flink SQL CDC Table Definition
If you prefer SQL over the Java API, Flink CDC sources are declared as CREATE TABLE statements with a CDC-specific connector type. This is the recommended approach for teams that want to iterate quickly without recompiling Java:
-- Flink SQL: MySQL CDC source table
CREATE TABLE orders_cdc (
order_id BIGINT,
customer_id BIGINT,
total_amount DECIMAL(10, 2),
status STRING,
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql.prod.internal',
'port' = '3306',
'username' = 'flink_cdc',
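-- Placeholder: Flink SQL does not expand environment variables, so template the secret in at deploy time
'password' = '${MYSQL_CDC_PASSWORD}',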
'database-name' = 'orders_db',
'table-name' = 'orders',
'scan.startup.mode' = 'initial',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096'
);
-- PostgreSQL CDC source table
CREATE TABLE products_cdc (
product_id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT,
updated_at TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres.prod.internal',
'port' = '5432',
'username' = 'flink_cdc',
'password' = '${PG_CDC_PASSWORD}',
'database-name' = 'inventory',
'schema-name' = 'public',
'table-name' = 'products',
'slot.name' = 'flink_cdc_slot',
'decoding.plugin.name'= 'pgoutput'
);
-- Kafka sink
CREATE TABLE orders_kafka_sink (
order_id BIGINT,
customer_id BIGINT,
total_amount DECIMAL(10, 2),
status STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'orders.changes',
'properties.bootstrap.servers' = 'kafka:9092',
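'format' = 'debezium-json', -- changelog format; plain 'json' rejects the UPDATE/DELETE rows a CDC source produces
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'orders-cdc',
-- Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
'properties.transaction.timeout.ms' = '900000'
);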
-- Materialize order changes into Kafka
INSERT INTO orders_kafka_sink
SELECT order_id, customer_id, total_amount, status, updated_at AS event_time
FROM orders_cdc;
Stateful Processing: Joining CDC Streams
One of Flink CDC's most powerful capabilities is joining two CDC streams with Flink's stateful operators. Because both streams are continuous, you will usually want a temporal join or an interval join: a regular join is allowed, but it keeps every row from both inputs in state indefinitely unless you configure state TTL. The following example joins the MySQL order_items stream with the PostgreSQL products stream using an event-time temporal join, enriching each order item with the product price that was valid when the order event occurred:
-- Event-time temporal join: enrich order_items with the product version valid at order time
-- Assumes order_items_cdc is declared like orders_cdc, and that both tables declare updated_at
-- as their event-time attribute with a WATERMARK clause; products_cdc also needs its PRIMARY KEY
-- so that Flink can treat it as a versioned table
CREATE VIEW orders_enriched AS
SELECT
  oi.order_id,
  oi.product_id,
  p.name AS product_name,
  p.price AS unit_price,
  oi.quantity,
  oi.quantity * p.price AS line_total
FROM order_items_cdc AS oi
-- Looks up the version of each product row that was valid at oi.updated_at
LEFT JOIN products_cdc FOR SYSTEM_TIME AS OF oi.updated_at AS p
  ON oi.product_id = p.product_id;
For event-time processing, you need to assign watermarks on your CDC source. Because CDC events carry database commit timestamps, you can use those as the event-time attribute:
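import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// Watermark strategy using the database commit timestamp embedded in the Debezium JSON payload.
// extractCommitTs is a hypothetical helper that returns source.ts_ms (epoch millis) from the JSON string.
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, recordTimestamp) -> extractCommitTs(event));
Use a 5–10 second bounded out-of-orderness watermark rather than WatermarkStrategy.forMonotonousTimestamps(). Change events from different tables, sources, or shards rarely arrive in perfect commit-timestamp order, and a too-tight watermark causes event-time operators such as windows and temporal joins to treat out-of-order events as late and drop them, so they never reach downstream sinks like Iceberg.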
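The lateness rule behind this advice fits in a few lines. The sketch below uses the same formula as Flink's bounded-out-of-orderness generator (watermark = highest timestamp seen - bound - 1), but to keep it simple it updates the watermark after every event instead of periodically, which makes it slightly stricter than Flink; treat it as an approximation, and note that the class name is hypothetical. An event whose timestamp is at or below the current watermark is what an event-time window or temporal join would consider late.

```java
import java.util.ArrayList;
import java.util.List;

// Approximate model of a bounded-out-of-orderness watermark, updated per event (hypothetical helper).
final class LatenessModel {
    /** Returns the timestamps (epoch millis) that arrive at or behind the current watermark. */
    static List<Long> lateEvents(long[] arrivalOrderTimestamps, long boundMillis) {
        List<Long> late = new ArrayList<>();
        // Same initial value Flink uses, so the first watermark is Long.MIN_VALUE without underflow
        long maxSeen = Long.MIN_VALUE + boundMillis + 1;
        for (long ts : arrivalOrderTimestamps) {
            long watermark = maxSeen - boundMillis - 1;
            if (ts <= watermark) {
                late.add(ts); // an event-time window or join would treat this event as late
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}
```

With the arrival order 1000, 3000, 2000, 9000, 2500, a zero bound flags 2000 and 2500 as late, a 5-second bound flags only 2500, and a 10-second bound flags nothing.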
Writing to Iceberg and ClickHouse Sinks
Flink CDC pipelines commonly target Apache Iceberg for lake storage and ClickHouse for real-time OLAP queries. The Iceberg sink handles row-level UPSERT and DELETE operations natively when the Iceberg table declares a primary key and uses format version 2, making CDC-to-lake pipelines practical without custom merge logic:
-- Iceberg sink with upsert mode (requires a primary key on this table)
CREATE TABLE orders_iceberg (
order_id BIGINT,
customer_id BIGINT,
total_amount DECIMAL(10, 2),
status STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'prod_catalog',
'catalog-type' = 'hive',
'uri' = 'thrift://hive-metastore:9083',
'warehouse' = 's3://my-data-lake/warehouse',
'format-version' = '2', -- Required for row-level deletes (MOR)
'write.upsert.enabled'= 'true'
);
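-- List columns explicitly: orders_cdc also has created_at, which the sink does not
INSERT INTO orders_iceberg
SELECT order_id, customer_id, total_amount, status, updated_at
FROM orders_cdc;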
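Upsert mode needs a primary key because every change has to collapse onto exactly one row per key. The JDK-only sketch below applies a changelog to a keyed map; its ChangeKind enum mirrors the four values of Flink's RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE), while the other names are hypothetical. It models the resulting table state conceptually, not how Iceberg physically writes equality deletes and data files.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Own enum mirroring the four values of Flink's org.apache.flink.types.RowKind, to stay JDK-only.
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical change record for the orders table: kind, primary key, and one payload column.
record Change(ChangeKind kind, long orderId, String status) {}

// Conceptual upsert sink: the primary key decides which row each change lands on.
final class UpsertTable {
    private final Map<Long, String> rows = new LinkedHashMap<>();

    void apply(Change c) {
        switch (c.kind()) {
            case INSERT, UPDATE_AFTER -> rows.put(c.orderId(), c.status()); // insert or overwrite by key
            case DELETE -> rows.remove(c.orderId());                        // drop the row with this key
            case UPDATE_BEFORE -> { } // superseded by the UPDATE_AFTER that follows, so ignored
        }
    }

    Map<Long, String> snapshot() {
        return Map.copyOf(rows);
    }
}
```

Feeding it INSERT 1, INSERT 2, UPDATE_BEFORE 1, UPDATE_AFTER 1 (status PAID), DELETE 2 leaves a single row, order 1 with status PAID. Without a key, the same changelog could only be appended, and the lake would hold every intermediate version.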
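For ClickHouse, use the community flink-connector-clickhouse with ReplacingMergeTree as the target engine. ReplacingMergeTree removes duplicate rows that share a sorting key only during background merges, so queries that need the latest version of each row should use FINAL. Set the sink parallelism to match your ClickHouse shard count and use a batch interval of 5–10 seconds to amortize the INSERT overhead.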
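A size-or-time micro-batcher is the usual way to get that 5–10 second cadence without unbounded buffering. This is a sketch with a hypothetical class name, not the API of any ClickHouse connector; time is passed in explicitly so the behavior is deterministic, and a real sink would also flush on a timer and on each checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-or-time micro-batcher: flush when the batch is full or old enough.
final class MicroBatcher<T> {
    private final int maxRows;
    private final long intervalMillis;
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis;

    MicroBatcher(int maxRows, long intervalMillis) {
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
    }

    /** Buffers a row; returns the batch to INSERT if a flush is due, otherwise an empty list. */
    List<T> add(T row, long nowMillis) {
        if (buffer.isEmpty()) {
            batchStartMillis = nowMillis; // the interval is measured from the batch's first row
        }
        buffer.add(row);
        boolean full = buffer.size() >= maxRows;
        boolean due = nowMillis - batchStartMillis >= intervalMillis;
        if (full || due) {
            return drain();
        }
        return List.of();
    }

    /** Empties the buffer, for example on a checkpoint or at shutdown. */
    List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

With maxRows 3 and a 5,000 ms interval, rows arriving at 0, 1000, and 2000 ms flush as one batch because it is full; rows at 3000 and 9000 ms flush together because the batch has aged past the interval.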
Kubernetes Deployment with the Flink Operator
The Apache Flink Kubernetes Operator (version 1.8+) provides a FlinkDeployment custom resource that manages job submission, savepoints, and rolling upgrades declaratively. A minimal CDC deployment looks like this:
# flink-cdc-deployment.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: orders-cdc-pipeline
  namespace: flink-jobs
spec:
  image: my-registry/flink-cdc:1.18-3.1.0
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.checkpoints.dir: "s3://my-flink-state/checkpoints"
    state.savepoints.dir: "s3://my-flink-state/savepoints"
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.min-pause: "10000"
    restart-strategy: exponential-delay
    restart-strategy.exponential-delay.max-backoff: "5min"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 3
  job:
    # local:// works in application mode on every Flink version; remote schemes such as s3:// need Flink 1.19+
    jarURI: "local:///opt/flink/usrlib/orders-cdc-pipeline-1.0.jar"
    entryClass: "com.example.OrdersCdcJob"
    args: []
    parallelism: 4
    upgradeMode: savepoint # Stateful upgrades via savepoints
Set upgradeMode: savepoint in production. This instructs the Flink Operator to trigger a savepoint before terminating the old job and to restore from it when starting the new version. The binlog offset and any operator state (join buffers, aggregation accumulators) are preserved across upgrades, so your pipeline picks up exactly where it left off.
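The exponential-delay restart strategy in that configuration multiplies the wait after each consecutive failure, up to max-backoff. The sketch below assumes an initial backoff of 1 second and a multiplier of 2.0 (check the defaults for your Flink version) and ignores jitter and the backoff reset threshold; the class name is hypothetical.

```java
import java.time.Duration;

// Sketch of exponential-delay restart backoff without jitter or reset (hypothetical helper).
final class RestartBackoff {
    /** attempt 0 is the first restart after a failure; each further consecutive failure multiplies the delay. */
    static Duration delayForAttempt(int attempt, Duration initial, double multiplier, Duration max) {
        double millis = initial.toMillis() * Math.pow(multiplier, attempt);
        return Duration.ofMillis((long) Math.min(millis, max.toMillis())); // cap at max-backoff
    }
}
```

With the 5min cap from the deployment, attempts 0 through 8 wait 1 s, 2 s, 4 s, and so on up to 256 s, and every later attempt waits the full 300 s.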
CDC source operators have non-parallelizable state during the streaming phase (the binlog offset is a single point in the log). If you change the source parallelism between upgrades, the savepoint restore will fail with a state incompatibility error. Treat the source parallelism as an immutable property of a pipeline version; scale downstream operators instead.
Key Takeaways
- Flink CDC embeds Debezium capture directly in the Flink runtime, removing the need for a Kafka broker as a CDC intermediary and giving you a single exactly-once boundary from binlog to sink.
- Both MySQL and PostgreSQL connectors support parallel incremental snapshot mode, allowing the initial table load to scale horizontally before switching to single-threaded streaming from the log.
- Enable checkpointing at 30-second intervals with EXACTLY_ONCE mode. Checkpointing, combined with a transactional sink, is what guarantees no duplicates on task manager restarts, not the connector itself.
- Temporal joins let you enrich CDC streams across sources without unbounded state accumulation; pair them with a bounded out-of-orderness watermark strategy based on database commit timestamps.
- Apache Iceberg with format-version=2 and write.upsert.enabled=true gives you a CDC-capable data lake target that handles UPDATE and DELETE events natively.
- The Flink Kubernetes Operator with upgradeMode: savepoint is the recommended way to upgrade a running CDC pipeline without replaying the full table snapshot.
- Never change the parallelism of a CDC source operator across savepoint-restored upgrades, since it will break state restore. Scale at the processing and sink layers instead.
Simplify Your Real-Time Data Stack with JusDB
Running Flink CDC in production means managing binlog permissions, replication slot retention, checkpoint storage, savepoint orchestration, and Kubernetes operator upgrades — before you even start writing business logic. JusDB provides managed connectors, pre-configured exactly-once pipelines, and a unified observability layer so your team spends time on the data, not the infrastructure.
Talk to a JusDB engineer about migrating your batch ETL to a real-time CDC architecture, or explore the JusDB documentation to see how our managed Flink environment handles snapshot coordination, slot management, and sink compatibility out of the box.