
Kafka Connect: Building Database Integration Pipelines

Use Kafka Connect source and sink connectors to stream data between databases, warehouses, and event systems

JusDB Team
January 24, 2026

Backfilling tables at 2 AM because your ETL pipeline silently dropped rows is a rite of passage nobody wants. Kafka Connect changes that story: it gives you a declarative, fault-tolerant framework for streaming data between databases, data warehouses, and event systems without writing custom producer or consumer code. Once you understand how connectors, tasks, and offset management fit together, you can replace fragile cron-based exports with pipelines that survive broker restarts and schema changes. This post walks through each layer of that architecture, from JDBC source connectors to dead letter queues, so you can build integrations that actually stay up.

TL;DR
  • Kafka Connect is a managed framework for building source (database → Kafka) and sink (Kafka → database) connectors without custom producer/consumer code.
  • Distributed mode is required for production; standalone mode is only for local development.
  • JDBC source connectors support incrementing and timestamp query modes to capture new and updated rows.
  • Debezium provides log-based Change Data Capture (CDC), capturing inserts, updates, and deletes, typically with sub-second latency.
  • Dead letter queues (DLQ) and Single Message Transforms (SMT) make pipelines resilient and flexible without custom code.
  • Monitor connector health through JMX metrics and the built-in REST API.

What Is Kafka Connect?

Kafka Connect is a component of the Apache Kafka ecosystem designed specifically for integrating Kafka with external systems such as relational databases, object stores, search indexes, and SaaS platforms. Rather than writing producers and consumers from scratch, you configure connectors: pluggable components that, together with the framework and its converters, handle serialization, offset tracking, schema handling, and error handling on your behalf.

Connectors come in two types:

  • Source connectors read from an external system and write records into Kafka topics.
  • Sink connectors read from Kafka topics and write records into an external system.

In distributed mode, the Kafka Connect REST API lets you deploy, reconfigure, pause, resume, and delete connectors without editing configuration files or restarting workers. This operational model is what separates Kafka Connect from hand-rolled ETL scripts: the framework owns the lifecycle, not you.
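To make the REST workflow concrete, here is a minimal Python sketch that uses only the standard library. It assumes a worker reachable at http://connect-host:8083, and it builds the request without sending it. `build_put_config_request` is a hypothetical helper name. The sketch relies on the PUT /connectors/{name}/config endpoint, which creates a connector if it is missing and updates it otherwise, so a deployment script can safely repeat the same call.

```python
import json
from urllib.request import Request


def build_put_config_request(base_url, name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    # PUT on /config creates the connector if it is missing and replaces its
    # configuration otherwise, so deployment scripts can repeat it safely.
    # The body is the bare config map, not the {"name": ..., "config": ...}
    # wrapper that POST /connectors expects.
    return Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_put_config_request(
    "http://connect-host:8083",
    "postgres-orders-source",
    {  # abbreviated config, for illustration only
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
    },
)
print(req.get_method(), req.full_url)
# To send it against a live worker: urllib.request.urlopen(req, timeout=10)
```

Keeping the whole connector config in version control and pushing it this way gives you repeatable deployments without hand-editing anything on the workers.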

Architecture: Distributed vs. Standalone Mode

Kafka Connect workers run in one of two modes, and choosing the wrong one for production is one of the most common mistakes teams make early on.

Standalone Mode

Standalone mode runs a single worker process that stores offsets in a local file (offset.storage.file.filename). It is simple to start and ideal for development and testing, but it has no fault tolerance: if the process crashes, your pipeline stops. Connector configurations are passed as properties files on the command line rather than stored in a Kafka topic, so connectors created or changed through the REST API are not persisted across restarts.

bash
# Start a standalone worker
bin/connect-standalone.sh config/connect-standalone.properties connector.properties
Warning

Never run standalone mode in production. Offsets stored in local files are not replicated, so a node failure means losing track of how far the connector has progressed. You will either reprocess data or miss records entirely.

Distributed Mode

Distributed mode runs multiple worker processes that coordinate through three internal Kafka topics: connect-configs, connect-offsets, and connect-status. Workers that share the same group.id join one group, coordinated by the Kafka brokers using the same group membership protocol that consumers use. If one worker fails, its connector tasks are automatically redistributed to the remaining workers. All configuration changes go through the REST API, so you can add or reconfigure connectors without restarting any worker process.

properties
# connect-distributed.properties (key settings)
bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092
group.id=connect-cluster-prod

# Internal topic names (use replication factor >= 3 in production)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Converter defaults (can be overridden per connector)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
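The following Python sketch shows what "automatically redistributed" means. It deals a set of tasks out to workers round-robin, then recomputes the assignment after one worker disappears. This is a deliberate simplification. Real Connect workers use an incremental cooperative rebalancing protocol that keeps surviving assignments in place. By default, that protocol also waits up to scheduled.rebalance.max.delay.ms (five minutes) for a departed worker to return before reassigning its tasks. The worker and task names are hypothetical.

```python
def assign_round_robin(workers, tasks):
    """Deal tasks to workers in turn; a simplification of Connect's real protocol."""
    assignment = {worker: [] for worker in workers}
    for i, task in enumerate(tasks):
        assignment[workers[i % len(workers)]].append(task)
    return assignment


tasks = ["orders-source-0", "orders-sink-0", "orders-sink-1", "orders-sink-2"]

before = assign_round_robin(["worker-a", "worker-b", "worker-c"], tasks)
print(before)
# {'worker-a': ['orders-source-0', 'orders-sink-2'], 'worker-b': ['orders-sink-0'], 'worker-c': ['orders-sink-1']}

# worker-b crashes: the surviving members rebalance and absorb its tasks.
after = assign_round_robin(["worker-a", "worker-c"], tasks)
print(after)
# {'worker-a': ['orders-source-0', 'orders-sink-1'], 'worker-c': ['orders-sink-0', 'orders-sink-2']}
```

The property that matters is the one the test checks: every task still has an owner after the failure. This is exactly what standalone mode cannot give you.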
Tip

Set offset.flush.interval.ms according to how much reprocessing you can tolerate after a failure. The default is 60,000 ms (1 minute). Lowering it shrinks the window of records that may be processed again after a worker failure, at the cost of more frequent offset commits.
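A back-of-the-envelope calculation shows the trade-off. The sketch below assumes a steady, hypothetical throughput of 2,000 records per second, and it treats the flush interval as the only source of replay. It ignores in-flight records and commit timeouts, so read the result as a rough upper bound rather than a guarantee.

```python
def worst_case_replay(records_per_sec, flush_interval_ms):
    """Rough upper bound on records re-read after a crash just before an offset flush."""
    # Offsets are flushed at most every flush_interval_ms, so anything processed
    # since the last successful flush may be processed again after a restart.
    return records_per_sec * flush_interval_ms // 1000


print(worst_case_replay(2_000, 60_000))  # 120000 at the 60 s default
print(worst_case_replay(2_000, 10_000))  # 20000 with a 10 s interval
```

If your sink writes are idempotent (for example, upserts keyed on a primary key), a larger replay window costs only time. If they are not, it can produce duplicates.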

Source Connectors: Getting Data Into Kafka

JDBC Source Connector

The Confluent JDBC source connector polls a relational database and publishes rows to Kafka topics. The most critical configuration decision is mode, which determines how the connector tracks which rows it has already seen.

Incrementing mode uses a strictly increasing integer column (typically a surrogate primary key) to identify new rows. It detects only inserts; updates and deletes go unnoticed.

json
{
  "name": "postgres-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db-host:5432/orders_db",
    "connection.user": "kafka_reader",
    "connection.password": "${file:/opt/kafka/secrets.properties:db.password}",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "public.orders",
    "topic.prefix": "prod.postgres.",
    "poll.interval.ms": "5000",
    "batch.max.rows": "5000",
    "tasks.max": "3"
  }
}
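The Python sketch below illustrates the idea behind incrementing mode, using an in-memory SQLite table as a stand-in for PostgreSQL. The stored offset is the largest id seen so far, and each poll asks only for rows above it. The query shape illustrates the technique; it is not the exact SQL the connector generates. The `orders` columns and the helper name are hypothetical.

```python
import sqlite3


def poll_incrementing(conn, last_id, batch_max_rows):
    """Return (new_rows, new_offset) using an incrementing-mode style query."""
    # Only rows whose id is strictly greater than the stored offset are "new".
    rows = conn.execute(
        "SELECT id, status FROM orders WHERE id > ? ORDER BY id ASC LIMIT ?",
        (last_id, batch_max_rows),
    ).fetchall()
    # The offset advances to the largest id seen; it stays put if nothing is new.
    new_offset = rows[-1][0] if rows else last_id
    return rows, new_offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO orders (id, status) VALUES (?, ?)",
    [(1, "new"), (2, "new"), (3, "new")],
)

rows, offset = poll_incrementing(conn, last_id=0, batch_max_rows=5000)
print(len(rows), offset)  # 3 3

# Updating row 2 leaves its id unchanged, so the next poll never sees it;
# only the freshly inserted row 4 is picked up.
conn.execute("UPDATE orders SET status = 'shipped' WHERE id = 2")
conn.execute("INSERT INTO orders (id, status) VALUES (4, 'new')")
rows, offset = poll_incrementing(conn, last_id=offset, batch_max_rows=5000)
print(rows, offset)  # [(4, 'new')] 4
```

The second poll shows why this mode is insert-only: the update to row 2 is invisible because its id never moved past the offset.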

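Timestamp mode uses a TIMESTAMP column to detect both inserts and updates, but it cannot detect deletes. Because it compares timestamp values, it is sensitive to clock skew when application servers rather than the database set the column, and it can miss rows that share a timestamp with the last row it read.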

Timestamp+incrementing mode combines both columns for the most reliable detection of inserts and updates, using the timestamp to filter candidate rows and the incrementing column to break ties.

json
{
  "name": "mysql-products-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://db-host:3306/catalog",
    "connection.user": "kafka_reader",
    "connection.password": "${file:/opt/kafka/secrets.properties:mysql.password}",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "product_id",
    "table.whitelist": "catalog.products",
    "topic.prefix": "prod.mysql.",
    "poll.interval.ms": "10000",
    "tasks.max": "1"
  }
}
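Here is a minimal Python sketch of the tie-breaking logic in timestamp+incrementing mode, again using in-memory SQLite. It assumes integer timestamps stand in for a real TIMESTAMP column and that `upper_ts` plays the role of the database's current time. The stored offset is the pair (timestamp, id). A row is new if its timestamp moved past the stored one, or if it shares that timestamp but has a larger id. The table and column names are hypothetical.

```python
import sqlite3


def poll_ts_incrementing(conn, last_ts, last_id, upper_ts):
    """Return (rows, last_ts, last_id) using a timestamp+incrementing style query."""
    rows = conn.execute(
        """
        SELECT product_id, updated_at, name FROM products
        WHERE updated_at < ?
          AND ((updated_at = ? AND product_id > ?) OR updated_at > ?)
        ORDER BY updated_at ASC, product_id ASC
        """,
        # upper_ts plays the role of "now": rows stamped at or after it are
        # left for a later poll. The (timestamp, id) pair is the stored offset.
        (upper_ts, last_ts, last_id, last_ts),
    ).fetchall()
    if rows:
        last_id, last_ts = rows[-1][0], rows[-1][1]
    return rows, last_ts, last_id


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (product_id INTEGER PRIMARY KEY, updated_at INTEGER, name TEXT)"
)
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [(1, 100, "mug"), (2, 100, "pen"), (3, 105, "cap")],
)
rows, ts, pid = poll_ts_incrementing(conn, last_ts=0, last_id=0, upper_ts=200)
print(ts, pid)  # 105 3

# An UPDATE that bumps updated_at makes row 1 eligible again.
conn.execute("UPDATE products SET name = 'blue mug', updated_at = 110 WHERE product_id = 1")
rows, ts, pid = poll_ts_incrementing(conn, ts, pid, upper_ts=200)
print(rows)  # [(1, 110, 'blue mug')]

# A new row sharing timestamp 110 is still caught because its id breaks the tie.
conn.execute("INSERT INTO products VALUES (4, 110, 'hat')")
rows, ts, pid = poll_ts_incrementing(conn, ts, pid, upper_ts=200)
print(rows)  # [(4, 110, 'hat')]
```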

Debezium: Log-Based CDC

When you need to capture deletes, or when polling latency is unacceptable, Debezium is the answer. Instead of polling tables, Debezium reads the database's change log directly (the PostgreSQL WAL, the MySQL binlog, or SQL Server's CDC change tables) and emits a change event for every insert, update, and delete.

The result is typically sub-second latency and a complete history of every row mutation, including before and after images (in PostgreSQL, full before images require REPLICA IDENTITY FULL on the table). This makes Debezium a strong fit for audit logging, cache invalidation, and real-time analytics pipelines.

json
{
  "name": "debezium-postgres-orders",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db-host",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets.properties:debezium.password}",
    "database.dbname": "orders_db",
    "table.include.list": "public.orders,public.order_items",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",
    "topic.prefix": "cdc",
    "tasks.max": "1"
  }
}
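Each Debezium change event carries an `op` code ("c" for create, "u" for update, "d" for delete, "r" for a snapshot read) plus `before` and `after` row images. The Python sketch below replays such events into an in-memory dict, the way a cache-invalidation consumer might. It assumes the JsonConverter with schemas.enable=false, so each value is the bare envelope rather than one nested under "payload". It also assumes that a delete is followed by a null-valued tombstone, which Debezium emits by default. The `id` and `status` columns and the helper name are hypothetical.

```python
import json


def apply_change(table_state, raw_value):
    """Apply one Debezium change event (JSON value) to a dict keyed by primary key."""
    if raw_value is None:
        # Tombstone that follows a delete (for log compaction); nothing to apply.
        return table_state
    event = json.loads(raw_value)
    op = event["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, and update all carry the new row in "after".
        row = event["after"]
        table_state[row["id"]] = row
    elif op == "d":
        # A delete carries the old row (at least its key) in "before"; "after" is null.
        table_state.pop(event["before"]["id"], None)
    return table_state


events = [
    '{"op": "c", "before": null, "after": {"id": 1, "status": "new"}}',
    '{"op": "u", "before": null, "after": {"id": 1, "status": "paid"}}',
    '{"op": "c", "before": null, "after": {"id": 2, "status": "new"}}',
    '{"op": "d", "before": {"id": 2}, "after": null}',
    None,  # tombstone for id 2
]
state = {}
for value in events:
    apply_change(state, value)
print(state)  # {1: {'id': 1, 'status': 'paid'}}
```

The delete of row 2 is the part a polling JDBC source could never deliver.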
Warning

Debezium uses replication slots in PostgreSQL. An unconsumed or stalled slot will cause WAL segments to accumulate on disk indefinitely, eventually filling your database server's disk. Always monitor replication slot lag in production and set max_slot_wal_keep_size in PostgreSQL 13+ as a safeguard.

Sink Connectors: Getting Data Out of Kafka

JDBC Sink Connector

The JDBC sink connector writes Kafka records to a relational database table. Two features make it particularly useful for teams managing evolving schemas: auto.create and auto.evolve. Both rely on the record schema, so records must carry one (for example Avro, or JSON with schemas enabled).

  • auto.create=true creates the destination table if it does not exist, deriving its columns from the record schema.
  • auto.evolve=true adds new columns to the destination table as new fields appear in incoming records. It only adds columns; it does not drop columns or change existing column types.
json
{
  "name": "postgres-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://warehouse-host:5432/analytics",
    "connection.user": "kafka_writer",
    "connection.password": "${file:/opt/kafka/secrets.properties:sink.password}",
    "topics": "prod.postgres.orders",
    "table.name.format": "kafka_orders",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": "3000",
    "tasks.max": "4"
  }
}
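Upsert mode matters because Connect delivers records at least once, so a retried batch can arrive twice. The Python sketch below shows the semantics with an in-memory SQLite table and an `INSERT ... ON CONFLICT ... DO UPDATE` statement, similar in spirit to what an upsert against PostgreSQL looks like. It is not the connector's generated SQL. It requires SQLite 3.24 or newer, which recent Python builds bundle. The `orders_sink` table and its columns are hypothetical, and the table name is interpolated directly, so it must come from trusted config.

```python
import sqlite3


def upsert_batch(conn, table, records, pk):
    """Write dict records with upsert semantics keyed on `pk`."""
    cols = list(records[0])
    col_list = ", ".join(cols)
    placeholders = ", ".join("?" for _ in cols)
    # Non-key columns take the incoming values when the key already exists.
    updates = ", ".join(f"{c} = excluded.{c}" for c in cols if c != pk)
    sql = (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )
    conn.executemany(sql, [tuple(r[c] for c in cols) for r in records])


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_sink (order_id INTEGER PRIMARY KEY, status TEXT, amount REAL)"
)
batch = [
    {"order_id": 1, "status": "new", "amount": 10.0},
    {"order_id": 2, "status": "new", "amount": 25.0},
]
upsert_batch(conn, "orders_sink", batch, pk="order_id")

# After a retry the same batch is redelivered, followed by a newer version of order 1.
upsert_batch(
    conn,
    "orders_sink",
    batch + [{"order_id": 1, "status": "paid", "amount": 10.0}],
    pk="order_id",
)
print(conn.execute("SELECT * FROM orders_sink ORDER BY order_id").fetchall())
# [(1, 'paid', 10.0), (2, 'new', 25.0)]
```

Replaying the batch leaves exactly one row per key. That is why upsert with a real primary key pairs well with at-least-once delivery, whereas plain inserts would duplicate rows.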

S3 Sink Connector

For archival and data lake use cases, the S3 sink connector writes Kafka records to Amazon S3 as Parquet, Avro, or JSON files, partitioned by time or a field value. This is a common pattern for feeding analytics platforms that batch-read from object storage.

json
{
  "name": "s3-orders-archive",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "prod.postgres.orders",
    "s3.region": "us-east-1",
    "s3.bucket.name": "data-lake-prod",
    "s3.part.size": "67108864",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "flush.size": "50000",
    "rotate.interval.ms": "600000",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000"
  }
}
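To see where a record lands in the bucket, the Python sketch below reproduces the hourly layout from the config above. It floors the record's created_at value to partition.duration.ms, then renders the path.format pattern in UTC. It assumes the connector's topics.dir is left at its default of "topics", and it computes only the object-key prefix. The file name that follows the prefix is left out.

```python
from datetime import datetime, timezone


def s3_partition_prefix(topic, record_ts_ms, duration_ms=3_600_000, topics_dir="topics"):
    """Sketch of the object-key prefix an hourly time-based partitioner would produce."""
    # Floor the record timestamp to the partition duration (one hour here).
    bucket_start_ms = record_ts_ms - (record_ts_ms % duration_ms)
    ts = datetime.fromtimestamp(bucket_start_ms / 1000, tz=timezone.utc)
    # Python equivalent of path.format "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH".
    encoded = ts.strftime("year=%Y/month=%m/day=%d/hour=%H")
    return f"{topics_dir}/{topic}/{encoded}/"


created_at = datetime(2026, 1, 24, 13, 45, tzinfo=timezone.utc)
prefix = s3_partition_prefix("prod.postgres.orders", int(created_at.timestamp() * 1000))
print(prefix)
# topics/prod.postgres.orders/year=2026/month=01/day=24/hour=13/
```

The key=value directory names follow the Hive-style layout that many query engines recognize as partition columns.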

Configuration: Tasks, Parallelism, and Error Handling

Connector Tasks and Parallelism

A connector is the logical configuration unit; tasks are the units of parallelism that actually do the work. Setting tasks.max tells the framework the maximum number of tasks it may spawn for a connector — the actual number depends on what the connector can parallelize. A JDBC source connector reading a single table typically runs as one task regardless of tasks.max. A connector reading from multiple tables or partitions can parallelize across tasks up to the configured maximum.

Tasks are distributed across available workers in a distributed cluster. Adding more workers and increasing tasks.max is how you scale throughput horizontally.
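To show why a single-table JDBC source stays at one task, the Python sketch below splits a list of tables into at most tasks.max contiguous, near-equal groups, one group per task. It is loosely modeled on the ConnectorUtils.groupPartitions helper in Kafka. As we understand it, the Confluent JDBC source connector uses that helper with min(number of tables, tasks.max) groups. The table names here are hypothetical.

```python
def group_tables(tables, tasks_max):
    """Split tables into at most tasks_max contiguous, near-equal groups (one per task)."""
    if not tables:
        return []
    num_groups = min(len(tables), tasks_max)
    per_group, leftover = divmod(len(tables), num_groups)
    groups, start = [], 0
    for i in range(num_groups):
        # The first `leftover` groups take one extra table each.
        size = per_group + (1 if i < leftover else 0)
        groups.append(tables[start:start + size])
        start += size
    return groups


print(group_tables(["public.orders"], tasks_max=3))
# [['public.orders']]  (one table, so one task)
print(group_tables(["orders", "order_items", "customers", "products", "payments"], tasks_max=3))
# [['orders', 'order_items'], ['customers', 'products'], ['payments']]
```

Raising tasks.max past the number of tables therefore adds no parallelism for a JDBC source. Sink connectors are bounded in a similar way by the number of topic partitions they consume.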

Dead Letter Queue

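By default (errors.tolerance=none), a single malformed record causes the task to fail and stop processing. For sink connectors, the dead letter queue (DLQ) feature routes records that fail conversion or transformation to a separate Kafka topic instead of halting the task, allowing the pipeline to continue while you investigate failures separately. Errors raised while writing to the target database reach the DLQ only if the connector reports them to the framework. Source connectors do not support a DLQ; with errors.tolerance=all they skip failing records, so enable error logging to keep a trace of them.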

json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq.orders-sink",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
Tip

Enable errors.deadletterqueue.context.headers.enable=true to attach error context as Kafka record headers to each DLQ message. This gives you the exception class, exception message, and the connector and task that failed — making debugging significantly faster.
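The difference between the two tolerance settings can be sketched in a few lines of Python. The loop below converts raw record values. With tolerance "none" it re-raises on the first failure. With "all" it diverts the bad record, with context headers, to a list that stands in for the DLQ topic. The header names are modeled on Connect's `__connect.errors.*` headers, but this is a hypothetical illustration, not the framework's code. The batch position stands in for the Kafka offset, and the real exception header carries a fully qualified Java class name.

```python
import json


def process_batch(records, convert, tolerance, dlq):
    """Convert each record; stop on failure (tolerance 'none') or divert it to `dlq` ('all')."""
    converted = []
    for position, raw in enumerate(records):
        try:
            converted.append(convert(raw))
        except Exception as exc:
            if tolerance != "all":
                raise  # errors.tolerance=none: the first bad record fails the task
            # errors.tolerance=all: keep going, and record why this one failed.
            # Header names are modeled on Connect's __connect.errors.* headers;
            # `position` stands in for the real Kafka offset.
            dlq.append({
                "value": raw,
                "headers": {
                    "__connect.errors.topic": "prod.postgres.orders",
                    "__connect.errors.offset": position,
                    "__connect.errors.exception.class.name": type(exc).__name__,
                    "__connect.errors.exception.message": str(exc),
                },
            })
    return converted


records = ['{"order_id": 1}', "not-json", '{"order_id": 3}']
dlq = []
good = process_batch(records, json.loads, tolerance="all", dlq=dlq)
print(len(good), len(dlq))  # 2 1
print(dlq[0]["headers"]["__connect.errors.exception.class.name"])  # JSONDecodeError
```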

Single Message Transforms (SMT)

SMTs are lightweight transformations applied to records as they pass through a connector, without needing a separate stream processing step. Common use cases include renaming fields, dropping records that match a predicate (such as tombstones), inserting metadata fields, and masking sensitive data.

json
{
  "transforms": "addTimestamp,maskEmail",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingest_time",
  "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskEmail.fields": "customer_email",
  "transforms.maskEmail.replacement": "***@***.***"
}
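As a rough mental model of the two transforms configured above, the Python sketch below applies them in the order listed in "transforms". First it copies the record's Kafka timestamp into kafka_ingest_time, then it overwrites customer_email with the replacement string. The functions are hypothetical analogues, not the Kafka classes themselves. In particular, the real InsertField writes a timestamp-typed value, whereas this sketch stores epoch milliseconds. The sample record is made up.

```python
def insert_timestamp(value, record_ts_ms, field="kafka_ingest_time"):
    """Rough analogue of InsertField$Value with timestamp.field set."""
    out = dict(value)  # transforms return a new record rather than mutating
    out[field] = record_ts_ms
    return out


def mask_fields(value, fields=("customer_email",), replacement="***@***.***"):
    """Rough analogue of MaskField$Value with a replacement string."""
    out = dict(value)
    for name in fields:
        if name in out:
            out[name] = replacement
    return out


def apply_transforms(value, record_ts_ms):
    # Order follows the "transforms" list: addTimestamp, then maskEmail.
    value = insert_timestamp(value, record_ts_ms)
    return mask_fields(value)


record = {"order_id": 7, "customer_email": "ana@example.com"}
print(apply_transforms(record, 1769262300000))
# {'order_id': 7, 'customer_email': '***@***.***', 'kafka_ingest_time': 1769262300000}
```

Each step is a pure per-record function with no state or external lookups. That is the shape SMTs are designed for.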
Warning

SMTs run synchronously in the connector's task thread. Expensive or poorly written transforms directly reduce connector throughput. For complex transformations — joins, aggregations, enrichment from external lookups — use a stream processing layer like Kafka Streams or Apache Flink instead.

Monitoring: JMX and the REST API

REST API for Connector Status

The Kafka Connect REST API is the primary operational interface. Use it to check connector and task health, retrieve configurations, and diagnose failures without touching log files.

bash
# List all connectors
curl -s http://connect-host:8083/connectors | jq .

# Get connector status (shows task states and error messages)
curl -s http://connect-host:8083/connectors/postgres-orders-source/status | jq .

# Pause a connector
curl -X PUT http://connect-host:8083/connectors/postgres-orders-source/pause

# Resume a connector
curl -X PUT http://connect-host:8083/connectors/postgres-orders-source/resume

# Restart a failed task
curl -X POST http://connect-host:8083/connectors/postgres-orders-source/tasks/0/restart

JMX Metrics

Kafka Connect exposes detailed metrics through JMX under the kafka.connect domain. The most operationally important metrics to track are:

  • source-record-poll-total / source-record-write-total: track throughput and identify gaps between records polled from the source and records written to Kafka after transforms.
  • sink-record-read-total / sink-record-send-total: same pattern for sink connectors.
  • completed-rebalances-total (worker rebalance metrics): frequent rebalances indicate worker instability.
  • offset-commit-success-percentage: dropping below 100% means offsets are not being committed reliably.
  • batch-size-avg and poll-batch-avg-time-ms: diagnose throughput bottlenecks at the connector level.

Export JMX metrics to Prometheus using the JMX Exporter agent, then build Grafana dashboards that alert on task failures, throughput drops, and DLQ growth. A connector in FAILED state that nobody notices for six hours is a data quality incident waiting to happen.

Tip

The Connect REST API's /connectors/{name}/status endpoint returns task-level state and the last exception message for failed tasks. Poll this endpoint from your monitoring system as a lightweight health check that does not require JMX access.
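A minimal Python version of that health check might look like the sketch below. `fetch_status` performs the GET against a worker. `problems` turns a status document into alert messages, reporting the first line of each failed task's stack trace. Treating PAUSED as healthy is an assumption you may want to change. The sample document is illustrative: its worker address and error text are made up, but it follows the endpoint's response shape (connector state plus a list of tasks, where failed ones carry a trace).

```python
import json
from urllib.request import urlopen

HEALTHY_STATES = {"RUNNING", "PAUSED"}  # PAUSED is treated as intentional here


def fetch_status(base_url, name):
    """GET /connectors/{name}/status from a Connect worker."""
    with urlopen(f"{base_url}/connectors/{name}/status", timeout=5) as resp:
        return json.load(resp)


def problems(status):
    """List human-readable problems found in a connector status document."""
    issues = []
    connector_state = status["connector"]["state"]
    if connector_state not in HEALTHY_STATES:
        issues.append(f"connector {status['name']} is {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] not in HEALTHY_STATES:
            # Failed tasks include a "trace" field holding the Java stack trace.
            first_line = task.get("trace", "").split("\n", 1)[0]
            issues.append(f"task {task['id']} is {task['state']}: {first_line}")
    return issues


# Illustrative response shape; against a live worker use
# fetch_status("http://connect-host:8083", "postgres-orders-source").
sample = {
    "name": "postgres-orders-source",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8083"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "worker_id": "10.0.0.5:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: connection refused\n\tat ...",
    }],
    "type": "source",
}
print(problems(sample))
# ['task 0 is FAILED: org.apache.kafka.connect.errors.ConnectException: connection refused']
```

Note that the connector itself reports RUNNING while one of its tasks has failed. Checking task states, not just the connector state, is what makes this health check useful.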

Key Takeaways
  • Use distributed mode for all production deployments — standalone mode has no fault tolerance and no dynamic management.
  • JDBC source connectors cover insert/update detection with incrementing, timestamp, and timestamp+incrementing modes; use Debezium when you need deletes or minimal latency.
  • JDBC sink connectors with auto.create and auto.evolve reduce schema management overhead for evolving datasets.
  • Configure dead letter queues with errors.tolerance=all and context headers enabled so malformed records do not halt your pipeline and failures are traceable.
  • Use SMTs for lightweight, stateless record transformations; defer complex logic to a dedicated stream processing layer.
  • Monitor connector health via the REST API for task state and JMX for throughput and offset commit metrics — set up alerts before you go to production, not after an incident.
  • The tasks.max setting controls parallelism per connector; pair it with sufficient workers to distribute load effectively across the cluster.

Build Reliable Database Pipelines with JusDB

Kafka Connect gives you the scaffolding; what you build on top of it determines whether your data pipelines are a competitive advantage or a maintenance burden. Choosing the right database on each end of the pipeline — one that handles high write throughput from sink connectors, supports the query patterns your analysts actually use, and integrates cleanly with your existing Kafka infrastructure — makes the difference between a pipeline that scales and one that becomes a bottleneck.

JusDB publishes in-depth comparisons of databases across performance, operational overhead, and ecosystem compatibility, including how they behave as Kafka Connect sources and sinks under real production workloads. Whether you are evaluating PostgreSQL, StarRocks, ClickHouse, or a managed warehouse, the analysis you need to make an informed decision is here.

Explore the JusDB blog for database performance benchmarks, architecture guides, and integration deep-dives written for data engineers and DBAs who care about the details.
