
Kafka Connect: JDBC Source, SMTs, Dead Letter Queues, and Connector Monitoring

Deploy Kafka Connect with JDBC source connectors, Single Message Transforms, Dead Letter Queues for error handling, and consumer group lag monitoring.

JusDB Team
June 16, 2025

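Kafka Connect is Apache Kafka's framework for streaming data between Kafka and external systems such as databases, object stores, and search indexes. Connectors run inside Connect worker processes and are managed through the workers' REST API, so most pipelines need configuration rather than custom producer or consumer code. Getting connectors, transformations, and error handling right is essential for production data pipelines.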

Architecture

```text
Source Systems          Kafka Connect              Kafka
MySQL / PostgreSQL ──►  Source Connector  ──────►  Topics
Files / S3         ──►                               │
                                                     │
Elasticsearch      ◄──  Sink Connector    ◄──────────┘
Data Warehouse     ◄──
```
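Both connector types run inside Connect workers, and a worker can only start connectors whose plugins are installed on it. Before deploying anything, you can ask a worker for its installed plugins through `GET /connector-plugins`. The sketch below assumes the worker is reachable at `http://kafka-connect:8083` (the host used elsewhere in this article); the `has_plugin` function name is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeeds (exit 0) if the Connect worker at $1 lists
# the connector class $2 among its installed plugins.
has_plugin() {
  local connect_url="$1" connector_class="$2"
  # GET /connector-plugins returns a JSON array of
  # {"class": ..., "type": ..., "version": ...} objects.
  curl -fsS "${connect_url}/connector-plugins" \
    | grep -qF "\"${connector_class}\""
}

# Usage:
#   has_plugin http://kafka-connect:8083 io.confluent.connect.jdbc.JdbcSourceConnector \
#     && echo "JDBC source plugin is installed"
```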

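Deploy a JDBC Source Connector

The connector below polls the orders and customers tables every 5 seconds and writes each table to its own topic (mysql.orders and mysql.customers). In timestamp+incrementing mode it stores the last-seen updated_at and id as its offset, and each poll selects rows with a newer updated_at, or the same updated_at and a higher id. That catches both new rows and updated rows, provided the application sets updated_at on every write; hard deletes never appear in the query result and are not captured.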

```json
{
  "name": "jdbc-mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/myapp",
    "connection.user": "kafka_user",
    "connection.password": "kafka_pass",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "orders,customers",
    "topic.prefix": "mysql.",
    "poll.interval.ms": "5000"
  }
}
```
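The JSON above has the shape expected by `POST /connectors` on the Connect REST API. A minimal sketch, assuming the config is saved as `jdbc-mysql-source.json` (a hypothetical file name) and the worker listens on `http://kafka-connect:8083`: the first function submits the config, and the second reads back the state of the connector and its tasks. `POST` returns 409 Conflict if a connector with that name already exists; to change an existing connector, send only the inner `config` object to `PUT /connectors/<name>/config` instead.

```bash
#!/usr/bin/env bash
# Hypothetical helpers around the Kafka Connect REST API.
# CONNECT_URL is an assumption; override it for your environment.
CONNECT_URL="${CONNECT_URL:-http://kafka-connect:8083}"

# Create a connector from a file containing {"name": ..., "config": {...}}.
# -f makes curl exit non-zero on HTTP errors such as 409 Conflict.
deploy_connector() {
  local config_file="$1"
  curl -fsS -X POST \
    -H "Content-Type: application/json" \
    --data @"${config_file}" \
    "${CONNECT_URL}/connectors"
}

# Print the connector state and the state of each of its tasks.
connector_status() {
  local name="$1"
  curl -fsS "${CONNECT_URL}/connectors/${name}/status"
}

# Usage:
#   deploy_connector jdbc-mysql-source.json
#   connector_status jdbc-mysql-source
```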

Single Message Transforms (SMTs)

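SMTs modify each record in flight without custom code. Add these keys to the connector's "config" object; the transforms run in the order listed under "transforms". Here InsertField writes the record's timestamp into a new ingest_ts field, and MaskField replaces each ssn value with an empty value of the field's type (an empty string for a string column):

```json
"transforms": "addTimestamp,maskSSN",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "ingest_ts",
"transforms.maskSSN.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskSSN.fields": "ssn"
```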

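Adding transforms to a running connector means resubmitting its whole configuration: `PUT /connectors/<name>/config` replaces the config rather than merging into it, so the request must repeat every key. The sketch below does that for the JDBC source and adds a third transform, RegexRouter, to illustrate routing; the `prod.` topic prefix and the `update_jdbc_source` name are hypothetical. The heredoc delimiter is quoted so the shell leaves `$Value` and `$1` untouched.

```bash
#!/usr/bin/env bash
# CONNECT_URL is an assumption; override it for your environment.
CONNECT_URL="${CONNECT_URL:-http://kafka-connect:8083}"

# Hypothetical helper: replace the full config of jdbc-mysql-source.
# PUT creates the connector if it does not exist; the body is the config
# map only, without the "name" wrapper used by POST /connectors.
update_jdbc_source() {
  curl -fsS -X PUT \
    -H "Content-Type: application/json" \
    --data @- \
    "${CONNECT_URL}/connectors/jdbc-mysql-source/config" <<'EOF'
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://mysql:3306/myapp",
  "connection.user": "kafka_user",
  "connection.password": "kafka_pass",
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "id",
  "table.whitelist": "orders,customers",
  "topic.prefix": "mysql.",
  "poll.interval.ms": "5000",
  "transforms": "addTimestamp,maskSSN,route",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "ingest_ts",
  "transforms.maskSSN.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSSN.fields": "ssn",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "mysql\\.(.*)",
  "transforms.route.replacement": "prod.$1"
}
EOF
}
```

RegexRouter must match the whole topic name, so with this config records from mysql.orders are written to prod.orders instead.
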
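Dead Letter Queue for Error Handling

Dead letter queues apply to sink connectors only. Add these keys to a sink connector's config, such as the Elasticsearch sink in the monitoring example below. With errors.tolerance set to all, records that fail deserialization or an SMT are written to the DLQ topic instead of failing the task, the context headers record why each one failed, and errors.log.enable also logs each failure to the worker log. Source connectors such as the JDBC source ignore the DLQ settings but still honor errors.tolerance and errors.log.enable. The DLQ topic is created with a replication factor of 3 by default (errors.deadletterqueue.topic.replication.factor), so lower it on a cluster with fewer brokers.

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-connector-errors",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true"
```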

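To see why records landed in the DLQ, read the topic with headers printed. With context headers enabled, each record carries headers such as `__connect.errors.exception.class.name` and `__connect.errors.exception.message`. A sketch assuming a console consumer new enough to support the `print.headers` property (added in Kafka 2.7) and the broker address `kafka:9092` used elsewhere in this article; `peek_dlq` and the `KAFKA_CONSOLE_CONSUMER` override are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print up to N records from a DLQ topic together with
# their headers, which carry the error context (__connect.errors.*).
# Set KAFKA_CONSOLE_CONSUMER=kafka-console-consumer.sh on an Apache Kafka install.
peek_dlq() {
  local topic="${1:-dlq-connector-errors}" max="${2:-10}"
  local consumer="${KAFKA_CONSOLE_CONSUMER:-kafka-console-consumer}"
  "${consumer}" --bootstrap-server kafka:9092 \
    --topic "${topic}" \
    --from-beginning \
    --max-messages "${max}" \
    --property print.headers=true
}

# Usage:
#   peek_dlq dlq-connector-errors 5
```
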
Monitor Connector Lag

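```bash
# Check the state of every connector and its tasks
curl -s "http://kafka-connect:8083/connectors?expand=status" | jq

# Check consumer group lag for a sink connector. Sink connectors consume
# as group "connect-<connector name>", so this is the group of a connector
# named elasticsearch-sink (the tool is kafka-consumer-groups.sh in the
# Apache Kafka distribution).
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --group connect-elasticsearch-sink \
  --describe
```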
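The `--describe` output has one row per partition with a LAG column, which is awkward to track over time. A minimal awk sketch that reduces it to a single number, assuming the standard header row that starts with GROUP and contains a LAG column; it skips partitions whose LAG is shown as `-` (no committed offset yet). The `total_lag` name is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: sum the LAG column of `kafka-consumer-groups --describe`
# output read on stdin. Rows whose LAG is "-" are skipped.
total_lag() {
  awk '
    # Locate the LAG column by name on the header row.
    $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Add up numeric LAG values from the partition rows.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Usage:
#   kafka-consumer-groups --bootstrap-server kafka:9092 \
#     --group connect-elasticsearch-sink --describe | total_lag
```

A total that keeps rising between runs means the sink is falling behind.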

Scaling Connectors

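```json
"tasks.max": "4"
```

tasks.max caps the number of parallel tasks for this connector at four. The JDBC source connector divides its tables among tasks, so the two-table example above runs at most two tasks however high tasks.max is set. Sink connectors divide topic partitions among tasks, and tasks beyond the partition count sit idle. Each source task runs its own queries, so confirm the source database can handle the parallel load.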
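To confirm how many tasks a connector actually created after changing tasks.max, `GET /connectors/<name>/tasks` returns one entry per task, each with an id of the form `{"connector": ..., "task": N}`. A sketch that counts those ids with grep, assuming the worker at `http://kafka-connect:8083`; with the orders and customers tables, the JDBC source would be expected to report two tasks even with tasks.max set to 4. The `task_count` name is hypothetical.

```bash
#!/usr/bin/env bash
# CONNECT_URL is an assumption; override it for your environment.
CONNECT_URL="${CONNECT_URL:-http://kafka-connect:8083}"

# Hypothetical helper: count the tasks a connector is running by counting the
# "task": N ids in the JSON array returned by GET /connectors/<name>/tasks.
task_count() {
  local name="$1"
  curl -fsS "${CONNECT_URL}/connectors/${name}/tasks" \
    | grep -oE '"task"[[:space:]]*:[[:space:]]*[0-9]+' \
    | awk 'END { print NR }'
}

# Usage:
#   task_count jdbc-mysql-source
```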

Key Takeaways

  • Use mode: timestamp+incrementing for the JDBC source to capture both inserts and updates (updated_at must change on every update; hard deletes are not captured)
  • SMTs handle common transformations (field masking, routing, timestamp injection) without custom code
  • Configure a Dead Letter Queue on sink connectors to isolate and debug malformed records without stopping the connector
  • Monitor consumer group lag for sink connectors — growing lag means the sink cannot keep up

JusDB Can Help

Kafka Connect pipelines require careful error handling and monitoring design. JusDB builds and operates production-grade data integration pipelines.
