
Chapter 11. Batch Processing

A system cannot be successful if it is too strongly influenced by a single person. Once the initial design is complete and fairly robust, the real test begins as people with many different viewpoints undertake their own experiments.

Donald Knuth

Table of Contents

  1. Introduction
  2. Batch Processing with Unix Tools
  3. Batch Processing in Distributed Systems
  4. Batch Processing Models
  5. Query Languages and DataFrames
  6. Batch Use Cases
  7. Summary

1. Introduction

In plain English:

  • Think of batch processing like doing laundry
  • Instead of washing each shirt individually (exhausting!)
  • You wait until you have a full load
  • Then process everything at once
  • Same with data—collect it, process together

In technical terms:

  • Batch processing takes a set of input data (read-only)
  • Produces output data (generated from scratch every run)
  • Unlike online transactions, batch jobs don't mutate data
  • They derive new outputs from existing inputs

Why it matters:

  • Bug in the output? Roll back code, rerun the job
  • This "human fault tolerance" enables faster development
  • Mistakes aren't irreversible

1.1. Online vs. Batch vs. Stream

THREE STYLES OF DATA PROCESSING

Online (request → response): a user clicks "Buy Now" and gets a response in milliseconds.
Examples: web servers, databases, APIs.

Batch (input → output): a job processes yesterday's logs and produces results in minutes or hours.
Examples: log analysis, ML training, ETL pipelines.

Stream (event → action): a message arrives in a queue and is processed within seconds.
Examples: fraud detection, live dashboards, CDC replication.

System Type | Response Time   | Input               | Examples
Online      | Milliseconds    | Individual requests | Web servers, APIs, databases
Batch       | Minutes to days | Bounded datasets    | ETL, ML training, analytics
Stream      | Seconds         | Unbounded events    | Real-time dashboards, CDC

1.2. Why Batch Processing Matters

Batch processing offers unique advantages:

  • A fundamental building block for reliable systems
  • Enables patterns impossible in online systems

💡 Insight

The key property of batch processing is immutability: inputs are never modified. This seemingly simple constraint has profound implications:

  • Something goes wrong? Rerun the job to produce correct output
  • Same input files can be used by multiple jobs for different purposes
  • Compare outputs across job runs to detect anomalies

Benefits of batch processing:

  1. Human fault tolerance — If you deploy buggy code that produces wrong output, just roll back and rerun. Compare this to a database where bad writes corrupt data permanently.

  2. Reproducibility — The same input always produces the same output, making debugging straightforward.

  3. Parallel processing — Different parts of the input can be processed independently, enabling horizontal scaling.

  4. Cost efficiency — Jobs can run during off-peak hours, use spot instances, and process massive amounts of data economically.


2. Batch Processing with Unix Tools

Before we get to distributed systems, let's look at batch processing with Unix tools:

  • Tools you already have
  • Same patterns scale to massive systems

2.1. Simple Log Analysis

Say you have a web server log file and want to find the five most popular pages:

216.58.210.78 - - [27/Jun/2025:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "https://martin.kleppmann.com/" "Mozilla/5.0 ..."

You can analyze it with a chain of Unix commands:

cat /var/log/nginx/access.log |   # 1. Read the log file
awk '{print $7}' | # 2. Extract the URL (7th field)
sort | # 3. Sort URLs alphabetically
uniq -c | # 4. Count consecutive duplicates
sort -r -n | # 5. Sort by count (descending)
head -n 5 # 6. Take top 5

Output:

4189 /favicon.ico
3631 /2016/02/08/how-to-do-distributed-locking.html
2124 /2020/11/18/distributed-systems-and-elliptic-curves.html
1369 /
915 /css/typography.css

UNIX PIPELINE DATA FLOW: access.log → (1) full lines from the file → (2) URLs only → (3) sorted A–Z → (4) counted per URL → (5) sorted by count, descending → (6) top 5 URLs

💡 Insight

This pipeline processes gigabytes in seconds because each tool does one thing well and data flows between them efficiently. The key pattern is sort → group → aggregate, which is exactly what distributed batch systems like MapReduce use at massive scale.

2.2. Chain of Commands vs. Custom Program

The same analysis in Python:

from collections import defaultdict

counts = defaultdict(int)  # Counter for each URL

with open('/var/log/nginx/access.log', 'r') as file:
    for line in file:
        url = line.split()[6]  # Extract URL (7th field, 0-indexed)
        counts[url] += 1       # Increment counter

# Sort by count descending, take top 5
top5 = sorted(((count, url) for url, count in counts.items()),
              reverse=True)[:5]

for count, url in top5:
    print(f"{count} {url}")

Both approaches work, but the execution models differ fundamentally:

Aspect      | Unix Pipeline                | Python Script
Aggregation | Sort, then count adjacent    | Hash table in memory
Memory      | Streaming (minimal)          | Proportional to unique URLs
Large data  | Spills to disk automatically | May run out of memory

2.3. Sorting vs. In-Memory Aggregation

In plain English: Imagine counting votes. You could either keep a tally sheet (hash table) where you update counts as you go, or you could sort all the ballots by candidate name first, then just count how many are in each pile.

When to use which:

  • Hash table (in-memory): Fast when all unique keys fit in memory. If you have 1 million log entries but only 10,000 unique URLs, a hash table works great.

  • Sorting: Better when data exceeds memory. The sort utility automatically spills to disk and parallelizes across CPU cores. Mergesort has sequential access patterns that perform well on disk.

💡 Insight

The Unix sort command is deceptively powerful—it automatically handles larger-than-memory datasets by spilling to disk and parallelizes across CPU cores. This is the same principle that distributed batch systems use: sort, merge, and scan.
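
To make the two approaches concrete, here is a minimal Python sketch on toy data (illustrative only): the Counter mirrors the hash-table approach, while sorting and counting runs of adjacent equal keys mirrors what sort | uniq -c does.

from collections import Counter
from itertools import groupby

urls = ["/home", "/about", "/home", "/home", "/about"]   # toy input

# Hash-table aggregation: memory grows with the number of unique keys
hash_counts = Counter(urls)

# Sort-based aggregation: sort first, then count runs of adjacent equal keys
# (the sort | uniq -c pattern; a real external sort would spill to disk for large inputs)
sort_counts = {url: sum(1 for _ in group) for url, group in groupby(sorted(urls))}

assert hash_counts == Counter(sort_counts)
print(sort_counts)   # {'/about': 2, '/home': 3}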

Limitation: Unix tools run on a single machine. When datasets exceed local disk capacity, we need distributed batch processing frameworks.


3. Batch Processing in Distributed Systems

A distributed batch framework is a distributed operating system:

  • Just as your laptop has storage, a scheduler, and programs connected by pipes
  • Distributed frameworks have the same components, at scale

(Figure: single machine vs. distributed batch system)

3.1. Distributed Filesystems

In plain English:

  • Think of a distributed filesystem like a library with multiple branches
  • Books (data blocks) are stored across different branches (machines)
  • There's a central catalog (metadata service) that knows where everything is
  • If one branch burns down, copies exist at other branches

Key components:

Component  | Local Filesystem | Distributed Filesystem
Block size | 4 KB (ext4)      | 128 MB (HDFS) or 4 MB (S3)
Data nodes | Single disk      | Many machines
Metadata   | Inodes on disk   | NameNode / metadata service
Redundancy | RAID             | Replication or erasure coding
Access     | VFS API          | DFS protocol (HDFS, S3 API)

How it works:

  1. Files are split into large blocks (128 MB in HDFS)
  2. Each block is replicated across multiple machines (typically 3)
  3. A metadata service tracks which machines store which blocks
  4. Clients read blocks from any replica; writes go to all replicas

DISTRIBUTED FILESYSTEM ARCHITECTURE (figure): the file "logs.parquet" consists of [Block A, Block B, Block C, Block D]; each block is replicated 3× across different nodes (e.g., one node holds A, B, D; another A, C, D; a third B, C, A).
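
As a toy illustration of the bookkeeping a metadata service performs, the following sketch splits a file into 128 MB blocks and assigns each block to three nodes round-robin (node names and placement policy are made up):

BLOCK_SIZE = 128 * 1024 * 1024        # 128 MB, as in HDFS
REPLICATION = 3
NODES = ["node-1", "node-2", "node-3", "node-4"]

def plan_blocks(file_size: int) -> dict:
    """Split a file into blocks and pick 3 replica nodes per block (round-robin)."""
    num_blocks = -(-file_size // BLOCK_SIZE)   # ceiling division
    plan = {}
    for i in range(num_blocks):
        plan[f"block-{i}"] = [NODES[(i + r) % len(NODES)] for r in range(REPLICATION)]
    return plan

# A 300 MB file becomes 3 blocks, each stored on 3 different nodes
print(plan_blocks(300 * 1024 * 1024))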

💡 Insight

DFS blocks are much larger than local filesystem blocks (128 MB vs 4 KB) because:

  • Overhead of tracking each block scales with the number of blocks
  • At petabyte scale, millions of 4 KB blocks would overwhelm metadata service
  • Large blocks amortize network overhead
  • More efficient to stream 128 MB than make 32,000 separate 4 KB requests

3.2. Object Stores

Object stores have become the dominant storage layer:

  • S3, GCS, Azure Blob
  • Replacing HDFS in many deployments
  • Simpler operations model

Key differences from distributed filesystems:

FeatureDistributed FS (HDFS)Object Store (S3)
OperationsOpen, seek, read, write, closeGET, PUT (whole object)
MutabilityFiles can be appendedObjects are immutable
DirectoriesTrue directoriesKey prefixes (simulated)
RenamesAtomicCopy + delete (non-atomic)
Compute localityTasks run on data nodesStorage/compute separated
Cost modelCapacity-basedRequest + capacity

Object URL structure:

s3://my-data-bucket/2025/06/27/events.parquet
     └── bucket ──┘ └───────── key ─────────┘

Key point:

  • The slashes in the key are just conventions—no real directories
  • Listing objects with prefix 2025/06/ returns all matching keys
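
For example, prefix listing with boto3 might look like this sketch (bucket name and prefix are hypothetical; large listings would need a paginator):

import boto3

s3 = boto3.client("s3")

# The "/" characters in keys are only a naming convention; listing by prefix
# is what makes them feel like directories.
response = s3.list_objects_v2(Bucket="my-data-bucket", Prefix="2025/06/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])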

3.3. Distributed Job Orchestration

In plain English:

  • An orchestrator is like a construction site foreman
  • When you want to build something (run a job):
    • The foreman figures out which workers are available
    • Assigns tasks
    • Monitors progress
    • Handles problems when workers get sick or equipment breaks

Components of job orchestration:

JOB ORCHESTRATION COMPONENTS

Scheduler:

  • Receives job requests
  • Decides which tasks run on which nodes
  • Balances fairness vs. efficiency

Resource manager:

  • Tracks all nodes and their resources (CPU, GPU, memory)
  • Maintains global cluster state in ZooKeeper/etcd
  • Knows what's running where

Executors:

  • Run the assigned tasks, send heartbeats, and report status
  • Use cgroups for resource isolation between tasks

Job request metadata includes:

  • Number of tasks to execute
  • Resources per task (CPU, memory, disk, GPU)
  • Job identifier and access credentials
  • Input/output data locations
  • Executable code location

3.4. Resource Allocation

The scheduling problem:

  • Scheduling is NP-hard—optimal allocation is computationally infeasible
  • Real schedulers use heuristics instead

Consider this scenario:

A cluster has 160 CPU cores. Two jobs arrive, each requesting 100 cores. What should the scheduler do?

Options and trade-offs:

Strategy        | Behavior                            | Trade-off
Fair share      | Run 80 tasks from each job          | Neither finishes as fast as possible
Gang scheduling | Wait for all 100 cores, run one job | Nodes sit idle while waiting
FIFO            | First job gets everything           | Second job may starve
Preemption      | Kill some tasks to make room        | Wasted work from killed tasks

Common scheduling heuristics:

  • FIFO (first come, first served)
  • Dominant Resource Fairness (DRF)
  • Priority queues
  • Capacity-based scheduling
  • Bin-packing algorithms
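
As a toy illustration of the bin-packing idea, here is a first-fit sketch that places task CPU requests onto nodes (illustrative only; real schedulers also weigh memory, GPUs, locality, and priorities):

def first_fit(tasks, nodes):
    """Assign each task to the first node with enough free CPU cores."""
    free = dict(nodes)                # node name -> free cores
    placement = {}
    for task, cores in tasks:
        for node in free:
            if free[node] >= cores:
                free[node] -= cores
                placement[task] = node
                break
        else:
            placement[task] = None    # nothing fits: the task has to wait
    return placement

nodes = {"node-1": 16, "node-2": 16}
tasks = [("task-a", 10), ("task-b", 10), ("task-c", 8)]
print(first_fit(tasks, nodes))
# {'task-a': 'node-1', 'task-b': 'node-2', 'task-c': None}  -> task-c must wait for free cores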

3.5. Scheduling Workflows

Batch jobs often form workflows (DAGs):

  • DAG = Directed Acyclic Graph
  • Output of one job feeds into another
  • Scheduler ensures jobs run in correct order

WORKFLOW / DAG EXAMPLE (figure): jobs connected by dependency edges. Workflow schedulers such as Airflow, Dagster, and Prefect wait for all of a job's inputs before running its dependent jobs.

💡 Insight

There's an important distinction between:

  • Job orchestrators (YARN, Kubernetes) — schedule individual jobs
  • Workflow orchestrators (Airflow, Dagster) — manage dependencies between jobs

A workflow with 50-100 interconnected jobs is common in data pipelines.
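
A workflow definition for such an orchestrator might look like this Airflow-style sketch (job names, commands, and schedule are made up, and parameter names vary slightly between Airflow versions):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_etl",                    # hypothetical workflow name
    start_date=datetime(2025, 6, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="spark-submit extract_job.py")
    transform = BashOperator(task_id="transform", bash_command="spark-submit transform_job.py")
    load = BashOperator(task_id="load", bash_command="spark-submit load_job.py")

    # The workflow orchestrator waits for each job's inputs before starting it
    extract >> transform >> load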

3.6. Handling Faults

Batch jobs run for long periods—failures are inevitable:

  • Jobs run minutes to days
  • Many parallel tasks
  • Multiple failure modes:
      • Hardware faults (especially on commodity hardware)
      • Network interruptions
      • Preemption by higher-priority jobs
      • Spot instance terminations (to save cost)

Fault tolerance strategies:

System    | Intermediate Data    | Recovery Method
MapReduce | Written to DFS       | Reread from DFS
Spark     | Kept in memory       | Recompute from lineage
Flink     | Periodic checkpoints | Restore from checkpoint

💡 Insight

Because batch jobs regenerate output from scratch, fault recovery is simpler:

  • Just delete partial output and rerun the failed task
  • This wouldn't work if the job had side effects (like sending emails)
  • That's why batch processing emphasizes immutable inputs and pure transformations
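
One common way to keep task output all-or-nothing is to write into a temporary directory and rename it into place only on success. A rough sketch, assuming the final directory does not yet exist and the filesystem has atomic rename (object stores do not, as noted earlier):

import os
import shutil

def run_task(task_fn, final_dir: str):
    """Run a task so that its output becomes visible atomically, or not at all."""
    tmp_dir = final_dir + ".tmp"
    shutil.rmtree(tmp_dir, ignore_errors=True)       # discard partial output from earlier attempts
    os.makedirs(tmp_dir)
    try:
        task_fn(tmp_dir)                             # the task writes only into the temp directory
    except Exception:
        shutil.rmtree(tmp_dir, ignore_errors=True)   # failure: delete partial output, rerun later
        raise
    os.replace(tmp_dir, final_dir)                   # success: publish everything in one rename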

4. Batch Processing Models

4.1. MapReduce

MapReduce mirrors our Unix log analysis example:

  • Same pattern: read → extract → sort → group → aggregate

MAPREDUCE PIPELINE (compared with the Unix pipeline):

Unix:       cat | awk | sort | uniq -c | sort -rn | head
MapReduce:  Read → Map → Shuffle → Reduce → (second MapReduce job)

  • Read: break input files into records. Input: Parquet/Avro files on HDFS or S3.
  • Map: extract key-value pairs from each record. Example: (URL, 1) for each log line.
  • Shuffle: sort by key and group values with the same key, so all values for "page.html" go to the same reducer.
  • Reduce: process the grouped values and produce output. Example: sum up the 1s → ("page.html", 42).

Mapper and Reducer:

# Mapper: called for each input record
def mapper(record):
    url = record['request_url']
    yield (url, 1)              # Emit a key-value pair

# Reducer: called for each unique key with all of its values
def reducer(key, values):
    count = sum(values)         # values is an iterator over all the 1s
    yield (key, count)
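
To show how a framework wires these together, here is a single-process sketch that runs the mapper and reducer above, simulating the shuffle with an in-memory sort (illustrative only):

from itertools import groupby

records = [{"request_url": "/home"}, {"request_url": "/about"}, {"request_url": "/home"}]

# Map phase: apply the mapper to every input record
mapped = [pair for record in records for pair in mapper(record)]

# Shuffle phase: sort by key so that equal keys become adjacent
# (a real framework does this across the network, partitioned by key hash)
mapped.sort(key=lambda kv: kv[0])

# Reduce phase: call the reducer once per distinct key with all of its values
for key, group in groupby(mapped, key=lambda kv: kv[0]):
    for output in reducer(key, (value for _, value in group)):
        print(output)   # ('/about', 1) then ('/home', 2)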

💡 Insight

MapReduce's programming model comes from functional programming—specifically Lisp's map and reduce (fold) higher-order functions:

  • Map is embarrassingly parallel (each input processed independently)
  • Reduce processes each key independently
  • This makes parallelization trivial

Why MapReduce is mostly obsolete:

  • Requires writing map/reduce in a general-purpose language
  • No job pipelining (must wait for upstream job to finish completely)
  • Always sorts between map and reduce (even when unnecessary)
  • Replaced by Spark, Flink, and SQL-based systems

4.2. Dataflow Engines

Modern engines like Spark and Flink improve on MapReduce:

  • Flexible operators (not just map/reduce)
  • In-memory intermediate data
  • Pipelined execution

MAPREDUCE vs. DATAFLOW ENGINES (figure): in MapReduce, every stage writes its intermediate output to the DFS before the next stage starts; in a dataflow engine (Spark/Flink), operators hand data along in memory and only shuffle to disk when needed.

Advantages of dataflow engines:

Feature           | MapReduce                 | Dataflow Engines
Sorting           | Always between stages     | Only when needed
Intermediate data | Written to DFS            | In-memory or local disk
Operator fusion   | Each stage separate       | Adjacent ops combined
Pipelining        | Wait for stage completion | Stream between stages
Process reuse     | New JVM per task          | Reuse processes

4.3. Shuffling Data

In plain English:

  • Shuffling is like sorting mail at a post office
  • Letters arrive from many mailboxes (mappers)
  • Need to organize so all letters for same zip code (key)
  • End up in the same bin (reducer)

SHUFFLE IN MAPREDUCE (figure): input shards → shuffle (partition by key hash) → output shards. Each mapper creates a sorted file per reducer; hash(key) determines which reducer receives a key-value pair; each reducer merges the sorted files from all mappers.

Shuffle process:

  1. Each mapper creates a separate output file for each reducer
  2. Key hash determines destination: hash(key) % num_reducers
  3. Mapper sorts key-value pairs within each file
  4. Reducers fetch their files from all mappers
  5. Reducers merge-sort the files together
  6. Same keys are now adjacent → reducer iterates over values
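
A minimal sketch of the partitioning rule in step 2, using a stable hash so every mapper agrees on which reducer owns a key (Python's built-in hash() is randomized per process, so a real implementation uses something deterministic):

import hashlib

def partition(key: str, num_reducers: int) -> int:
    """Return the reducer index responsible for this key."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_reducers

for key in ["/home", "/about", "/checkout"]:
    print(key, "-> reducer", partition(key, num_reducers=4))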

💡 Insight

Despite the name, shuffle produces sorted order, not random order:

  • Term comes from shuffling cards to redistribute, not randomize
  • Modern systems like BigQuery keep shuffle data in memory
  • Use external shuffle services for resilience

4.4. JOIN and GROUP BY

Shuffling enables distributed joins:

  • Bring matching keys to same node
  • Process joined data locally

SORT-MERGE JOIN EXAMPLE

Activity events:

  • {user_id: 123, page: /products, timestamp: 10:30}
  • {user_id: 123, page: /checkout, timestamp: 10:35}
  • {user_id: 456, page: /home}

User profiles:

  • {user_id: 123, birth_date: 1990, name: Alice}

After shuffling by user_id, the reducer for user 123 receives:

  1. User profile: {birth_date: 1990, name: Alice} ← arrives first
  2. Event: {page: /products, timestamp: 10:30}
  3. Event: {page: /checkout, timestamp: 10:35}

Joined output:

  {page: /products, viewer_birth_year: 1990}
  {page: /checkout, viewer_birth_year: 1990}

How it works:

  1. Two mappers: one for events (emit user_id → event), one for users (emit user_id → profile)
  2. Shuffle brings all records with same user_id to same reducer
  3. Secondary sort ensures user profile arrives first
  4. Reducer stores profile in variable, then iterates over events
  5. Minimal memory: only one user's data in memory at a time
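
A sketch of the reducer for this join, assuming the shuffle has already grouped records by user_id and the secondary sort delivers the profile record before the events (field names follow the example above but are otherwise made up):

def join_reducer(user_id, records):
    """records: the user's profile first (thanks to the secondary sort), then their events."""
    records = iter(records)
    profile = next(records)                  # held in memory: one user's profile at a time
    for event in records:                    # stream through the events, no buffering needed
        yield {"page": event["page"], "viewer_birth_year": profile["birth_year"]}

records_for_123 = [
    {"birth_year": 1990, "name": "Alice"},           # profile arrives first
    {"page": "/products", "timestamp": "10:30"},
    {"page": "/checkout", "timestamp": "10:35"},
]
for row in join_reducer(123, records_for_123):
    print(row)
# {'page': '/products', 'viewer_birth_year': 1990}
# {'page': '/checkout', 'viewer_birth_year': 1990}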

5. Query Languages and DataFrames

5.1. SQL for Batch Processing

As batch systems matured, SQL became the lingua franca:

  • Familiar to analysts and engineers
  • Optimizers can choose efficient execution plans
  • Integrates with existing BI tools

-- Find top pages by age group
SELECT
    page,
    FLOOR((2025 - birth_year) / 10) * 10 AS age_decade,
    COUNT(*) AS views
FROM events e
JOIN users u ON e.user_id = u.user_id
GROUP BY page, age_decade
ORDER BY views DESC
LIMIT 100;

Why SQL won:

  • Analysts and developers already know it
  • Integrates with existing BI tools (Tableau, Looker)
  • Query optimizers can choose efficient execution plans
  • More concise than handwritten MapReduce

SQL-based batch engines:

  • Hive: SQL on Hadoop/Spark
  • Trino (Presto): Federated SQL across data sources
  • Spark SQL: SQL on Spark
  • BigQuery, Snowflake: Cloud data warehouses

5.2. DataFrames

Data scientists preferred the DataFrame model:

  • Familiar from R and Pandas
  • Step-by-step transformations
  • Works with SQL under the hood

# Pandas-style DataFrame API (runs distributed on Spark)
from pyspark.sql.functions import avg, count, desc

events_df = spark.read.parquet("s3://data/events/")
users_df = spark.read.parquet("s3://data/users/")

result = (events_df
    .join(users_df, "user_id")
    .withColumn("age", 2025 - users_df.birth_year)
    .groupBy("page")
    .agg(
        count("*").alias("views"),
        avg("age").alias("avg_viewer_age")
    )
    .orderBy(desc("views"))
    .limit(100))

result.write.parquet("s3://output/page-demographics/")

Aspect       | SQL                | DataFrame API
Style        | Declarative (what) | Step-by-step (how)
Optimization | Query planner      | Query planner (Spark) or immediate (Pandas)
Familiarity  | DBAs, analysts     | Data scientists
Flexibility  | Standard operators | Custom functions easier

💡 Insight

Spark's DataFrame API is deceptively clever:

  • Unlike Pandas which executes immediately, Spark builds a query plan
  • Optimizes the plan before execution
  • df.filter(x).filter(y) becomes a single optimized filter
  • Not two passes over the data

5.3. Batch Processing and Data Warehouses Converge

Historically separate, batch processing and data warehouses are merging:

  • Same underlying technologies (columnar storage, distributed shuffle)
  • Object storage (S3) as common foundation
  • SQL + DataFrame APIs both supported

CONVERGENCE OF BATCH AND WAREHOUSES

Traditional separation:

  • Batch processing: MapReduce, Spark; flexible code; commodity hardware; horizontal scaling
  • Data warehouse: Teradata, Oracle; SQL only; specialized appliances; vertical scaling

Modern convergence:

  • SQL + DataFrame APIs
  • Columnar storage (Parquet)
  • Distributed shuffle
  • Object storage (S3) as foundation
  • Same engines for ETL and analytics

When to use which:

Workload                   | Better fit
SQL analytics              | Cloud warehouse (BigQuery, Snowflake)
Complex ML pipelines       | Batch framework (Spark, Ray)
Row-by-row processing      | Batch framework
Cost-sensitive large jobs  | Batch framework
Iterative graph algorithms | Batch framework

6. Batch Use Cases

6.1. Extract-Transform-Load (ETL)

In plain English:

  • ETL is like a factory assembly line for data
  • Raw materials (source data) come in
  • Get processed and quality-checked (transformed)
  • Packaged for shipping (loaded to destination)

ETL PIPELINE EXAMPLE

  • Extract: pull raw data from source systems
  • Transform: clean data, join tables, aggregate, validate, parse JSON, filter, enrich
  • Load: write the results to the destination system

A workflow scheduler (Airflow) orchestrates the pipeline: it runs daily or hourly, handles retries, and alerts on failure.

Why batch for ETL:

  • Parallelizable: Filtering, projecting, and joining are embarrassingly parallel
  • Debuggable: Inspect failed files, fix code, rerun
  • Retryable: Transient failures handled by scheduler
  • Orchestrated: Airflow, Dagster provide operators for many systems

6.2. Analytics

Batch systems support two analytics patterns:

  • Pre-aggregation (scheduled jobs)
  • Ad hoc queries (interactive exploration)

1. Pre-aggregation (scheduled)

-- Runs daily via Airflow
CREATE TABLE daily_sales_cube AS
SELECT
    date,
    region,
    product_category,
    SUM(revenue) AS total_revenue,
    COUNT(*) AS transactions
FROM transactions
WHERE date = CURRENT_DATE - 1
GROUP BY date, region, product_category;

2. Ad hoc queries (interactive)

-- Analyst runs this to investigate a spike
SELECT
    hour,
    COUNT(*) AS errors,
    error_type
FROM logs
WHERE date = '2025-06-27' AND status >= 500
GROUP BY hour, error_type
ORDER BY errors DESC;

6.3. Machine Learning

Batch processing is central to ML workflows:

  • Feature engineering at scale
  • Model training (gradient descent across huge datasets)
  • Batch inference (score millions of records)

ML BATCH PROCESSING WORKFLOW (figure): 1. feature engineering → 2. model training → 3. batch inference

Frameworks: Spark MLlib, Ray, Kubeflow, Flyte. OpenAI uses Ray for ChatGPT training.

LLM data preparation is a prime batch workload:

  • Extract plain text from HTML
  • Detect and remove duplicates
  • Filter low-quality content
  • Tokenize the text into token sequences
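
A minimal sketch of the deduplication and quality-filter steps on a list of documents (the hash-based dedup and word-count threshold are made-up stand-ins for the far more elaborate heuristics real pipelines use):

import hashlib

def prepare(documents):
    """Drop exact duplicates and obviously low-quality (very short) documents."""
    seen = set()
    for text in documents:
        digest = hashlib.sha256(text.strip().lower().encode("utf-8")).hexdigest()
        if digest in seen:
            continue                         # exact duplicate of an earlier document
        seen.add(digest)
        if len(text.split()) < 3:            # crude quality filter
            continue
        yield text

docs = ["Batch processing is great.", "batch processing is great.", "ok"]
print(list(prepare(docs)))                   # keeps only the first document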

6.4. Bulk Data Imports

Batch outputs often need to reach production databases.

Why NOT write directly from batch jobs:

Problem      | Why
Slow         | Network request per record
Overwhelming | Thousands of tasks writing simultaneously
Inconsistent | Partial results visible if job fails

Better patterns:

1. Stream through Kafka

  • Buffer between the batch job and production systems
  • Consumers control their read rate
  • Multiple downstream systems can consume
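
A hedged sketch of this pattern with the kafka-python client (broker address and topic name are made up):

import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# The batch job publishes its output to a topic instead of writing straight to
# the production database; consumers downstream read at their own pace.
for record in [{"user_id": 123, "score": 0.87}, {"user_id": 456, "score": 0.42}]:
    producer.send("batch-output", value=record)

producer.flush()    # block until the buffered records are acknowledged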

2. Bulk file import

  • Build database files in the batch job
  • Bulk load them atomically
  • Venice, Pinot, and Druid support this pattern


7. Summary

In this chapter, we explored batch processing from Unix pipes to petabyte-scale systems.

Core concepts:

  • Batch processing transforms immutable inputs into derived outputs
  • Same patterns from Unix (sort | uniq -c) scale to distributed systems
  • Key operation is shuffle: redistribute data so same keys meet at same node

System components:

  • Distributed filesystems (HDFS) and object stores (S3) provide storage
  • Orchestrators (YARN, K8s) schedule tasks across machines
  • Workflow schedulers (Airflow) manage job dependencies

Processing models:

  • MapReduce: map → shuffle → reduce (largely obsolete)
  • Dataflow engines (Spark, Flink): flexible operators, in-memory intermediate data
  • SQL and DataFrame APIs: high-level, optimizable interfaces

Use cases:

  • ETL: move and transform data between systems
  • Analytics: pre-aggregation and ad hoc queries
  • Machine learning: feature engineering, training, batch inference
  • Bulk imports: populate production systems from batch outputs

💡 Insight

The key insight of batch processing is treating computation as pure functions:

  • Given the same input, produce the same output (no side effects)
  • Jobs become reproducible, retriable, and parallelizable
  • When something goes wrong, you can always rerun
  • That property is invaluable for building reliable systems

In Chapter 12, we'll turn to stream processing, where inputs are unbounded and jobs never complete. We'll see how stream processing builds on batch concepts while introducing new challenges around time, ordering, and state.

