
Chapter 12. Stream Processing

A complex system that works is invariably found to have evolved from a simple system that works. The inverse proposition also appears to be true: A complex system designed from scratch never works and cannot be made to work.

John Gall, Systemantics (1975)

Table of Contents

  1. Introduction
  2. Transmitting Event Streams
  3. Databases and Streams
  4. Processing Streams
  5. Summary

1. Introduction

In plain English: Imagine a restaurant kitchen. Batch processing is like preparing meals for a catered event—you cook everything at once, then serve it. Stream processing is like a busy restaurant during dinner service—orders come in continuously, you cook them as they arrive, and serve them immediately. There's no "end" to dinner service; it's an ongoing flow.

In technical terms: Stream processing handles unbounded data that arrives continuously over time, processing each event as it happens rather than waiting to accumulate a batch. Unlike batch jobs that have a definite beginning and end, stream processing runs continuously, transforming events in real-time.

Why it matters: Many modern applications need near-instant responses. A fraud detection system can't wait 24 hours to process yesterday's transactions—it must flag suspicious activity within seconds. Stream processing enables real-time analytics, live dashboards, instant recommendations, and immediate alerts.

1.1. From Batch to Stream

In Chapter 11, we discussed batch processing with one key assumption: the input is bounded—it has a known, finite size. For example, MapReduce must read its entire input before producing output because the very last record might have the lowest key and need to be first in the sorted output.

Aspect       Batch Processing            Stream Processing
Input        Bounded (finite dataset)    Unbounded (continuous stream)
Processing   Process entire dataset      Process each event as it arrives
Duration     Minutes to hours            Runs continuously (never ends)
Latency      Hours to days               Seconds
Example      Daily ETL job               Real-time fraud detection

In reality, most data is unbounded—your users produced data yesterday, today, and will continue producing data tomorrow. Unless you go out of business, this never ends. Batch processors artificially divide this into chunks: processing a day's worth of data at the end of every day, or an hour's worth at the end of every hour.

The problem: Daily batch processing means changes in input are only reflected in output a day later. To reduce delay, we can run processing more frequently—every second, or even continuously, processing each event as it happens. That's stream processing.

💡 Insight

Stream processing doesn't replace batch processing—they're complementary. Batch excels at processing large historical datasets with high throughput. Stream excels at low-latency responses to individual events. Many modern architectures use both: streams for real-time responses, batch for comprehensive analytics.

1.2. What is a Stream?

In plain English: A stream is like a river of data—it flows continuously, you can observe it at any point, but you can't "finish" observing a river because it keeps flowing.

In technical terms: A stream is data that is incrementally made available over time. In stream processing, a record is commonly called an event—a small, self-contained, immutable object containing details of something that happened at some point in time, usually with a timestamp.

Events come from many sources:

  • 👤 User actions: page views, button clicks, shopping cart updates, form submissions
  • 📡 Sensors & IoT: temperature readings, GPS coordinates, machine metrics, heart rate monitors
  • ⚙️ System events: log entries, database changes, API calls, error reports

Event structure:

  • Timestamp: When the event occurred (according to some clock)
  • Type: What happened (e.g., "user_registered", "payment_processed")
  • Payload: Details specific to the event (user ID, amount, etc.)
  • Encoding: JSON, Avro, Protobuf, or other serialization format
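
A minimal example of such an event, written as a Python dict before serialization (all field names and values here are illustrative):

event = {
    "type": "payment_processed",          # what happened
    "timestamp": "2024-03-01T14:23:05Z",  # when it happened, per the producer's clock
    "payload": {                          # event-specific details
        "user_id": 123,
        "amount_usd": 49.99,
    },
}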

Related events are grouped into a topic or stream. A producer (also called publisher or sender) generates events. A consumer (also called subscriber or recipient) processes events.


2. Transmitting Event Streams

In batch processing, files connect producers and consumers. What's the streaming equivalent?

2.1. Messaging Systems

In plain English: A messaging system is like a postal service for events. Producers send packages (events), the service delivers them to recipients (consumers), and handles complications like lost mail, wrong addresses, and recipients being away from home.

In technical terms: A messaging system enables producers to send events that are pushed to consumers. Unlike a simple TCP connection between one sender and one receiver, messaging systems allow multiple producers to send to the same topic and multiple consumers to receive from the same topic.

Two critical questions determine system design:

  1. What happens if producers send faster than consumers can process?

    • Drop messages: Simplest but loses data
    • Buffer in queue: Safe but queues can grow unbounded
    • Backpressure: Block producer until consumer catches up
  2. What happens if nodes crash?

    • In-memory only: Fast but messages lost on crash
    • Write to disk: Durable but slower
    • Replicate: Highest durability, highest cost

These trade-offs place messaging systems on a durability spectrum:

  • Low durability (fast but can lose messages): in-memory buffers, UDP multicast, StatsD metrics
  • Medium durability (balanced performance and reliability): RabbitMQ, ActiveMQ, Google Cloud Pub/Sub
  • High durability (replicated, disk-backed, recoverable): Apache Kafka, Amazon Kinesis, Apache Pulsar

2.2. Direct Messaging

Some systems skip intermediary nodes and connect producers directly to consumers:

Approaches:

  1. UDP multicast: Used in financial systems for stock market feeds where low latency is critical. Unreliable but fast.

  2. ZeroMQ/nanomsg: Brokerless messaging libraries implementing publish/subscribe over TCP or IP multicast.

  3. Webhooks: Consumer exposes an HTTP endpoint, producer makes requests to it when events occur.

Limitations:

  • Consumers must be constantly online
  • If consumer is offline, it misses messages
  • Producer crash loses buffered messages
  • Limited fault tolerance

2.3. Message Brokers

In plain English: Think of a message broker as a post office. Instead of direct delivery (you personally taking mail to someone's house), you drop letters at the post office, which handles delivery even if the recipient isn't home right now.

In technical terms: A message broker is essentially a database optimized for message streams. It runs as a server with producers and consumers connecting as clients. Producers write messages to the broker; consumers read from it.

[Diagram: Producer 1 and Producer 2 write messages to the message broker; Consumer 1 and Consumer 2 read them from it]

Benefits:

  • Tolerates clients connecting, disconnecting, and crashing
  • Durability moves from individual clients to centralized broker
  • Can buffer messages if consumers are slow
  • Asynchronous delivery

Message brokers vs. databases:

Aspect                          Database                    Message Broker
Data retention                  Until explicitly deleted    Auto-delete after delivery
Working set                     Can be very large           Assumed small (short queues)
Query capabilities              Rich (SQL, indexes)         Simple (pattern matching)
Data changes                    Polling required            Push notifications
Performance with large queues   Generally stable            Degrades if queues grow

Traditional message brokers: RabbitMQ, ActiveMQ, HornetQ, TIBCO, IBM MQ, Azure Service Bus, Google Cloud Pub/Sub

Multiple Consumers

When multiple consumers read from the same topic, two main patterns emerge:

[Diagram: with load balancing, each message in the queue is delivered to one of Consumers A, B, or C; with fan-out, every message on the topic is delivered to all of them]

Load balancing: Each message goes to one consumer (share the work). Useful when processing is expensive and you want to parallelize.

Fan-out: Each message goes to all consumers (broadcast). Useful when multiple independent systems need the same data.

Kafka combines both: Consumer groups enable load balancing within a group, with fan-out across groups.

Acknowledgments and Redelivery

Consumers may crash while processing. Acknowledgments ensure messages aren't lost:

  1. Broker delivers message to consumer
  2. Consumer processes message
  3. Consumer sends acknowledgment to broker
  4. Broker deletes message from queue

If connection closes without acknowledgment, broker assumes message wasn't processed and redelivers it to another consumer.

Problem: This can reorder messages. If Consumer 2 crashes processing message m3 while Consumer 1 processes m4, m3 gets redelivered to Consumer 1, which then processes messages out of order: m4, m3, m5.

Dead Letter Queues (DLQs): Handle messages that repeatedly fail processing (e.g., malformed data). Instead of retrying forever, move them to a separate queue for manual inspection. Monitoring alerts operators to investigate, fix, or discard problematic messages.

2.4. Log-Based Message Brokers

In plain English: Traditional message brokers think of messages as transient—deliver them, then delete them. This is like reading a letter and throwing it away. Log-based brokers think differently: they're like keeping a diary where you can go back and reread entries anytime.

In technical terms: Log-based message brokers combine database-style durable storage with low-latency notifications. They use an append-only log on disk, similar to database write-ahead logs and replication logs.

Log-based message broker architecture (a topic sharded across partitions, each with its own offsets):

  Partition 0:  [0] Event A   [1] Event B   [2] Event C   [3] Event D
  Partition 1:  [0] Event E   [1] Event F   [2] Event G   [3] Event H
  Partition 2:  [0] Event I   [1] Event J   [2] Event K   [3] Event L

How it works:

  1. Producer sends message: Append to end of log (fast, sequential write)
  2. Consumer reads messages: Read log sequentially from current offset
  3. Consumer reaches end: Wait for notification of new messages
  4. Scale throughput: Shard log across multiple machines (partitions)

Within each partition, the broker assigns a monotonically increasing offset (sequence number) to every message. Each partition is totally ordered, but there is no ordering guarantee across different partitions.
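
As a rough illustration of this bookkeeping, here is a toy in-memory partitioned log with key-based routing and a consumer offset (real brokers such as Kafka persist and replicate partitions on disk; all names below are illustrative):

class PartitionedLog:
    """Toy append-only log sharded into partitions, with key-based routing."""

    def __init__(self, num_partitions=3):
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key):
        # Real brokers use a stable hash so a key always maps to the same partition.
        return hash(key) % len(self.partitions)

    def append(self, key, event):
        p = self.partition_for(key)
        self.partitions[p].append(event)
        return p, len(self.partitions[p]) - 1      # (partition, offset) of the new message

    def read(self, partition, offset):
        # Sequential read from the consumer's current offset onward.
        return self.partitions[partition][offset:]

log = PartitionedLog()
log.append("user:123", {"type": "page_view", "url": "/home"})
log.append("user:123", {"type": "click", "button": "buy"})

offset = 0                                          # the consumer's checkpointed position
partition = log.partition_for("user:123")
for event in log.read(partition, offset):
    print(event)
    offset += 1                                     # periodically persist this offset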

Systems: Apache Kafka, Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar

💡 Insight

Log-based brokers achieve millions of messages per second by combining two key properties: (1) sequential disk I/O is extremely fast—comparable to network bandwidth, and (2) sharding across machines multiplies capacity. This makes them suitable for both messaging and durable storage.

Logs Compared to Traditional Messaging

Fan-out: Trivially supported—multiple consumers independently read the log without affecting each other.

Load balancing: Instead of assigning individual messages, assign entire partitions to consumers in a group. Each consumer reads all messages in its assigned partitions sequentially.

Load balancing in log-based systems (partitions assigned to consumers in a group):

  Partition 0 → Consumer 1
  Partition 1 → Consumer 1
  Partition 2 → Consumer 2
  Partition 3 → Consumer 2

Trade-offs:

Downsides:

  • Maximum parallelism limited by number of partitions
  • Slow message blocks all subsequent messages in partition (head-of-line blocking)

Best fit:

  • High message throughput
  • Fast per-message processing
  • Message ordering is important
  • All messages for a key should go to same partition (e.g., all events for user_id=123)

Consumer Offsets

Consuming a partition sequentially simplifies tracking: all messages with an offset lower than the consumer's current offset have already been processed; messages with a higher offset have not yet been seen.

Broker only needs to periodically record consumer offsets—no per-message acknowledgments. Reduced bookkeeping increases throughput.

This is identical to database replication: broker behaves like a leader, consumer like a follower. If consumer fails, another takes over at the last recorded offset.

Disk Space Usage

Append-only logs eventually fill disk. Solutions:

Segmentation: Divide log into segments, delete or archive old segments. Acts like a large circular buffer.

How much can you buffer?

Back-of-the-envelope calculation:

  • Typical large hard drive: 20 TB capacity, 250 MB/s sequential write
  • Time to fill: 20,000,000 MB ÷ 250 MB/s = 80,000 seconds ≈ 22 hours
  • With multiple disks/machines: days or weeks of messages

Modern approach: Store older messages in object storage (S3, GCS). Systems like Apache Kafka, Redpanda, WarpStream, and Bufstream use tiered storage to keep recent messages on fast disks and older messages in cheap object storage.

Benefit: when older messages are stored as Iceberg tables, batch and data warehouse jobs can query them directly without copying the data.

When Consumers Can't Keep Up

If a consumer falls so far behind that the messages it needs are older than the retention period, it misses those messages. But only that consumer is affected; all others continue normally.

Operational advantage: you can experiment with production logs for development or debugging without disrupting production services. When an experimental consumer shuts down, the only thing it leaves behind is its offset.

Contrast this with traditional brokers, where unused queues keep accumulating messages and consuming memory.

Replaying Old Messages

In plain English: Traditional message brokers are like watching live TV—once you've seen it, it's gone. Log-based brokers are like Netflix—you can rewatch episodes anytime.

In technical terms: Consuming messages from a log is a read-only operation. The only side effect is the consumer offset moving forward, but the consumer controls this.

Use cases:

  1. Debugging: Start consumer with yesterday's offsets, reprocess, write output elsewhere
  2. Bug fixes: Fix buggy processing code, replay messages from before the bug
  3. Experimentation: Try new processing logic on historical data
  4. Integration: New system can consume full history to build its initial state

This makes log-based messaging similar to batch processing: derived data is separated from input through a repeatable transformation.


3. Databases and Streams

Log-based message brokers take ideas from databases and apply them to messaging. We can also go in reverse: apply ideas from streams to databases.

3.1. Keeping Systems in Sync

In plain English: Imagine you have a restaurant chain. Each location needs the same menu. When you update prices at headquarters, every location must get the update. If you fax each location separately, timing issues cause inconsistencies. Better to have one authoritative source that all locations sync from.

In technical terms: Most applications combine multiple technologies: OLTP database, cache, search index, data warehouse. Each has its own copy of data in its own representation. They must stay in sync.

[Diagram: a PostgreSQL source database must be kept in sync with a Redis cache, an Elasticsearch index, and a Snowflake warehouse]

Traditional approach: Dual writes

Application code explicitly writes to each system:

# DON'T DO THIS
def update_user(user_id, new_email):
    db.execute("UPDATE users SET email = ? WHERE id = ?", new_email, user_id)
    cache.set(f"user:{user_id}", new_email)
    search_index.update(user_id, {"email": new_email})

Problems with dual writes:

  1. Race conditions: Two concurrent updates may arrive at different systems in different orders, causing permanent inconsistency.
Dual write race condition: Client 1 sets X=A while Client 2 concurrently sets X=B.

  Database:     receives A, then B  →  ends with X=B (B wins)
  Search index: receives B, then A  →  ends with X=A (A wins)

Result: Database has X=B, search index has X=A. Permanently inconsistent.

  2. Partial failures: One write succeeds, another fails. Systems diverge.

  3. No single leader: Database has a leader, search index has a leader, but neither follows the other.

💡 Insight

The root problem with dual writes is that there's no single source of truth determining the order of writes. Without a single leader (like the database), concurrent writes create conflicts that can't be resolved consistently across systems.

3.2. Change Data Capture

In plain English: Instead of your application manually telling every system about changes, let the database broadcast its changes like a radio station. Other systems tune in and stay updated automatically.

In technical terms: Change Data Capture (CDC) is the process of observing all data changes written to a database and extracting them in a form that can be replicated to other systems. CDC makes changes available as a stream, immediately as they're written.

Change data capture flow: Client 1 sets X=A and Client 2 sets X=B.

  PostgreSQL decides the order (A, then B) and records it in its replication log.
  The CDC stream carries [X=A, X=B] in that same order to every consumer.
  Search index, cache, and warehouse all converge on X=B.

How it works:

  1. Database is the leader (system of record)
  2. CDC captures changes from replication log
  3. Log-based message broker transports changes
  4. Derived systems consume changes in same order
  5. All systems eventually have same data

Implementation:

Debezium: Open source CDC for MySQL, PostgreSQL, Oracle, SQL Server, MongoDB, Cassandra. Reads replication logs, outputs standardized events.

Other tools:

  • Maxwell (MySQL binlog)
  • GoldenGate (Oracle)
  • pgcapture (PostgreSQL)
  • Kafka Connect (various databases)

Key properties:

  • Asynchronous: Database doesn't wait for consumers
  • Non-blocking: Adding slow consumer doesn't affect database
  • Eventually consistent: Replication lag applies
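
For a concrete picture, a change event for an update to a users table might look roughly like this (the before/after envelope is loosely modeled on Debezium's output; the exact fields vary by connector and are illustrative here):

change_event = {
    "op": "u",                                   # c = create, u = update, d = delete
    "source": {"table": "users", "lsn": 48211},  # where in the replication log it came from
    "before": {"id": 123, "email": "old@example.com"},
    "after":  {"id": 123, "email": "new@example.com"},
    "ts_ms": 1709302985000,                      # when the change was captured
}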

Initial Snapshot

Problem: CDC captures new changes, but what about existing data?

If you add a new search index, you need a full copy of existing data. Applying only recent changes misses items that weren't recently updated.

Solution: Start with a consistent snapshot at a known log position.

  1. Take snapshot: make a consistent copy of the database at time T
  2. Record position: note the log offset corresponding to the snapshot
  3. Load snapshot: build the initial state in the derived system
  4. Resume from position: apply CDC events from the snapshot point forward

Debezium uses Netflix's DBLog algorithm for incremental snapshots without locking the database.

Log Compaction

In plain English: Instead of keeping every version of every change forever, keep only the latest value for each key. Like keeping only the most recent photo of each person instead of every photo ever taken.

In technical terms: Periodically scan the log for records with the same key, discard duplicates, keep only the most recent update. Tombstones (special null values) indicate deletions.

Example:

  Original log:   [cat:1 → 10 views] [dog:2 → 5 views] [cat:1 → 15 views] [cat:1 → 20 views] [dog:2 → 8 views]
  Compacted log:  [cat:1 → 20 views] [dog:2 → 8 views]
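
The compaction rule itself is simple. A minimal sketch, assuming a None value acts as a deletion tombstone:

def compact(log):
    """Keep only the most recent entry per key; drop keys whose latest value
    is a tombstone (None), which marks a deletion."""
    latest = {}
    for key, value in log:          # scan in log order, so later entries win
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("cat:1", 10), ("dog:2", 5), ("cat:1", 15), ("cat:1", 20), ("dog:2", 8)]
print(compact(log))   # [('cat:1', 20), ('dog:2', 8)]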

Benefits:

  • Disk space depends on current data size, not total writes
  • Rebuild derived system by scanning compacted log from offset 0
  • Get full database copy without taking another snapshot
  • Supported by Apache Kafka

API Support for Change Streams

Most modern databases now expose change streams as first-class features:

Relational:

  • MySQL, PostgreSQL: Replication log streaming
  • Google Cloud Datastream: CDC for Cloud SQL
  • AWS DMS: Database Migration Service with CDC

NoSQL:

  • MongoDB: Change Streams
  • Cassandra: CDC through per-node log segments
  • DynamoDB: DynamoDB Streams

Integration: Kafka Connect integrates CDC from many databases into Kafka topics.

CDC vs. Event Sourcing

Change Data Capture:

  • Application uses database mutably (updates, deletes)
  • CDC extracts changes at low level (replication log)
  • Log reflects what happened to database state

Event Sourcing:

  • Application explicitly writes immutable events
  • Event store is append-only by design
  • Events reflect application-level intent, not state changes
Aspect               CDC                               Event Sourcing
Abstraction level    Low-level state changes           High-level domain events
Application changes  Minimal (add CDC connector)       Significant (redesign around events)
Event design         Tied to the database schema       Application domain events
Log compaction       Possible (keep latest per key)    Often not possible (full history needed)

CDC and Database Schemas

Challenge: In microservices, databases are implementation details. CDC exposes database schemas as public APIs.

Problem: Developer removes a column → breaks downstream consumers depending on that field.

Solution: Outbox Pattern

  1. Internal tables: the service modifies its own schema freely
  2. Outbox table: a stable schema for external consumers
  3. Single transaction: both writes happen in the same database transaction
  4. CDC from outbox: external systems consume only the outbox table

This decouples internal and external schemas while avoiding dual-write problems.
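
A minimal sketch of the pattern in Python, assuming a sqlite3-style connection and illustrative table names (users, outbox):

import json

def change_email(conn, user_id, new_email):
    with conn:  # both statements commit or roll back together
        conn.execute("UPDATE users SET email = ? WHERE id = ?", (new_email, user_id))
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("email_changed", json.dumps({"user_id": user_id, "email": new_email})),
        )
    # A CDC connector tails the outbox table and publishes each row as an event;
    # the internal users table stays private to the service.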

3.3. State, Streams, and Immutability

In plain English: Your bank account balance is the current state. The list of all deposits and withdrawals is the stream of events. The balance is the result of processing the event stream. You can always recalculate the balance by replaying all transactions.

In technical terms: Mutable state and an append-only log of immutable events are two sides of the same coin. The log represents the evolution of state over time.

State and streams, two sides of the same coin:

  Stream (event log):     +$500 deposit, -$200 withdrawal, +$1,000 paycheck, -$300 rent
  State (current value):  account balance = $1,000, derived by replaying the log

Mathematical analogy:

  • State = integral of event stream over time
  • Change stream = derivative of state by time

If you store the changelog durably, state becomes reproducible. Treating the log as the system of record and mutable state as derived makes data flow easier to reason about.
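
A toy illustration of this duality, using the amounts from the example above:

events = [+500, -200, +1000, -300]      # deposits and withdrawals, in log order

balance = 0
for delta in events:                    # "integrate" the stream to get the state
    balance += delta
print(balance)                          # 1000

states = [0, 500, 300, 1300, 1000]      # state after each event
deltas = [b - a for a, b in zip(states, states[1:])]
print(deltas)                           # [500, -200, 1000, -300]  (the "derivative")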

💡 Insight

As database researcher Pat Helland says: "The truth is the log. The database is a cache of a subset of the log." This inverts the traditional view. Once you see databases this way, many integration problems become simpler: just consume the log.

Advantages of Immutable Events

Auditability: Financial bookkeeping has used immutability for centuries. Accountants record transactions in an append-only ledger. Corrections are new transactions, not erasures. Incorrect figures are corrected in the next period, not rewritten in history.

Debugging: If buggy code writes bad data, recovery is easier with immutable events. You can diagnose what happened and fix it. Destructive overwrites make this nearly impossible.

Rich information: Events capture more than just current state. Example: Customer adds item to cart, then removes it. Current state: empty cart. Event log: customer considered the item but decided against it. Valuable for analytics.

Deriving Multiple Views

Separating immutable events from mutable state enables deriving multiple read-oriented representations from the same log.

[Diagram: a single event log acts as the system of record and feeds PostgreSQL for transactions, Elasticsearch for search, Redis for caching, and Snowflake for analytics]

Benefits:

  • Add new features without modifying existing systems
  • Run old and new systems side-by-side during migration
  • Each system optimized for its use case
  • Shutdown old system when migration complete

This is CQRS (Command Query Responsibility Segregation): write in one form, translate to different read-optimized forms.

Concurrency Control

Downside of CQRS: Asynchronous consumers mean eventual consistency. A user who writes to the log and then reads from a derived view may not see their own write yet.

Upside of CQRS: It simplifies some concurrency control. Much of the need for multi-object transactions stems from a single user action requiring changes in several places. With event sourcing, you can design events as self-contained descriptions of a user action, so one action requires only one write: appending the event to the log.

If log and state are sharded the same way, single-threaded log consumer needs no concurrency control. The log defines a serial order of events.

Limitations of Immutability

Performance: Some workloads have high update/delete rates. Immutable history grows prohibitively large. Compaction and garbage collection become critical.

Legal/regulatory: GDPR requires deleting personal information on demand. Append-only is insufficient—you need to actually delete data.

Truly deleting data is hard: Copies exist in many places—storage engines, filesystems, SSDs, backups. Many storage systems write to new locations rather than overwriting.

Crypto-shredding: Store data encrypted, delete the encryption key to make data unrecoverable. Moves the problem: data is immutable, but key storage is mutable. Requires careful key management.


4. Processing Streams

We've discussed where streams come from and how they're transported. Now: what can you do with them?

Three options:

  1. Write to storage: Database, cache, search index. Keep derived systems in sync (similar to batch ETL).
  2. Push to humans: Email alerts, push notifications, real-time dashboards.
  3. Process to produce output streams: Transform input streams into derived output streams.

This section focuses on option 3: stream operators or jobs that process streams.

4.1. Uses of Stream Processing

Complex Event Processing (CEP)

In plain English: CEP is like a security guard watching multiple camera feeds, looking for specific patterns of suspicious behavior. Not just "person entered building," but "person entered building without badge, then went to restricted area, then accessed server room."

In technical terms: CEP searches for patterns of events in streams, similar to how regular expressions search for character patterns in strings. Queries are stored long-term; events flow past them.

Use cases:

  • Fraud detection: Unusual spending patterns
  • Trading systems: Price movements matching buy/sell rules
  • Manufacturing: Sensor patterns indicating machine failure
  • Security: Attack signatures in network traffic

Inversion of database model:

  • Traditional DB: Store data, queries are transient
  • CEP: Store queries, data is transient

Implementations: Esper, Apama, TIBCO StreamBase, Flink SQL, Spark Streaming SQL

Stream Analytics

In plain English: Stream analytics is like a sports commentator giving you live stats—"average speed this lap," "fastest time in last 5 minutes," "this is 15% faster than last week."

In technical terms: Analytics over streams focuses on aggregations and statistics over many events, typically within time windows.

Common metrics:

  • Rate of events (requests per second)
  • Rolling averages (average response time over 5 minutes)
  • Trend detection (traffic higher than same time last week)
  • Percentiles (99th percentile latency)

Time windows: Aggregate over fixed intervals (e.g., last 5 minutes). Smooths out irrelevant fluctuations while providing timely picture of changes.
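
For example, a minimal tumbling-window counter with 1-minute buckets keyed by event time (timestamps below are illustrative epoch seconds):

from collections import defaultdict

WINDOW = 60   # seconds

counts = defaultdict(int)

def on_event(event):
    # Bucket by the event's own timestamp (event time), not by arrival time.
    window_start = event["timestamp"] - (event["timestamp"] % WINDOW)
    counts[window_start] += 1

for ts in [100, 110, 130, 185, 240]:
    on_event({"timestamp": ts})

print(dict(counts))   # {60: 2, 120: 1, 180: 1, 240: 1}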

Probabilistic algorithms: Used for memory efficiency:

  • Bloom filters: Set membership
  • HyperLogLog: Cardinality estimation
  • t-digest: Percentile approximation

These produce approximate results but use much less memory than exact algorithms.

Frameworks: Apache Storm, Spark Streaming, Flink, Samza, Kafka Streams, Google Cloud Dataflow, Azure Stream Analytics

💡 Insight

Stream processing is not inherently lossy or approximate. Probabilistic algorithms are an optimization choice for scenarios where approximate answers are acceptable and memory is limited. Many stream applications require exact results and get them.

Maintaining Materialized Views

CDC keeps derived systems up-to-date with source databases. This is materialized view maintenance—deriving alternative views and updating them when underlying data changes.

Traditional database approach: Refresh materialized views periodically (e.g., PostgreSQL's REFRESH MATERIALIZED VIEW).

Problems:

  • Inefficient: Reprocesses all data even if most unchanged
  • Stale: Changes not reflected until next refresh

Incremental View Maintenance (IVM): Convert SQL to operators capable of incremental computation. Process only changed data, dramatically increasing freshness and efficiency.

  1. Source data changes: a new order is placed in the database
  2. Detect changes: CDC captures the insert event
  3. Incremental update: only the affected aggregates are recomputed
  4. View updated: the materialized view reflects the change within seconds
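
A toy sketch of the idea for a per-product revenue aggregate (a real IVM engine derives such update rules from the SQL automatically; the change-event shape here is illustrative):

# Incrementally maintain: SELECT product_id, SUM(amount) FROM orders GROUP BY product_id
revenue_by_product = {}

def apply_change(change):
    # change = {"op": "insert" | "delete", "row": {"product_id": ..., "amount": ...}}
    row = change["row"]
    delta = row["amount"] if change["op"] == "insert" else -row["amount"]
    pid = row["product_id"]
    revenue_by_product[pid] = revenue_by_product.get(pid, 0) + delta

apply_change({"op": "insert", "row": {"product_id": "A", "amount": 30}})
apply_change({"op": "insert", "row": {"product_id": "A", "amount": 20}})
apply_change({"op": "delete", "row": {"product_id": "A", "amount": 30}})
print(revenue_by_product)   # {'A': 20}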

Databases with IVM: Materialize, RisingWave, ClickHouse, Feldera

These support both real-time stream ingestion and large-scale data warehouse queries.

Search on Streams

In plain English: Instead of searching documents for queries, search queries for matching documents. Like setting up alerts for "tell me when any news mentions my company" rather than manually searching news every day.

In technical terms: Store queries long-term, run documents past them as they arrive. When document matches query, emit alert.

Use cases:

  • Media monitoring: Alert when news mentions specific companies/topics
  • Real estate: Notify when property matches search criteria
  • Job boards: Alert when job matches resume keywords

Optimization: Index the queries themselves to narrow down which might match each document.

Implementation: Elasticsearch percolator

4.2. Reasoning About Time

Stream processors often need time windows ("average over last 5 minutes"). But what does "last 5 minutes" mean?

Event Time vs. Processing Time

  • Event time: when the event actually occurred (a timestamp embedded in the event)
  • Processing time: when the stream processor handles the event (its system clock)

Example:

  Events created at:    10:03, 10:04, 10:05, 10:06, 10:07
  (network delays and queueing in between)
  Events processed at:  10:08, 10:08, 10:09, 10:09, 10:10

  Wrong conclusion (processing time): "Traffic spike at 10:08!"
  Right conclusion (event time):      "Steady traffic from 10:03 to 10:07"

Why processing time is problematic:

  • Queueing delays
  • Network faults
  • Performance issues
  • Consumer restarts
  • Reprocessing historical data
  • Message reordering

Analogy: Star Wars movies released out of chronological order. Episode IV (1977), V (1980), VI (1983), then I (1999), II (2002), III (2005). Processing order (when you watched) ≠ event order (narrative timeline).

Using processing time introduces artifacts. Example: a stream processor is shut down for one minute and works through the backlog when it restarts. If you measure requests per second by processing time, the backlog shows up as a huge spike, even though the real request rate was steady the whole time.

Knowing When You're Ready

Problem: With event-time windows, when do you know all events for a window have arrived?

Example: Counting requests per minute. You've seen events with timestamps in the 37th minute. Time moves on; now seeing events from 38th and 39th minutes. When do you declare the 37th-minute window complete?

Challenges:

  • Events may be buffered elsewhere
  • Network interruptions delay delivery
  • Stragglers arrive after window "closed"

Options:

  1. Ignore stragglers: Drop late events, track dropped count as metric
  2. Publish corrections: Output updated value when stragglers arrive, retract previous output

Watermarks: Special message indicating "no more events with timestamp earlier than T." Helps trigger window completion.

Complexity with multiple producers: Each producer has its own minimum timestamp threshold. Consumer must track all producers individually.

Clock Accuracy

Mobile devices: Events buffered offline, sent hours or days later. Timestamp should be when user interaction occurred, but device clocks are unreliable (accidentally or deliberately wrong).

Solution: Three timestamps approach

  1. Event occurrence time (device clock)
  2. Event sent time (device clock)
  3. Event received time (server clock)

Subtract timestamp 2 from 3 to estimate clock offset. Apply offset to timestamp 1 to estimate true event time.
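
In code, the correction is a one-liner (all timestamp values below are hypothetical epoch seconds):

event_occurred_device = 1_000_000   # (1) device clock: when the user acted
event_sent_device     = 1_003_600   # (2) device clock: when the event was uploaded
event_received_server = 1_003_720   # (3) server clock: when the event arrived

clock_offset = event_received_server - event_sent_device   # device clock error + network delay
true_event_time = event_occurred_device + clock_offset     # estimated occurrence time on the server's clock
print(true_event_time)                                     # 1000120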

Types of Windows

⏱️ Tumbling window (fixed length, no overlap)
  • Each event falls into exactly one window
  • Example: 1-minute buckets
  • 10:03:00-10:03:59, then 10:04:00-10:04:59

🦘 Hopping window (fixed length, overlapping)
  • Windows overlap, which smooths the aggregate
  • Example: 5-minute window with a 1-minute hop
  • 10:03:00-10:07:59, then 10:04:00-10:08:59

📊 Sliding window (all events within some interval of each other)
  • Window boundaries are dynamic
  • Example: events within 5 minutes of one another
  • Keep a sorted buffer and remove expired events

👤 Session window (user activity sessions)
  • No fixed duration
  • Groups events for the same user that occur close together in time
  • Ends after a period of inactivity (e.g., 30 minutes)

4.3. Stream Joins

Just like batch jobs join datasets, stream processors join streams. Three types:

Stream-Stream Join (Window Join)

Scenario: Detect click-through rate on search results.

Events:

  • Search event: User searches for "laptops"
  • Click event: User clicks search result

Challenge:

  • Click may never come (user abandons search)
  • Delay between search and click varies (seconds to weeks)
  • Network delays may deliver click before search

Implementation:

  1. Buffer events: keep events from the last hour, indexed by session_id
  2. New event arrives: a search or click event occurs
  3. Check for a match: look up the session_id in the other index
  4. Emit result: if a match is found, emit a joined event

Emit "clicked" event when both arrive, or "not clicked" when search expires without click.

Stream-Table Join (Stream Enrichment)

Scenario: Enrich activity events with user profile data.

Input:

  • Stream: Activity events (user_id, action, timestamp)
  • Table: User profiles (user_id, name, age, location)

Goal: Output activity events with profile information included.

Example:

  Activity event:      user_id: 123, action: view_product
  Local profile copy:  user_id: 123, age: 32, city: Seattle
  Enriched event:      user_id: 123, action: view_product, age: 32, city: Seattle

Implementation:

  1. Load user profile database into stream processor (hash join)
  2. Subscribe to user profile changelog via CDC
  3. Update local copy when profiles change
  4. Join activity events with current profile data

This is actually a stream-stream join where the profile changelog has an infinite window (beginning of time) with newer values overwriting older ones.
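
A minimal sketch of steps 2-4: keep a local copy of profiles updated from the CDC changelog, then join each activity event against it (the change-event shape is illustrative):

profiles = {}   # user_id -> latest profile (the local copy of the "table")

def on_profile_change(change):
    # Apply a CDC event from the profile changelog to the local copy.
    if change["op"] == "delete":
        profiles.pop(change["user_id"], None)
    else:
        profiles[change["user_id"]] = change["profile"]

def on_activity(event):
    # Enrich the activity event with whatever profile data we currently have.
    return {**event, **profiles.get(event["user_id"], {})}

on_profile_change({"op": "upsert", "user_id": 123, "profile": {"age": 32, "city": "Seattle"}})
print(on_activity({"user_id": 123, "action": "view_product"}))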

Table-Table Join (Materialized View Maintenance)

Scenario: Social network timeline cache.

Requirements:

  • When user posts, add to followers' timelines
  • When post deleted, remove from timelines
  • When user follows someone, add their recent posts to timeline
  • When user unfollows, remove their posts from timeline

Streams needed:

  • Posts stream (create, delete)
  • Follows stream (follow, unfollow)

Maintain state: Set of followers for each user

Equivalent SQL:

SELECT follows.follower_id AS timeline_id,
       array_agg(posts.* ORDER BY posts.timestamp DESC)
  FROM posts
  JOIN follows ON follows.followee_id = posts.sender_id
 GROUP BY follows.follower_id

The stream processor maintains this materialized view, updating it incrementally as events arrive.

💡 Insight

If you regard a stream as the derivative of a table (changes over time), and a join as a product of two tables (u·v), the stream of changes to the join follows the product rule: (u·v)′ = u′v + uv′. Any change in posts joins with current followers, and any change in followers joins with current posts.

Time-Dependence of Joins

All three join types maintain state from one input and query it with the other input. Order matters.

Example: User updates profile. Which activity events join with old profile vs. new profile?

If event ordering across streams is nondeterministic, the join is nondeterministic. Running the job again on same input may produce different output.

In data warehouses: Known as slowly changing dimension (SCD). Solution: Use unique identifier for each version. Invoice records tax rate version ID at time of sale, making join deterministic but preventing log compaction.

4.4. Fault Tolerance

Batch processing tolerates faults easily: restart the failed task and discard its partial output. Stream processing is harder: a stream is infinite, so you can never restart processing from the beginning.

Microbatching and Checkpointing

Microbatching (Spark Streaming):

  • Break stream into small blocks (typically ~1 second)
  • Treat each block as mini-batch
  • Provides exactly-once semantics within framework
  • Trade-off: Smaller batches = more overhead, larger batches = higher latency

Checkpointing (Apache Flink):

  • Periodically snapshot state to durable storage
  • On crash, restart from most recent checkpoint
  • Barriers in message stream trigger checkpoints
  • Doesn't force particular window size

Limitation: Both approaches lose the exactly-once guarantee once output leaves the framework (an external database, a message broker, an email). A restarted task may perform such external side effects twice.

Atomic Commit

Goal: All outputs and side effects of processing an event happen atomically or not at all:

  • Messages to downstream operators
  • Database writes
  • State changes
  • Input acknowledgments (offset advances)

Implementations:

  • Google Cloud Dataflow
  • VoltDB
  • Apache Kafka

Unlike XA transactions (Chapter 10), these keep transactions internal to the stream processing framework. Transaction overhead amortized by processing multiple messages per transaction.

Idempotence

In plain English: An idempotent operation produces the same result whether you do it once or multiple times. Like pressing an elevator button—pressing it twice doesn't call two elevators.

Examples:

  • Idempotent: Set key to fixed value (writing again overwrites with identical value)
  • Not idempotent: Increment counter (incrementing again changes result)

Making operations idempotent:

Include Kafka message offset with database write:

# Check if update already applied
if db.get_last_offset(key) < message.offset:
    db.write(key, value, message.offset)

Requirements:

  • Failed task replays same messages in same order (log-based broker)
  • Processing is deterministic
  • No concurrent updates to same value
  • Fencing to prevent "zombie" nodes

Storm's Trident uses this approach.

Rebuilding State After Failure

Stream processors with state (windowed aggregations, join tables/indexes) must recover state after failure.

Options:

  1. Remote datastore: Replicate to database, query on each message (slow)
  2. Local state: Keep state local, replicate periodically (fast, more complex)

Strategies:

  • Flink: Periodic snapshots to distributed filesystem
  • Kafka Streams: Replicate state changes to dedicated Kafka topic with log compaction
  • VoltDB: Redundantly process each message on several nodes
  • Rebuild from input: If state is short-window aggregations, replay input events

Trade-offs depend on infrastructure: Network vs. disk speed varies. No universal ideal. Local vs. remote state may shift as technology evolves.


5. Summary

In this chapter, we explored stream processing: continuously processing unbounded event streams.

Key concepts:

Message brokers:

  • AMQP/JMS-style: Individual message assignment and acknowledgment, auto-delete after processing. Good for task queues and asynchronous RPC where order doesn't matter.
  • Log-based: All messages in partition go to same consumer, delivered in order. Consumers checkpoint offsets. Messages retained on disk for replay. Good for high throughput, ordered processing, and maintaining derived state.

Sources of streams:

  • User activity events
  • Sensor readings
  • Data feeds (e.g., financial market data)
  • Database writes (changelog via CDC or event sourcing)

Change Data Capture (CDC):

  • Make database the leader, derived systems follow
  • Consume changelog to keep search indexes, caches, warehouses up-to-date
  • Build fresh views by consuming log from beginning
  • Log compaction retains only latest value per key

Stream processing applications:

  • Complex Event Processing (CEP): Pattern matching on event sequences
  • Stream analytics: Windowed aggregations and statistics
  • Materialized view maintenance: Keep derived data up-to-date
  • Search on streams: Match documents against standing queries

Time in streams:

  • Event time: When event actually occurred
  • Processing time: When event was processed
  • Stragglers arrive after window declared complete
  • Window types: tumbling, hopping, sliding, session

Stream joins:

  • Stream-stream: Join events within time window (e.g., search and click)
  • Stream-table: Enrich events with database data (maintain local copy via CDC)
  • Table-table: Maintain materialized view of joined tables

Fault tolerance:

  • Microbatching: Break stream into small blocks
  • Checkpointing: Periodic state snapshots
  • Atomic commit: All effects happen together or not at all
  • Idempotence: Make retries safe by detecting duplicates

💡 Insight

The deepest insight of stream processing is the duality between streams and tables: streams are the derivative of tables (changes over time), and tables are the integral of streams (accumulated state). This perspective unifies batch and stream processing, databases and message brokers, into a coherent framework for managing data over time.

Stream processing enables building systems that react to events in real-time while maintaining consistency and fault tolerance. Combined with batch processing for historical analysis, it provides a complete toolkit for data-intensive applications.

