TechLead
Lesson 21 of 22
6 min read
Data Engineering

Real-Time Analytics

Build real-time analytics systems with streaming ingestion, materialized views, OLAP engines, and live dashboards

What is Real-Time Analytics?

Real-time analytics is the ability to analyze and query data as it arrives, with latency measured in seconds rather than hours. Traditional batch analytics processes data on a schedule (hourly, daily); real-time analytics processes data continuously, enabling live dashboards, instant alerting, and up-to-the-second reporting. Use cases include monitoring website traffic in real time, detecting fraud as transactions occur, tracking delivery ETAs live, and powering operational dashboards for customer support teams.

Real-time analytics requires a different architecture than batch analytics. Data must flow from source to queryable state in seconds, which means streaming ingestion, specialized storage engines optimized for concurrent reads and writes, and pre-computed aggregations. The trade-off is increased complexity and cost compared to batch systems, so you should only build real-time analytics when the business genuinely needs sub-minute data freshness.

Real-Time vs Near-Real-Time vs Batch

  • Real-Time (< 1 second): Fraud detection, live trading dashboards, operational alerting. Requires streaming architecture end-to-end.
  • Near-Real-Time (1-60 seconds): Live website analytics, operational dashboards, real-time KPI monitoring. Micro-batch or materialized views.
  • Batch (minutes to hours): Daily reports, historical analysis, ML training. Standard warehouse + dbt approach.
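One way to decide which tier you actually need is to measure end-to-end freshness: the gap between when an event occurred and when it became queryable. A minimal sketch of that measurement — the function names here are illustrative, not from any library:

```python
from datetime import datetime, timezone

def end_to_end_lag(event_time: datetime, queryable_at: datetime) -> float:
    """Seconds between when an event occurred and when it became queryable."""
    return (queryable_at - event_time).total_seconds()

def classify_freshness(lag_seconds: float) -> str:
    """Map an observed end-to-end lag onto the latency tiers above."""
    if lag_seconds < 1:
        return "real-time"
    if lag_seconds <= 60:
        return "near-real-time"
    return "batch"

# An event that occurred at 12:00:00 and became queryable 4 seconds later
event_time = datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
queryable_at = datetime(2024, 5, 1, 12, 0, 4, tzinfo=timezone.utc)

lag = end_to_end_lag(event_time, queryable_at)
print(lag, classify_freshness(lag))  # 4.0 near-real-time
```

If your measured lag sits comfortably in the 1-60 second band, micro-batching is enough and you can skip the complexity of true event-at-a-time processing.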

Real-Time Architecture

A typical real-time analytics architecture consists of: a streaming layer (Kafka) for ingestion, a stream processor (Flink, Kafka Streams) for transformations, a real-time OLAP engine (ClickHouse, Apache Druid, Apache Pinot) for fast queries, and a visualization layer (Grafana, Superset) for dashboards.

# Real-time analytics architecture
#
# Sources -> Kafka -> Stream Processor -> OLAP Engine -> Dashboard
#
# 1. Clickstream events -> Kafka topic "page_views"
# 2. Flink job aggregates by page/minute -> Kafka topic "page_views_agg"
# 3. ClickHouse ingests aggregated data
# 4. Grafana queries ClickHouse for live dashboards
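The first hop (step 1) can be sketched as a small producer writing page-view events to the `page_views` topic. This assumes a broker reachable at `kafka:9092`; the `make_event` helper is illustrative, but its field names match the ClickHouse schema used later in this lesson:

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(user_id: str, page_url: str, **extra) -> dict:
    """Build a page_view event whose fields match the downstream ClickHouse table."""
    return {
        "event_id": str(uuid.uuid4()),
        "user_id": user_id,
        "page_url": page_url,
        "referrer": extra.get("referrer", ""),
        "device_type": extra.get("device_type", "unknown"),
        "country": extra.get("country", "unknown"),
        "event_time": datetime.now(timezone.utc).isoformat(),
        "session_id": extra.get("session_id", ""),
        "duration_ms": extra.get("duration_ms", 0),
    }

def send_event(producer, event: dict) -> None:
    """Publish one event to the page_views topic (producer is a confluent_kafka.Producer)."""
    # Keying by user_id keeps each user's events ordered within a partition
    producer.produce("page_views", key=event["user_id"], value=json.dumps(event))

# Usage against a live broker (assumes the compose stack below is running):
#   from confluent_kafka import Producer
#   producer = Producer({"bootstrap.servers": "kafka:9092"})
#   send_event(producer, make_event("user-42", "/pricing", country="DE", duration_ms=1800))
#   producer.flush()  # block until the broker acknowledges
```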

# docker-compose.yml for a real-time analytics stack
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports: ["9092:9092"]
    # NOTE: a runnable broker also needs listener/KRaft settings
    # (KAFKA_PROCESS_ROLES, KAFKA_LISTENERS, etc.) — omitted for brevity
  clickhouse:
    image: clickhouse/clickhouse-server:24.3
    ports: ["8123:8123", "9000:9000"]
    volumes:
      - ./clickhouse/init.sql:/docker-entrypoint-initdb.d/init.sql

  grafana:
    image: grafana/grafana:10.4.0
    ports: ["3000:3000"]
    environment:
      GF_INSTALL_PLUGINS: grafana-clickhouse-datasource
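Once the stack is up, you can smoke-test ClickHouse without any client library: its HTTP interface exposes a `/ping` endpoint that returns `Ok.` when the server is healthy. A small stdlib-only check (host and port match the compose file above):

```python
import urllib.request

def clickhouse_alive(host: str = "localhost", port: int = 8123) -> bool:
    """Hit ClickHouse's HTTP /ping endpoint; it returns 'Ok.' when the server is up."""
    try:
        with urllib.request.urlopen(f"http://{host}:{port}/ping", timeout=2) as resp:
            return resp.read().strip() == b"Ok."
    except OSError:
        # Connection refused, DNS failure, timeout — treat all as "not alive"
        return False
```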

Streaming Ingestion to ClickHouse

-- ClickHouse: Create table optimized for real-time analytics
-- MergeTree engine with partitioning and ordering for fast aggregation

CREATE TABLE analytics.page_views (
    event_id       String,
    user_id        String,
    page_url       String,
    referrer       String,
    device_type    LowCardinality(String),
    country        LowCardinality(String),
    event_time     DateTime64(3),
    session_id     String,
    duration_ms    UInt32
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (country, toStartOfMinute(event_time), user_id)
TTL event_time + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Materialized view: Pre-aggregate per minute for fast dashboard queries
-- AggregatingMergeTree stores partial aggregate states; uniq() and avg()
-- results are not safely summable, so SummingMergeTree would merge them
-- incorrectly here
CREATE MATERIALIZED VIEW analytics.pageviews_per_minute
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(minute)
ORDER BY (country, page_url, minute)
AS
SELECT
    toStartOfMinute(event_time) AS minute,
    country,
    page_url,
    countState() AS view_count,
    uniqState(user_id) AS unique_users,
    avgState(duration_ms) AS avg_duration
FROM analytics.page_views
GROUP BY minute, country, page_url;

Real-Time Ingestion with Python

from confluent_kafka import Consumer
import clickhouse_connect
import json
from datetime import datetime

# Kafka consumer for real-time ingestion
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'clickhouse-ingester',
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False,  # commit manually, only after a successful insert
})
consumer.subscribe(['page_views'])

# ClickHouse client
ch_client = clickhouse_connect.get_client(host='clickhouse', port=8123)

BATCH_SIZE = 1000
FLUSH_INTERVAL = 5  # seconds
batch = []
last_flush = datetime.now()

def flush_batch():
    """Insert batch into ClickHouse, then commit offsets (at-least-once delivery)."""
    global batch, last_flush
    if not batch:
        return

    ch_client.insert(
        'analytics.page_views',
        batch,
        column_names=[
            'event_id', 'user_id', 'page_url', 'referrer',
            'device_type', 'country', 'event_time',
            'session_id', 'duration_ms'
        ]
    )
    # Offsets advance only after the insert succeeds, so a crash between
    # consume and insert replays the batch instead of losing it
    consumer.commit(asynchronous=False)
    print(f"Inserted {len(batch)} rows into ClickHouse")
    batch = []
    last_flush = datetime.now()

print("Starting real-time analytics ingester...")
try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is not None and not msg.error():
            event = json.loads(msg.value().decode())
            batch.append([
                event['event_id'],
                event['user_id'],
                event['page_url'],
                event.get('referrer', ''),
                event.get('device_type', 'unknown'),
                event.get('country', 'unknown'),
                datetime.fromisoformat(event['event_time']),
                event.get('session_id', ''),
                event.get('duration_ms', 0),
            ])

        # Flush on batch size or time interval
        elapsed = (datetime.now() - last_flush).total_seconds()
        if len(batch) >= BATCH_SIZE or (batch and elapsed >= FLUSH_INTERVAL):
            flush_batch()
finally:
    flush_batch()      # drain whatever is still buffered on shutdown
    consumer.close()

Querying Real-Time Data

-- Real-time dashboard queries against ClickHouse

-- Live visitors in the last 5 minutes
SELECT
    count() AS total_views,
    uniq(user_id) AS unique_visitors,
    avg(duration_ms) AS avg_duration_ms
FROM analytics.page_views
WHERE event_time >= now() - INTERVAL 5 MINUTE;

-- Top pages right now (last 10 minutes)
SELECT
    page_url,
    count() AS views,
    uniq(user_id) AS visitors,
    avg(duration_ms) AS avg_duration
FROM analytics.page_views
WHERE event_time >= now() - INTERVAL 10 MINUTE
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;

-- Traffic by country, per minute (for time-series charts)
-- Merge the partial aggregate states stored by the materialized view
SELECT
    minute,
    country,
    countMerge(view_count) AS views,
    uniqMerge(unique_users) AS visitors
FROM analytics.pageviews_per_minute
WHERE minute >= now() - INTERVAL 1 HOUR
GROUP BY minute, country
ORDER BY minute DESC;

-- Anomaly detection: Compare current hour to same hour last week
WITH current_hour AS (
    SELECT count() AS current_views
    FROM analytics.page_views
    WHERE event_time >= toStartOfHour(now())
),
last_week AS (
    SELECT count() AS prev_views
    FROM analytics.page_views
    WHERE event_time >= toStartOfHour(now()) - INTERVAL 7 DAY
      AND event_time < toStartOfHour(now()) - INTERVAL 7 DAY + INTERVAL 1 HOUR
)
SELECT
    current_views,
    prev_views,
    round((current_views - prev_views) / prev_views * 100, 1) AS change_pct
FROM current_hour, last_week;
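The week-over-week comparison above can feed a simple alerter: run it on a schedule and notify someone when the deviation crosses a threshold. A sketch under assumptions — the 50% threshold and the notification (a print) are placeholders for your own alerting policy:

```python
def is_anomalous(current_views: int, prev_views: int, threshold_pct: float = 50.0) -> bool:
    """Flag when traffic deviates from the same hour last week by more than threshold_pct."""
    if prev_views == 0:
        return current_views > 0  # no baseline for this hour: treat any traffic as notable
    change_pct = (current_views - prev_views) / prev_views * 100
    return abs(change_pct) >= threshold_pct

def check_traffic(client, threshold_pct: float = 50.0) -> None:
    """Run the week-over-week comparison and alert when it trips.

    client is a clickhouse_connect client, as in the ingester above.
    """
    current, previous = client.query(
        """
        SELECT
            (SELECT count() FROM analytics.page_views
             WHERE event_time >= toStartOfHour(now())) AS current_views,
            (SELECT count() FROM analytics.page_views
             WHERE event_time >= toStartOfHour(now()) - INTERVAL 7 DAY
               AND event_time <  toStartOfHour(now()) - INTERVAL 7 DAY + INTERVAL 1 HOUR) AS prev_views
        """
    ).result_rows[0]
    if is_anomalous(current, previous, threshold_pct):
        print(f"ALERT: {current} views this hour vs {previous} same hour last week")

# Usage (assumes the ClickHouse container from the compose file above):
#   import clickhouse_connect
#   client = clickhouse_connect.get_client(host='clickhouse', port=8123)
#   check_traffic(client)
```

Dropping below the baseline matters as much as spiking above it — an outage often shows up as traffic falling to near zero, which is why the check uses the absolute deviation.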

Materialized Views for Performance

Materialized views pre-compute aggregations as data arrives, making dashboard queries instant regardless of data volume. Instead of scanning billions of raw events for each dashboard refresh, you query pre-aggregated tables with millions of rows.
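The saving is easy to reason about: a dashboard query against the per-minute view touches one row per (country, page, minute) group instead of every raw event in the window. A back-of-the-envelope sketch — the traffic numbers here are purely illustrative:

```python
def rows_scanned_raw(events_per_minute: int, window_minutes: int) -> int:
    """Rows a dashboard query scans against the raw events table."""
    return events_per_minute * window_minutes

def rows_scanned_preagg(groups_per_minute: int, window_minutes: int) -> int:
    """Rows scanned against a per-minute materialized view: one row per group per minute."""
    return groups_per_minute * window_minutes

# Illustrative traffic: 50k events/min, 200 distinct (country, page) groups, 1-hour window
raw = rows_scanned_raw(50_000, 60)    # 3,000,000 raw events
agg = rows_scanned_preagg(200, 60)    # 12,000 pre-aggregated rows
print(f"{raw // agg}x fewer rows scanned")  # 250x fewer rows scanned
```

Crucially, the raw side grows with traffic while the pre-aggregated side grows only with the number of distinct groups, so the gap widens exactly when you need it to.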

Real-Time API with TypeScript

// API endpoint for real-time dashboard data
import { createClient } from '@clickhouse/client';

const clickhouse = createClient({
  url: 'http://clickhouse:8123',
  database: 'analytics',
});

interface LiveMetrics {
  totalViews: number;
  uniqueVisitors: number;
  avgDuration: number;
  topPages: { url: string; views: number }[];
}

// Whitelist the interval — it is interpolated into the SQL string below,
// so it must never come from unvalidated user input
const ALLOWED_INTERVALS = new Set(['5 MINUTE', '15 MINUTE', '1 HOUR']);

async function getLiveMetrics(interval: string = '5 MINUTE'): Promise<LiveMetrics> {
  if (!ALLOWED_INTERVALS.has(interval)) {
    throw new Error(`Unsupported interval: ${interval}`);
  }
  const [summary, topPages] = await Promise.all([
    clickhouse.query({
      query: `
        SELECT
          count() AS total_views,
          uniq(user_id) AS unique_visitors,
          avg(duration_ms) AS avg_duration
        FROM analytics.page_views
        WHERE event_time >= now() - INTERVAL ${interval}
      `,
      format: 'JSONEachRow',
    }),
    clickhouse.query({
      query: `
        SELECT page_url AS url, count() AS views
        FROM analytics.page_views
        WHERE event_time >= now() - INTERVAL ${interval}
        GROUP BY page_url
        ORDER BY views DESC
        LIMIT 10
      `,
      format: 'JSONEachRow',
    }),
  ]);

  const summaryData = await summary.json();
  const pagesData = await topPages.json();

  return {
    totalViews: summaryData[0]?.total_views || 0,
    uniqueVisitors: summaryData[0]?.unique_visitors || 0,
    avgDuration: summaryData[0]?.avg_duration || 0,
    topPages: pagesData,
  };
}

Key Takeaways

  • Real-time analytics provides sub-second to sub-minute data freshness for live dashboards and alerting
  • The architecture consists of streaming ingestion (Kafka), transformation (Flink/Streams), OLAP storage (ClickHouse), and visualization (Grafana)
  • Materialized views pre-compute aggregations as data arrives, making dashboard queries instant
  • ClickHouse, Apache Druid, and Apache Pinot are purpose-built for real-time analytical queries at scale
  • Only build real-time analytics when the business genuinely needs sub-minute freshness — batch is simpler and cheaper
  • Micro-batching (inserting every few seconds) is often sufficient and much simpler than true event-at-a-time processing
