Building a Real-Time Data Pipeline with Kafka and dbt
A step-by-step walkthrough of building a production-grade streaming pipeline: from Kafka producers through schema registry to dbt transformations and warehouse delivery.
Why Real-Time Pipelines Are Harder Than They Look
Batch ETL pipelines are straightforward: extract, transform, load, sleep 24 hours, repeat. Real-time pipelines introduce ordering guarantees, exactly-once semantics, schema evolution, and backpressure — all of which can silently corrupt your data warehouse if not handled correctly.
This walkthrough covers the architecture we use for production streaming pipelines.
Architecture Overview
[Source Systems]
│
▼
[Kafka Producers] ──► [Schema Registry] ──► [Kafka Topics]
│
▼
[Kafka Consumers / Flink]
│
▼
[Staging Tables (raw)]
│
▼
[dbt Models]
│
▼
[Warehouse (Snowflake / BigQuery)]
Step 1: Define Your Schema Contract First
The single biggest mistake teams make is publishing events without a schema contract. When upstream teams change their event structure, your pipeline silently breaks.
Use Confluent Schema Registry with Avro:
{
"type": "record",
"name": "OrderPlaced",
"namespace": "com.yourcompany.events",
"fields": [
{ "name": "order_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "amount_cents", "type": "long" },
{ "name": "currency", "type": "string", "default": "USD" },
{ "name": "placed_at", "type": { "type": "long", "logicalType": "timestamp-millis" } }
]
}
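Enforce BACKWARD compatibility in the registry: every field you add must have a default, so consumers on the new schema can still read older events. BACKWARD does not block removing a field, and a removal breaks consumers still reading with the old schema when that field has no default. If removals like that should be rejected too, use FULL compatibility.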
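To make the rules concrete, here is a deliberately simplified sketch that compares the field lists of two Avro record schemas. BACKWARD asks whether a reader on the new schema can decode old data, FORWARD asks whether a reader on the old schema can decode new data, and FULL requires both. The sketch only looks at fields added or removed by name. The real Schema Registry check also covers type changes, type promotion, aliases and nested records, so treat these functions as hypothetical helpers, not a registry API.

```python
def _fields(schema: dict) -> dict:
    """Map field name -> field definition for an Avro record schema."""
    return {f["name"]: f for f in schema["fields"]}

def backward_violations(old: dict, new: dict) -> list:
    """BACKWARD (simplified): every field added in `new` needs a default."""
    old_f, new_f = _fields(old), _fields(new)
    return [f"added field '{name}' has no default"
            for name in sorted(new_f.keys() - old_f.keys())
            if "default" not in new_f[name]]

def forward_violations(old: dict, new: dict) -> list:
    """FORWARD (simplified): every field removed in `new` needs a default in `old`."""
    old_f, new_f = _fields(old), _fields(new)
    return [f"removed field '{name}' has no default"
            for name in sorted(old_f.keys() - new_f.keys())
            if "default" not in old_f[name]]

def full_violations(old: dict, new: dict) -> list:
    """FULL: both directions must hold."""
    return backward_violations(old, new) + forward_violations(old, new)

v1 = {"type": "record", "name": "OrderPlaced", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "currency", "type": "string", "default": "USD"},
]}
# v2 adds a nullable field with a default and drops customer_id
v2 = {"type": "record", "name": "OrderPlaced", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "coupon_code", "type": ["null", "string"], "default": None},
]}
print(backward_violations(v1, v2))  # []
print(full_violations(v1, v2))      # ["removed field 'customer_id' has no default"]
```

Under BACKWARD the registry would accept v2. Under FULL it would not, because an old consumer has no value to fill in for customer_id.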
Step 2: Write a Producer with Idempotency
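from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
# Hypothetical registry URL and schema path; substitute your own
schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
with open("schemas/order_placed.avsc") as f:  # the OrderPlaced schema from Step 1
    avro_serializer = AvroSerializer(schema_registry, f.read())
producer_config = {
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,  # Broker drops duplicates caused by producer retries
    "acks": "all",               # Wait for all in-sync replicas
    "retries": 10,
    "delivery.timeout.ms": 120000,
}
producer = Producer(producer_config)
def delivery_report(err, msg):
    # Runs inside poll()/flush() once the message is acked or finally fails
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
def publish_order(order: dict):
    producer.produce(
        topic="orders.placed",
        key=order["order_id"],  # Same order_id -> same partition -> per-order ordering
        value=avro_serializer(order, SerializationContext("orders.placed", MessageField.VALUE)),
        on_delivery=delivery_report,
    )
    producer.poll(0)  # Serve pending delivery callbacks without blocking
# Call producer.flush() once at shutdown; flushing after every message serializes sends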
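Always set enable.idempotence=True and acks=all. Without them, a retry after a lost acknowledgement can write the same record twice. Idempotence only removes duplicates caused by producer retries, however. End-to-end exactly-once delivery also needs Kafka transactions or idempotent writes downstream, such as a merge on order_id.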
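Under the hood, the idempotent producer attaches a producer ID and a per-partition sequence number to what it sends, and the broker refuses to append anything whose sequence it has already written. The toy model below is plain Python, not Kafka's broker code, and ToyPartition is a hypothetical name. It shows why a retried send no longer duplicates the record. For simplicity it numbers individual records, whereas Kafka numbers batches, and it ignores producer epochs and fencing.

```python
from dataclasses import dataclass, field

@dataclass
class ToyPartition:
    """Toy model of broker-side deduplication for an idempotent producer."""
    log: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # producer_id -> last written seq

    def append(self, producer_id: int, seq: int, record: str) -> bool:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # duplicate from a retry: already written, skip it
        if seq != last + 1:
            raise ValueError(f"sequence gap: got {seq}, expected {last + 1}")
        self.log.append(record)
        self.last_seq[producer_id] = seq
        return True

partition = ToyPartition()
partition.append(producer_id=7, seq=0, record="order-1")
# The ack for seq 0 is lost on the network, so the producer retries it
partition.append(producer_id=7, seq=0, record="order-1")
partition.append(producer_id=7, seq=1, record="order-2")
print(partition.log)  # ['order-1', 'order-2']
```

Without the sequence check, the retry would have appended "order-1" a second time.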
Step 3: Land Raw Events in Staging Tables
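Consumers write raw JSON (or Avro-decoded dicts) directly to a landing table, with no transformation at this layer. A dbt staging model then applies only light cleanup, such as renaming columns and converting units, and loads new rows incrementally: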
-- staging/stg_orders_placed.sql (dbt model)
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT
    order_id,
    customer_id,
    amount_cents / 100.0 AS amount,  -- in the order's own currency, not converted to USD
    currency,
    placed_at AS event_time,
    _ingested_at AS loaded_at
FROM {{ source('kafka_landing', 'orders_placed_raw') }}
{% if is_incremental() %}
-- On incremental runs, only read rows ingested since the last successful run
WHERE _ingested_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}
Step 4: Build Marts with dbt
Once data lands in staging, dbt handles all downstream transformations. Incremental models keep large staging tables cheap to update, while a small aggregate like daily revenue can simply be rebuilt as a table on every run.
-- marts/fct_daily_revenue.sql
-- Snowflake dialect; on BigQuery use TIMESTAMP_TRUNC and TIMESTAMP_SUB instead
{{ config(materialized='table') }}
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders_placed') }}
    WHERE event_time >= CURRENT_DATE - INTERVAL '90 days'
),
daily AS (
    SELECT
        DATE_TRUNC('day', event_time) AS date,
        currency,
        COUNT(*) AS order_count,
        SUM(amount) AS revenue  -- per currency; convert with FX rates before comparing currencies
    FROM orders
    GROUP BY 1, 2
)
SELECT * FROM daily
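The staging model's incremental run comes down to three steps. It finds the newest loaded_at already in the table, selects landing rows ingested after it, and merges them on order_id. The plain-Python sketch below models that logic for illustration only. It is not the SQL dbt generates, and incremental_merge is a hypothetical name. It also collapses duplicate keys within the new batch first. With at-least-once delivery the landing table can hold the same order twice, and a warehouse MERGE typically rejects a batch where several source rows match one target row.

```python
def incremental_merge(target: dict, landing: list) -> dict:
    """Model one incremental run keyed on order_id.

    target:  order_id -> row already in the staging table (each row has 'loaded_at')
    landing: raw rows from the landing table (each row has '_ingested_at')
    """
    # High-water mark: newest load time already in the target (None on the first run)
    hwm = max((row["loaded_at"] for row in target.values()), default=None)
    new_rows = [r for r in landing if hwm is None or r["_ingested_at"] > hwm]

    # Collapse duplicate keys in this batch, keeping the most recently ingested copy
    latest = {}
    for r in sorted(new_rows, key=lambda row: row["_ingested_at"]):
        latest[r["order_id"]] = r

    # Upsert: insert unseen keys, overwrite existing ones
    merged = dict(target)
    for key, r in latest.items():
        row = {k: v for k, v in r.items() if k != "_ingested_at"}
        row["loaded_at"] = r["_ingested_at"]
        merged[key] = row
    return merged

landing = [
    {"order_id": "o1", "amount_cents": 500, "_ingested_at": 1},
    {"order_id": "o2", "amount_cents": 700, "_ingested_at": 2},
    {"order_id": "o1", "amount_cents": 500, "_ingested_at": 3},  # redelivered
]
staging = incremental_merge({}, landing)        # first run: o1, o2
landing.append({"order_id": "o3", "amount_cents": 900, "_ingested_at": 4})
staging = incremental_merge(staging, landing)   # second run only picks up o3
print(sorted(staging))  # ['o1', 'o2', 'o3']
```

On the first run there is no high-water mark, so every landing row is read. Later runs touch only rows ingested since the previous run, which is what keeps incremental models cheap.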
Step 5: Monitor Pipeline Health
Track these metrics for every pipeline:
| Metric | Alert threshold |
|---|---|
| Consumer lag | > 10,000 messages |
| Processing latency (p95) | > 30 seconds |
| Dead letter queue size | > 0 |
| Schema compatibility failures | > 0 |
Build Grafana dashboards and alerts on these metrics. Consumer lag that grows unchecked is the most common cause of "why is our dashboard 6 hours stale?"
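The thresholds in the table can be checked with a few lines of code. The sketch below is illustrative and assumes you already collect four inputs: per-partition log-end offsets and committed offsets (for example from your Kafka admin tooling), per-message processing latencies in seconds, and counts of dead-letter messages and schema failures. The function names are hypothetical. Lag is computed per partition as log-end offset minus committed offset, and p95 uses the nearest-rank method.

```python
# Thresholds from the table above; tune them for your own pipelines
MAX_CONSUMER_LAG = 10_000  # messages
MAX_P95_LATENCY_S = 30.0   # seconds

def consumer_lag(end_offsets: dict, committed: dict) -> int:
    """Total lag: per-partition log-end offset minus committed offset.
    Simplification: a partition with no committed offset counts from 0."""
    return sum(end - committed.get(p, 0) for p, end in end_offsets.items())

def p95(values: list) -> float:
    """95th percentile using the nearest-rank method."""
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) in integer math
    return ordered[rank - 1]

def check_alerts(end_offsets, committed, latencies_s, dlq_size, schema_failures):
    alerts = []
    lag = consumer_lag(end_offsets, committed)
    if lag > MAX_CONSUMER_LAG:
        alerts.append(f"consumer lag {lag} > {MAX_CONSUMER_LAG}")
    if latencies_s and p95(latencies_s) > MAX_P95_LATENCY_S:
        alerts.append(f"p95 latency {p95(latencies_s)}s > {MAX_P95_LATENCY_S}s")
    if dlq_size > 0:
        alerts.append(f"dead letter queue has {dlq_size} messages")
    if schema_failures > 0:
        alerts.append(f"{schema_failures} schema compatibility failures")
    return alerts

# Partition 0 is 15,000 messages behind; latencies are fine
print(check_alerts({0: 20_000, 1: 500}, {0: 5_000, 1: 500}, [1.2] * 20, 0, 0))
# ['consumer lag 15000 > 10000']
```

Summing lag across partitions can hide one stuck partition behind several healthy ones, so a per-partition alert is often worth adding as well.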
Key Takeaways
- Schema contract first: register Avro schemas before writing a single producer
- Idempotent producers: enable.idempotence=True is non-negotiable
- Staging layer: land raw, transform with dbt, never both in one step
- Incremental models: never full-refresh a table with >1M rows in production
- Monitor consumer lag: it's the canary in the coal mine for pipeline health