Chapter 11. Batch Processing
A system cannot be successful if it is too strongly influenced by a single person. Once the initial design is complete and fairly robust, the real test begins as people with many different viewpoints undertake their own experiments.
Donald Knuth
Table of Contents
- Introduction
- Batch Processing with Unix Tools
- Batch Processing in Distributed Systems
- 3.1. Distributed Filesystems
- 3.2. Object Stores
- 3.3. Distributed Job Orchestration
- 3.4. Resource Allocation
- 3.5. Scheduling Workflows
- 3.6. Handling Faults
- Batch Processing Models
- 4.1. MapReduce
- 4.2. Dataflow Engines
- 4.3. Shuffling Data
- 4.4. JOIN and GROUP BY
- Query Languages and DataFrames
- Batch Use Cases
- 6.1. Extract-Transform-Load (ETL)
- 6.2. Analytics
- 6.3. Machine Learning
- 6.4. Bulk Data Imports
- Summary
1. Introduction
In plain English:
- Think of batch processing like doing laundry
- Instead of washing each shirt individually (exhausting!)
- You wait until you have a full load
- Then process everything at once
- Same with data—collect it, process together
In technical terms:
- Batch processing takes a set of input data (read-only)
- Produces output data (generated from scratch every run)
- Unlike online transactions, batch jobs don't mutate data
- They derive new outputs from existing inputs
Why it matters:
- Bug in the output? Roll back code, rerun the job
- This "human fault tolerance" enables faster development
- Mistakes aren't irreversible
1.1. Online vs. Batch vs. Stream
| System Type | Response Time | Input | Examples |
|---|---|---|---|
| Online | Milliseconds | Individual requests | Web servers, APIs, databases |
| Batch | Minutes to days | Bounded datasets | ETL, log analysis, ML training, analytics |
| Stream | Seconds | Unbounded events | Fraud detection, live dashboards, CDC replication |
1.2. Why Batch Processing Matters
Batch processing offers unique advantages:
- A fundamental building block for reliable systems
- Enables patterns impossible in online systems
💡 Insight
The key property of batch processing is immutability: inputs are never modified. This seemingly simple constraint has profound implications:
- Something goes wrong? Rerun the job to produce correct output
- Same input files can be used by multiple jobs for different purposes
- Compare outputs across job runs to detect anomalies
Benefits of batch processing:
- Human fault tolerance — If you deploy buggy code that produces wrong output, just roll back and rerun. Compare this to a database where bad writes corrupt data permanently.
- Reproducibility — The same input always produces the same output, making debugging straightforward.
- Parallel processing — Different parts of the input can be processed independently, enabling horizontal scaling.
- Cost efficiency — Jobs can run during off-peak hours, use spot instances, and process massive amounts of data economically.
2. Batch Processing with Unix Tools
Before turning to distributed systems, let's understand batch processing with Unix tools:
- Tools you already have
- Same patterns scale to massive systems
2.1. Simple Log Analysis
Say you have a web server log file and want to find the five most popular pages:
216.58.210.78 - - [27/Jun/2025:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "https://martin.kleppmann.com/" "Mozilla/5.0 ..."
You can analyze it with a chain of Unix commands:
cat /var/log/nginx/access.log | # 1. Read the log file
awk '{print $7}' | # 2. Extract the URL (7th field)
sort | # 3. Sort URLs alphabetically
uniq -c | # 4. Count consecutive duplicates
sort -r -n | # 5. Sort by count (descending)
head -n 5 # 6. Take top 5
Output:
4189 /favicon.ico
3631 /2016/02/08/how-to-do-distributed-locking.html
2124 /2020/11/18/distributed-systems-and-elliptic-curves.html
1369 /
915 /css/typography.css
💡 Insight
This pipeline processes gigabytes in seconds because each tool does one thing well and data flows between them efficiently. The key pattern is sort → group → aggregate, which is exactly what distributed batch systems like MapReduce use at massive scale.
2.2. Chain of Commands vs. Custom Program
The same analysis in Python:
from collections import defaultdict

counts = defaultdict(int)  # Counter for each URL

with open('/var/log/nginx/access.log', 'r') as file:
    for line in file:
        url = line.split()[6]  # Extract URL (7th field, 0-indexed)
        counts[url] += 1       # Increment counter

# Sort by count descending, take top 5
top5 = sorted(((count, url) for url, count in counts.items()),
              reverse=True)[:5]

for count, url in top5:
    print(f"{count} {url}")
Both approaches work, but the execution models differ fundamentally:
| Aspect | Unix Pipeline | Python Script |
|---|---|---|
| Aggregation | Sort then count adjacent | Hash table in memory |
| Memory | Streaming (minimal) | Proportional to unique URLs |
| Large data | Spills to disk automatically | May run out of memory |
2.3. Sorting vs. In-Memory Aggregation
In plain English: Imagine counting votes. You could either keep a tally sheet (hash table) where you update counts as you go, or you could sort all the ballots by candidate name first, then just count how many are in each pile.
When to use which:
- Hash table (in-memory): Fast when all unique keys fit in memory. If you have 1 million log entries but only 10,000 unique URLs, a hash table works great.
- Sorting: Better when data exceeds memory. The `sort` utility automatically spills to disk and parallelizes across CPU cores. Mergesort has sequential access patterns that perform well on disk.
💡 Insight
The Unix `sort` command is deceptively powerful—it automatically handles larger-than-memory datasets by spilling to disk and parallelizes across CPU cores. This is the same principle that distributed batch systems use: sort, merge, and scan.
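To make the spill-to-disk idea concrete, here is a minimal external merge-sort sketch in Python. It illustrates the principle rather than how GNU sort is actually implemented: sort fixed-size chunks in memory, write each sorted run to a temporary file, then stream-merge the runs with heapq.merge.

import heapq
import tempfile

def write_sorted_run(chunk):
    """Sort one in-memory chunk and spill it to a temporary file."""
    chunk.sort()
    run = tempfile.NamedTemporaryFile('w', delete=False, suffix='.run')
    run.writelines(chunk)
    run.close()
    return run.name

def external_sort(lines, chunk_size=100_000):
    """Sort an iterable of newline-terminated lines that may not fit in memory."""
    runs, chunk = [], []
    for line in lines:
        chunk.append(line)
        if len(chunk) >= chunk_size:
            runs.append(write_sorted_run(chunk))
            chunk = []
    if chunk:
        runs.append(write_sorted_run(chunk))
    # heapq.merge streams the sorted runs; it never loads everything at once
    return heapq.merge(*(open(path) for path in runs))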
Limitation: Unix tools run on a single machine. When datasets exceed local disk capacity, we need distributed batch processing frameworks.
3. Batch Processing in Distributed Systems
A distributed batch framework is, in effect, a distributed operating system:
- Just as your laptop has storage, a scheduler, and programs connected by pipes
- Distributed frameworks have the same components—at scale
3.1. Distributed Filesystems
In plain English:
- Think of a distributed filesystem like a library with multiple branches
- Books (data blocks) are stored across different branches (machines)
- There's a central catalog (metadata service) that knows where everything is
- If one branch burns down, copies exist at other branches
Key components:
| Component | Local Filesystem | Distributed Filesystem |
|---|---|---|
| Block size | 4 KB (ext4) | 128 MB (HDFS) or 4 MB (S3) |
| Data nodes | Single disk | Many machines |
| Metadata | Inodes on disk | NameNode / metadata service |
| Redundancy | RAID | Replication or erasure coding |
| Access | VFS API | DFS protocol (HDFS, S3 API) |
How it works:
- Files are split into large blocks (128 MB in HDFS)
- Each block is replicated across multiple machines (typically 3)
- A metadata service tracks which machines store which blocks
- Clients read blocks from any replica; writes go to all replicas
File "logs.parquet" = [Block A, Block B, Block C, Block D], with each block replicated 3x across different nodes (for example, one node holds blocks A, B, D; another holds A, C, D; a third holds B, C, A).
💡 Insight
DFS blocks are much larger than local filesystem blocks (128 MB vs 4 KB) because:
- Overhead of tracking each block scales with the number of blocks
- At petabyte scale, millions of 4 KB blocks would overwhelm metadata service
- Large blocks amortize network overhead
- More efficient to stream 128 MB than make 32,000 separate 4 KB requests
3.2. Object Stores
Object stores have become the dominant storage layer:
- S3, GCS, Azure Blob
- Replacing HDFS in many deployments
- Simpler operations model
Key differences from distributed filesystems:
| Feature | Distributed FS (HDFS) | Object Store (S3) |
|---|---|---|
| Operations | Open, seek, read, write, close | GET, PUT (whole object) |
| Mutability | Files can be appended | Objects are immutable |
| Directories | True directories | Key prefixes (simulated) |
| Renames | Atomic | Copy + delete (non-atomic) |
| Compute locality | Tasks run on data nodes | Storage/compute separated |
| Cost model | Capacity-based | Request + capacity |
Object URL structure:
s3://my-data-bucket/2025/06/27/events.parquet
└── bucket ──┘ └─────── key ─────────┘
Key point:
- The slashes in the key are just conventions—no real directories
- Listing objects with the prefix `2025/06/` returns all matching keys
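For instance, with the AWS SDK for Python (boto3), a prefix listing against the bucket above might look like this sketch (the bucket and prefix are the ones from the example; credentials are assumed to be configured):

import boto3

s3 = boto3.client("s3")

# List every object whose key starts with the given prefix.
# S3 has no real directories; this is purely a string-prefix match.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-data-bucket", Prefix="2025/06/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])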
3.3. Distributed Job Orchestration
In plain English:
- An orchestrator is like a construction site foreman
- When you want to build something (run a job):
- The foreman figures out which workers are available
- Assigns tasks
- Monitors progress
- Handles problems when workers get sick or equipment breaks
Components of job orchestration:
- Scheduler: receives job requests, decides which tasks run on which nodes, and balances fairness against efficiency
- Cluster state: tracks all nodes and their resources (CPU, GPU, memory), maintains global cluster state in ZooKeeper/etcd, and knows what is running where
- Executors: run tasks, send heartbeats, and report status; they use cgroups for resource isolation between tasks
Job request metadata includes:
- Number of tasks to execute
- Resources per task (CPU, memory, disk, GPU)
- Job identifier and access credentials
- Input/output data locations
- Executable code location
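As a concrete illustration, such a request could be expressed as a spec like the following sketch (a hypothetical schema; every field name is illustrative, not any particular orchestrator's format):

# Hypothetical job request spec -- field names are illustrative only
job_request = {
    "job_id": "daily-log-aggregation-2025-06-27",
    "credentials": "service-account:batch-etl",
    "num_tasks": 200,
    "resources_per_task": {"cpus": 2, "memory_gb": 8, "disk_gb": 50, "gpus": 0},
    "input": "s3://my-data-bucket/logs/2025/06/27/",
    "output": "s3://my-data-bucket/aggregates/2025/06/27/",
    "executable": "s3://my-artifacts/jobs/log-aggregation-v42.zip",
}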
3.4. Resource Allocation
The scheduling problem:
- Scheduling is NP-hard—optimal allocation is computationally infeasible
- Real schedulers use heuristics instead
Consider this scenario:
A cluster has 160 CPU cores. Two jobs arrive, each requesting 100 cores. What should the scheduler do?
Options and trade-offs:
| Strategy | Behavior | Trade-off |
|---|---|---|
| Fair share | Run 80 tasks from each job | Neither finishes as fast as possible |
| Gang scheduling | Wait for all 100 cores, run one job | Nodes sit idle while waiting |
| FIFO | First job gets everything | Second job may starve |
| Preemption | Kill some tasks to make room | Wasted work from killed tasks |
Common scheduling heuristics:
- FIFO (first come, first served)
- Dominant Resource Fairness (DRF)
- Priority queues
- Capacity-based scheduling
- Bin-packing algorithms
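To give a flavor of these heuristics, below is a minimal first-fit bin-packing sketch in Python: each task goes to the first node with enough free CPU and memory. Real schedulers layer fairness, priorities, and preemption on top of ideas like this; the dict shapes here are illustrative.

def first_fit(tasks, nodes):
    """Assign each task to the first node with enough free capacity.

    tasks: dicts like {"id": "t1", "cpus": 4, "memory_gb": 16}
    nodes: dicts like {"id": "n1", "free_cpus": 32, "free_memory_gb": 128}
    Returns {task_id: node_id}; tasks that fit nowhere are left unassigned.
    """
    placement = {}
    for task in tasks:
        for node in nodes:
            if (node["free_cpus"] >= task["cpus"]
                    and node["free_memory_gb"] >= task["memory_gb"]):
                node["free_cpus"] -= task["cpus"]
                node["free_memory_gb"] -= task["memory_gb"]
                placement[task["id"]] = node["id"]
                break
    return placement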
3.5. Scheduling Workflows
Batch jobs often form workflows (DAGs):
- DAG = Directed Acyclic Graph
- Output of one job feeds into another
- Scheduler ensures jobs run in correct order
Workflow schedulers such as Airflow, Dagster, and Prefect wait for all of a job's inputs to be ready before running the jobs that depend on them (see the sketch below).
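A minimal Airflow 2.x sketch of such a dependency, assuming an extract job must finish before a transform job starts (the DAG name, task names, and commands are illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_log_pipeline",
    start_date=datetime(2025, 6, 27),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="python extract_logs.py")
    transform = BashOperator(task_id="transform", bash_command="spark-submit transform.py")

    # transform only runs once extract has completed successfully
    extract >> transform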
💡 Insight
There's an important distinction between:
- Job orchestrators (YARN, Kubernetes) — schedule individual jobs
- Workflow orchestrators (Airflow, Dagster) — manage dependencies between jobs
A workflow with 50-100 interconnected jobs is common in data pipelines.
3.6. Handling Faults
Batch jobs run for long periods—failures are inevitable:
- Jobs run minutes to days
- Many parallel tasks
- Multiple failure modes:
  - Hardware faults (especially on commodity hardware)
  - Network interruptions
  - Preemption by higher-priority jobs
  - Spot instance terminations (to save cost)
Fault tolerance strategies:
| System | Intermediate Data | Recovery Method |
|---|---|---|
| MapReduce | Written to DFS | Reread from DFS |
| Spark | Kept in memory | Recompute from lineage |
| Flink | Periodic checkpoints | Restore from checkpoint |
💡 Insight
Because batch jobs regenerate output from scratch, fault recovery is simpler:
- Just delete partial output and rerun the failed task
- This wouldn't work if the job had side effects (like sending emails)
- That's why batch processing emphasizes immutable inputs and pure transformations
4. Batch Processing Models
4.1. MapReduce
MapReduce mirrors our Unix log analysis example:
- Same pattern: read → extract → sort → group → aggregate
Unix: cat | awk | sort | uniq -c | sort -rn | head
MapReduce: Read → Map → Shuffle → Reduce → (Second MapReduce job)
The four stages:
1. Read: break input files into records (input: Parquet/Avro files on HDFS or S3)
2. Map: extract key-value pairs from each record (example: (URL, 1) for each log line)
3. Shuffle: sort by key and group values with the same key (all values for "page.html" go to the same reducer)
4. Reduce: process the grouped values and produce output (example: sum up the 1s → ("page.html", 42))
Mapper and Reducer:
# Mapper: called for each input record
def mapper(record):
    url = record['request_url']
    yield (url, 1)  # Emit key-value pair

# Reducer: called for each unique key with all its values
def reducer(key, values):
    count = sum(values)  # values is an iterator over all the 1s
    yield (key, count)
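To see how the pieces fit together, here is a single-process sketch that wires the mapper and reducer above into the map → shuffle → reduce pattern. A real framework does this across many machines; here itertools.groupby over sorted output stands in for the shuffle.

from itertools import groupby
from operator import itemgetter

def run_mapreduce(records, mapper, reducer):
    # Map: apply the mapper to every input record
    mapped = [pair for record in records for pair in mapper(record)]
    # Shuffle: sort by key so equal keys become adjacent
    mapped.sort(key=itemgetter(0))
    # Reduce: call the reducer once per distinct key
    for key, group in groupby(mapped, key=itemgetter(0)):
        yield from reducer(key, (value for _, value in group))

records = [{'request_url': '/home'}, {'request_url': '/home'}, {'request_url': '/about'}]
print(list(run_mapreduce(records, mapper, reducer)))
# [('/about', 1), ('/home', 2)]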
💡 Insight
MapReduce's programming model comes from functional programming—specifically Lisp's `map` and `reduce` (fold) higher-order functions:
- Map is embarrassingly parallel (each input processed independently)
- Reduce processes each key independently
- This makes parallelization trivial
Why MapReduce is mostly obsolete:
- Requires writing map/reduce in a general-purpose language
- No job pipelining (must wait for upstream job to finish completely)
- Always sorts between map and reduce (even when unnecessary)
- Replaced by Spark, Flink, and SQL-based systems
4.2. Dataflow Engines
Modern engines like Spark and Flink improve on MapReduce:
- Flexible operators (not just map/reduce)
- In-memory intermediate data
- Pipelined execution
In MapReduce, every stage writes its intermediate output to the DFS; dataflow engines keep intermediate data in memory and only spill shuffle data to disk when needed.
Advantages of dataflow engines:
| Feature | MapReduce | Dataflow Engines |
|---|---|---|
| Sorting | Always between stages | Only when needed |
| Intermediate data | Written to DFS | In-memory or local disk |
| Operator fusion | Each stage separate | Adjacent ops combined |
| Pipelining | Wait for stage completion | Stream between stages |
| Process reuse | New JVM per task | Reuse processes |
4.3. Shuffling Data
In plain English:
- Shuffling is like sorting mail at a post office
- Letters arrive from many mailboxes (mappers)
- Need to organize so all letters for same zip code (key)
- End up in the same bin (reducer)
Shuffle process:
- Each mapper creates a separate output file for each reducer
- Key hash determines destination: `hash(key) % num_reducers`
- Mapper sorts key-value pairs within each file
- Reducers fetch their files from all mappers
- Reducers merge-sort the files together
- Same keys are now adjacent → reducer iterates over values
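A toy Python sketch of the partitioning side of a shuffle, assuming 3 reducers. It only illustrates `hash(key) % num_reducers`; note that Python randomizes string hashing per process, so a real system would use a stable hash function.

NUM_REDUCERS = 3

def partition(key, num_reducers=NUM_REDUCERS):
    # Decide which reducer receives this key-value pair
    return hash(key) % num_reducers

# Each mapper buckets its output per reducer, then sorts each bucket
mapper_output = [("/home", 1), ("/about", 1), ("/home", 1), ("/pricing", 1)]
buckets = {r: [] for r in range(NUM_REDUCERS)}
for key, value in mapper_output:
    buckets[partition(key)].append((key, value))
for r in buckets:
    buckets[r].sort()  # sorted runs let reducers merge cheaply

print(buckets)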
💡 Insight
Despite the name, shuffle produces sorted order, not random order:
- Term comes from shuffling cards to redistribute, not randomize
- Modern systems like BigQuery keep shuffle data in memory
- Use external shuffle services for resilience
4.4. JOIN and GROUP BY
Shuffling enables distributed joins:
- Bring matching keys to same node
- Process joined data locally
Example: the click events {user_id: 123, page: /products, timestamp: 10:30}, {user_id: 123, page: /checkout, timestamp: 10:35}, and {user_id: 456, page: /home} are joined with the user record {user_id: 123, birth_date: 1990, name: Alice}.

After the SHUFFLE BY user_id, the reducer for user 123 receives:
- User profile: {birth_date: 1990, name: Alice} ← arrives first
- Event: {page: /products, timestamp: 10:30}
- Event: {page: /checkout, timestamp: 10:35}

and emits the joined records:
- {page: /products, viewer_birth_year: 1990}
- {page: /checkout, viewer_birth_year: 1990}
How it works:
- Two mappers: one for events (emit `user_id → event`), one for users (emit `user_id → profile`)
- Shuffle brings all records with the same `user_id` to the same reducer
- Secondary sort ensures the user profile arrives first
- Reducer stores the profile in a variable, then iterates over the events
- Minimal memory: only one user's data in memory at a time
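A sketch of such a reduce-side join reducer in Python, assuming the secondary sort has already placed the profile record first among each key's values (the record shapes follow the example above and are illustrative):

# Called once per user_id; the profile arrives first thanks to the secondary sort
def join_reducer(user_id, values):
    values = iter(values)
    profile = next(values)              # e.g. {"birth_date": 1990, "name": "Alice"}
    birth_year = profile["birth_date"]
    for event in values:                # the remaining values are click events
        yield {"page": event["page"], "viewer_birth_year": birth_year}

values = [
    {"birth_date": 1990, "name": "Alice"},
    {"page": "/products", "timestamp": "10:30"},
    {"page": "/checkout", "timestamp": "10:35"},
]
print(list(join_reducer(123, values)))
# [{'page': '/products', 'viewer_birth_year': 1990},
#  {'page': '/checkout', 'viewer_birth_year': 1990}]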
5. Query Languages and DataFrames
5.1. SQL for Batch Processing
As batch systems matured, SQL became the lingua franca:
- Familiar to analysts and engineers
- Optimizers can choose efficient execution plans
- Integrates with existing BI tools
-- Find top pages by age group
SELECT
page,
FLOOR((2025 - birth_year) / 10) * 10 AS age_decade,
COUNT(*) AS views
FROM events e
JOIN users u ON e.user_id = u.user_id
GROUP BY page, age_decade
ORDER BY views DESC
LIMIT 100;
Why SQL won:
- Analysts and developers already know it
- Integrates with existing BI tools (Tableau, Looker)
- Query optimizers can choose efficient execution plans
- More concise than handwritten MapReduce
SQL-based batch engines:
- Hive: SQL on Hadoop/Spark
- Trino (Presto): Federated SQL across data sources
- Spark SQL: SQL on Spark
- BigQuery, Snowflake: Cloud data warehouses
5.2. DataFrames
Data scientists preferred the DataFrame model:
- Familiar from R and Pandas
- Step-by-step transformations
- Works with SQL under the hood
# Pandas-style DataFrame API (runs distributed on Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg, desc

spark = SparkSession.builder.appName("page-demographics").getOrCreate()

events_df = spark.read.parquet("s3://data/events/")
users_df = spark.read.parquet("s3://data/users/")

result = (events_df
    .join(users_df, "user_id")
    .withColumn("age", 2025 - users_df.birth_year)
    .groupBy("page")
    .agg(
        count("*").alias("views"),
        avg("age").alias("avg_viewer_age")
    )
    .orderBy(desc("views"))
    .limit(100))

result.write.parquet("s3://output/page-demographics/")
| Aspect | SQL | DataFrame API |
|---|---|---|
| Style | Declarative (what) | Step-by-step (how) |
| Optimization | Query planner | Query planner (Spark) or immediate (Pandas) |
| Familiarity | DBAs, analysts | Data scientists |
| Flexibility | Standard operators | Custom functions easier |
💡 Insight
Spark's DataFrame API is deceptively clever:
- Unlike Pandas which executes immediately, Spark builds a query plan
- Optimizes the plan before execution
- `df.filter(x).filter(y)` becomes a single optimized filter, not two passes over the data
5.3. Batch Processing and Data Warehouses Converge
Historically separate, batch processing and data warehouses are merging:
- Same underlying technologies (columnar storage, distributed shuffle)
- Object storage (S3) as common foundation
- SQL + DataFrame APIs both supported
- Batch frameworks (historically): MapReduce, Spark; flexible code; commodity hardware; horizontal scaling
- Data warehouses (historically): Teradata, Oracle; SQL only; specialized appliances; vertical scaling
- Converged today: SQL + DataFrame APIs; columnar storage (Parquet); distributed shuffle; object storage (S3) as foundation; the same engines for ETL and analytics
When to use which:
| Workload | Better fit |
|---|---|
| SQL analytics | Cloud warehouse (BigQuery, Snowflake) |
| Complex ML pipelines | Batch framework (Spark, Ray) |
| Row-by-row processing | Batch framework |
| Cost-sensitive large jobs | Batch framework |
| Iterative graph algorithms | Batch framework |
6. Batch Use Cases
6.1. Extract-Transform-Load (ETL)
In plain English:
- ETL is like a factory assembly line for data
- Raw materials (source data) come in
- Get processed and quality-checked (transformed)
- Packaged for shipping (loaded to destination)
Typical transformations: clean data, join tables, aggregate, validate, parse JSON, filter, enrich.
A workflow scheduler (Airflow) orchestrates the pipeline: it runs daily or hourly, handles retries, and alerts on failure.
Why batch for ETL:
- Parallelizable: Filtering, projecting, and joining are embarrassingly parallel
- Debuggable: Inspect failed files, fix code, rerun
- Retryable: Transient failures handled by scheduler
- Orchestrated: Airflow, Dagster provide operators for many systems
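A compact PySpark sketch of such an ETL step (the bucket paths and column names are illustrative): read raw JSON events, filter and enrich them, and write the cleaned result as Parquet for downstream consumers.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("etl-clean-events").getOrCreate()

# Extract: read raw JSON dropped off by upstream systems
raw = spark.read.json("s3://raw-zone/events/2025/06/27/")

# Transform: drop bad records, derive columns, enrich with reference data
users = spark.read.parquet("s3://clean-zone/users/")
cleaned = (raw
    .filter(col("user_id").isNotNull())
    .withColumn("event_date", to_date(col("timestamp")))
    .join(users, "user_id", "left"))

# Load: write columnar output for analytics
cleaned.write.mode("overwrite").parquet("s3://clean-zone/events/2025/06/27/")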
6.2. Analytics
Batch systems support two analytics patterns:
- Pre-aggregation (scheduled jobs)
- Ad hoc queries (interactive exploration)
1. Pre-aggregation (scheduled)
-- Runs daily via Airflow
CREATE TABLE daily_sales_cube AS
SELECT
date,
region,
product_category,
SUM(revenue) as total_revenue,
COUNT(*) as transactions
FROM transactions
WHERE date = CURRENT_DATE - 1
GROUP BY date, region, product_category;
2. Ad hoc queries (interactive)
-- Analyst runs this to investigate spike
SELECT
hour,
COUNT(*) as errors,
error_type
FROM logs
WHERE date = '2025-06-27' AND status >= 500
GROUP BY hour, error_type
ORDER BY errors DESC;
6.3. Machine Learning
Batch processing is central to ML workflows:
- Feature engineering at scale
- Model training (gradient descent across huge datasets)
- Batch inference (score millions of records)
Frameworks: Spark MLlib, Ray, Kubeflow, Flyte
OpenAI uses Ray for ChatGPT training
LLM data preparation is a prime batch workload:
- Extract plain text from HTML
- Detect and remove duplicates
- Filter low-quality content
- Tokenize the text for model training
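As a small illustration of the deduplication step, here is a content-hash sketch in Python; production pipelines typically add fuzzy techniques such as MinHash on top of exact hashing.

import hashlib

def dedupe(documents):
    """Drop exact duplicates by hashing normalized document text."""
    seen = set()
    for doc in documents:
        digest = hashlib.sha256(doc.strip().lower().encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            yield doc

docs = ["Hello world", "hello world ", "Something else"]
print(list(dedupe(docs)))  # ['Hello world', 'Something else']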
6.4. Bulk Data Imports
Batch outputs often need to reach production databases.
Why NOT write directly from batch jobs:
| Problem | Why |
|---|---|
| Slow | Network request per record |
| Overwhelming | Thousands of tasks writing simultaneously |
| Inconsistent | Partial results visible if job fails |
Better patterns:
1. Stream through Kafka
   - Buffer between batch and production
   - Consumers control their read rate
   - Multiple downstream systems can consume
2. Bulk file import
   - Build database files in the batch job
   - Bulk load atomically
   - Venice, Pinot, and Druid support this pattern
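A sketch of the first pattern using the kafka-python client (the topic name, broker address, and the batch_job_output() generator are all illustrative assumptions): the batch job publishes its output records to a topic, and downstream consumers apply the writes at their own pace.

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

# Publish batch output instead of writing to the production DB directly
for record in batch_job_output():   # hypothetical generator of result rows
    producer.send("page-demographics", value=record)

producer.flush()  # block until all buffered records have been delivered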
7. Summary
In this chapter, we explored batch processing from Unix pipes to petabyte-scale systems.
Core concepts:
- Batch processing transforms immutable inputs into derived outputs
- Same patterns from Unix (`sort | uniq -c`) scale to distributed systems
- The key operation is the shuffle: redistribute data so the same keys meet at the same node
System components:
- Distributed filesystems (HDFS) and object stores (S3) provide storage
- Orchestrators (YARN, K8s) schedule tasks across machines
- Workflow schedulers (Airflow) manage job dependencies
Processing models:
- MapReduce: map → shuffle → reduce (largely obsolete)
- Dataflow engines (Spark, Flink): flexible operators, in-memory intermediate data
- SQL and DataFrame APIs: high-level, optimizable interfaces
Use cases:
- ETL: move and transform data between systems
- Analytics: pre-aggregation and ad hoc queries
- Machine learning: feature engineering, training, batch inference
- Bulk imports: populate production systems from batch outputs
💡 Insight
The key insight of batch processing is treating computation as pure functions:
- Given the same input, produce the same output (no side effects)
- Jobs become reproducible, retriable, and parallelizable
- When something goes wrong, you can always rerun
- That property is worth its weight in gold for building reliable systems
In Chapter 12, we'll turn to stream processing, where inputs are unbounded and jobs never complete. We'll see how stream processing builds on batch concepts while introducing new challenges around time, ordering, and state.