Building a Real-Time Data Pipeline with Kafka and dbt

A step-by-step walkthrough of building a production-grade streaming pipeline: from Kafka producers through schema registry to dbt transformations and warehouse delivery.

10 min read | May 20, 2026 | Netvionix Team

Why Real-Time Pipelines Are Harder Than They Look

Batch ETL pipelines are straightforward: extract, transform, load, sleep 24 hours, repeat. Real-time pipelines introduce ordering guarantees, exactly-once semantics, schema evolution, and backpressure — all of which can silently corrupt your data warehouse if not handled correctly.

This walkthrough covers the architecture we use for production streaming pipelines.


Architecture Overview

[Source Systems]
    │
    ▼
[Kafka Producers] ──► [Schema Registry] ──► [Kafka Topics]
                                                    │
                                                    ▼
                                          [Kafka Consumers / Flink]
                                                    │
                                                    ▼
                                          [Staging Tables (raw)]
                                                    │
                                                    ▼
                                            [dbt Models]
                                                    │
                                                    ▼
                                          [Warehouse (Snowflake / BigQuery)]

Step 1: Define Your Schema Contract First

The single biggest mistake teams make is publishing events without a schema contract. When upstream teams change their event structure, your pipeline silently breaks.

Use Confluent Schema Registry with Avro:

{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "com.yourcompany.events",
  "fields": [
    { "name": "order_id", "type": "string" },
    { "name": "customer_id", "type": "string" },
    { "name": "amount_cents", "type": "long" },
    { "name": "currency", "type": "string", "default": "USD" },
    { "name": "placed_at", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}

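Enforce BACKWARD compatibility in the registry. Under BACKWARD, consumers on the new schema must be able to read data written with the previous one, so new fields must have defaults. Deleting a field passes a BACKWARD check but still breaks any consumer on the old schema that expects it. Treat removals as breaking changes, or enforce FULL compatibility so the registry rejects them unless the field has a default.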
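
To make the "new fields must have defaults" rule concrete, here is a minimal sketch of that one check in plain Python. It is a simplified stand-in, not the registry's actual algorithm (which also handles type promotion, unions, and nested records), and the function name `added_fields_without_default` is hypothetical.

```python
def added_fields_without_default(old_schema: dict, new_schema: dict) -> list:
    """Return names of fields added in new_schema that lack a default.

    A consumer on the new schema reading an old record has no value for an
    added field, so it needs a default to fill in. Simplified: top-level
    fields only, no type checks.
    """
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


old = {"type": "record", "name": "OrderPlaced", "fields": [
    {"name": "order_id", "type": "string"},
]}

# Added field is nullable with a default: old records can still be read
compatible = {"type": "record", "name": "OrderPlaced", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "coupon_code", "type": ["null", "string"], "default": None},
]}

# Added field has no default: old records cannot be decoded with this schema
incompatible = {"type": "record", "name": "OrderPlaced", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "coupon_code", "type": "string"},
]}

print(added_fields_without_default(old, compatible))
print(added_fields_without_default(old, incompatible))
```

A check like this could run in CI before a schema is registered, so that incompatible changes fail at review time rather than at publish time.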


Step 2: Write a Producer with Idempotency

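from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

TOPIC = "orders.placed"

# Registry URL and schema file path are placeholders; adjust for your environment
registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
with open("order_placed.avsc") as schema_file:
    avro_serializer = AvroSerializer(registry, schema_file.read())

producer_config = {
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,        # No duplicates from the producer's own retries
    "acks": "all",                     # Wait for all in-sync replicas
    "retries": 10,
    "delivery.timeout.ms": 120000,
}

producer = Producer(producer_config)

def delivery_report(err, msg):
    # Called from poll()/flush() once the broker acks or the send fails
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

def publish_order(order: dict):
    producer.produce(
        topic=TOPIC,
        key=order["order_id"],         # Partition by order_id for ordering
        value=avro_serializer(order, SerializationContext(TOPIC, MessageField.VALUE)),
        on_delivery=delivery_report,
    )
    producer.flush()                   # Blocks until delivered; for throughput, poll(0) here and flush() at shutdown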

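Always set enable.idempotence=True and acks=all. Without these, a retry after a lost acknowledgement can write the same record twice. Idempotence covers only the producer's own retries: if your application re-sends an event after a crash, downstream deduplication on order_id still has to catch it.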
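
The mechanism behind enable.idempotence can be illustrated with a toy model: the broker remembers the last sequence number it accepted from each producer and drops anything it has already written. The sketch below is a simplified simulation, not Kafka's implementation (real brokers track sequences per producer ID and partition, at batch level); `ToyBroker` and `send_with_lost_ack` are hypothetical names.

```python
class ToyBroker:
    """Toy single-partition log that can de-duplicate by producer sequence."""

    def __init__(self, dedupe: bool):
        self.dedupe = dedupe
        self.log = []          # Values in the order they were written
        self.last_seq = {}     # producer_id -> highest sequence accepted

    def append(self, producer_id: str, seq: int, value: str) -> None:
        if self.dedupe and seq <= self.last_seq.get(producer_id, -1):
            return  # Already written: this is a retry of an earlier send
        self.log.append(value)
        self.last_seq[producer_id] = seq


def send_with_lost_ack(broker: ToyBroker) -> list:
    # The first write succeeds but its ack is lost, so the producer retries
    broker.append("producer-1", 0, "order-42")
    broker.append("producer-1", 0, "order-42")  # Retry, same sequence number
    broker.append("producer-1", 1, "order-43")
    return broker.log


without_idempotence = send_with_lost_ack(ToyBroker(dedupe=False))
with_idempotence = send_with_lost_ack(ToyBroker(dedupe=True))

print(without_idempotence)
print(with_idempotence)
```

In the non-idempotent run the retried record appears twice in the log; with sequence tracking the retry is recognized and dropped.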


Step 3: Land Raw Events in Staging Tables

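Consumers write raw JSON (or Avro-decoded dicts) directly to a landing table, with no transformation at that layer. The dbt staging model for this topic reads from that landing table incrementally and applies only light renaming and unit conversion.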
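
A minimal sketch of what "land raw" can look like on the consumer side: each decoded event is wrapped with ingestion metadata, and every business field passes through untouched. The `_ingested_at` column is the one the staging model in this step filters on; the function name `to_landing_row` and the `_kafka_partition` / `_kafka_offset` columns are assumptions added for illustration, since they make replays and duplicates traceable.

```python
from datetime import datetime, timezone
from typing import Optional


def to_landing_row(event: dict, partition: int, offset: int,
                   now: Optional[datetime] = None) -> dict:
    """Wrap a decoded event for the raw landing table without altering it."""
    ingested_at = now or datetime.now(timezone.utc)
    return {
        **event,                         # Business fields pass through unchanged
        "_kafka_partition": partition,   # Hypothetical metadata columns
        "_kafka_offset": offset,
        "_ingested_at": ingested_at.isoformat(),
    }


event = {
    "order_id": "o-1",
    "customer_id": "c-9",
    "amount_cents": 1999,
    "currency": "USD",
    "placed_at": 1779235200000,
}
row = to_landing_row(
    event, partition=3, offset=128,
    now=datetime(2026, 5, 20, tzinfo=timezone.utc),
)
print(row)
```

Passing `now` explicitly keeps the function deterministic in tests; in a real consumer loop it would be left unset.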

-- staging/stg_orders_placed.sql (dbt model)
{{ config(materialized='incremental', unique_key='order_id') }}

SELECT
    order_id,
    customer_id,
    -- Major units of the row's currency; not FX-converted despite the column name
    amount_cents / 100.0  AS amount_usd,
    currency,
    placed_at             AS event_time,
    _ingested_at          AS loaded_at
FROM {{ source('kafka_landing', 'orders_placed_raw') }}

{% if is_incremental() %}
WHERE _ingested_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}
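
The incremental filter combined with unique_key amounts to "take rows newer than the high-water mark, then upsert by key". The following toy in-memory simulation shows that logic under simplifying assumptions (rows as dicts, a dict keyed by order_id standing in for the target table, integers standing in for timestamps). It is not how dbt executes a merge, and `incremental_merge` is a hypothetical name.

```python
def incremental_merge(target: dict, source: list) -> int:
    """Upsert source rows newer than the target's high-water mark.

    target maps order_id -> row (with 'loaded_at'); source rows carry
    '_ingested_at'. Returns the number of source rows processed.
    """
    watermark = max((row["loaded_at"] for row in target.values()), default=None)
    processed = 0
    for row in source:
        # With an empty target there is no watermark, so every row qualifies
        if watermark is not None and row["_ingested_at"] <= watermark:
            continue
        target[row["order_id"]] = {      # Insert, or overwrite the same key
            "order_id": row["order_id"],
            "amount_cents": row["amount_cents"],
            "loaded_at": row["_ingested_at"],
        }
        processed += 1
    return processed


landing = [
    {"order_id": "o-1", "amount_cents": 1000, "_ingested_at": 1},
    {"order_id": "o-2", "amount_cents": 2500, "_ingested_at": 2},
]
table = {}
first_run = incremental_merge(table, landing)

# The consumer redelivers o-1 later; the key match prevents a second row
landing.append({"order_id": "o-1", "amount_cents": 1000, "_ingested_at": 3})
second_run = incremental_merge(table, landing)

print(first_run, second_run, len(table))
```

Note the strict comparison: a late row that shares the watermark's timestamp would be skipped, which is one reason some teams filter with a small lookback window and rely on the unique key to absorb the overlap.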

Step 4: Build Marts with dbt

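Once data lands in staging, dbt handles all transformations. The staging model is incremental, so each run processes only new rows; this mart covers a rolling 90-day window and is rebuilt as a table on each run.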

-- marts/fct_daily_revenue.sql
-- Written in Snowflake-style SQL; on BigQuery, use DATE_SUB and TIMESTAMP_TRUNC instead
{{ config(materialized='table') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders_placed') }}
    WHERE event_time >= CURRENT_DATE - INTERVAL '90 days'
),

daily AS (
    SELECT
        DATE_TRUNC('day', event_time) AS date,
        currency,
        COUNT(*)                       AS order_count,
        SUM(amount_usd)                AS revenue_usd
    FROM orders
    GROUP BY 1, 2
)

SELECT * FROM daily

Step 5: Monitor Pipeline Health

Track these metrics for every pipeline:

Metric                           Alert threshold
Consumer lag                     > 10,000 messages
Processing latency (p95)         > 30 seconds
Dead letter queue size           > 0
Schema compatibility failures    > 0

Set up Grafana dashboards on these. Consumer lag growing unchecked is the most common cause of "why is our dashboard 6 hours stale?"
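
Consumer lag is the gap between the newest offset in each partition and the offset the consumer group has committed, summed across partitions. Here is a minimal sketch of that arithmetic with the 10,000-message threshold from the table; the function names are hypothetical, and treating a partition with no commit as offset 0 is an assumption. In a real check the inputs could come from confluent_kafka's `Consumer.get_watermark_offsets()` and `Consumer.committed()`.

```python
LAG_ALERT_THRESHOLD = 10_000  # Messages, per the alert table


def total_lag(end_offsets: dict, committed: dict) -> int:
    """Sum of (log end offset - committed offset) across partitions."""
    return sum(
        # Assumption: a partition with no committed offset counts from 0
        max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    )


def lag_alert(end_offsets: dict, committed: dict,
              threshold: int = LAG_ALERT_THRESHOLD) -> bool:
    """True when total lag exceeds the alert threshold."""
    return total_lag(end_offsets, committed) > threshold


end_offsets = {0: 50_000, 1: 42_000}   # partition -> newest offset
committed = {0: 48_500, 1: 30_000}     # partition -> group's committed offset

print(total_lag(end_offsets, committed))
print(lag_alert(end_offsets, committed))
```

Alerting on the trend (lag that keeps growing over several scrapes) tends to be less noisy than alerting on a single reading, since short bursts can cross the threshold and recover on their own.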


Key Takeaways

  1. Schema contract first: register Avro schemas before writing a single producer
  2. Idempotent producers: enable.idempotence=True is non-negotiable
  3. Staging layer: land raw, transform with dbt, never both in one step
  4. Incremental models: never full-refresh a table with >1M rows in production
  5. Monitor consumer lag: it's the canary in the coal mine for pipeline health