
Chapter 6. Replication

The major difference between a thing that might go wrong and a thing that cannot possibly go wrong is that when a thing that cannot possibly go wrong goes wrong it usually turns out to be impossible to get at or repair.

Douglas Adams, Mostly Harmless (1992)

Table of Contents

  1. Single-Leader Replication
  2. Problems with Replication Lag
  3. Multi-Leader Replication
  4. Leaderless Replication
  5. Summary

In plain English: Replication is like having backup singers in a band. When the lead singer (primary database) performs a song (processes data), the backup singers (replicas) follow along. If the lead singer gets sick, one of the backups can step up. The challenge is keeping all singers perfectly in sync—especially when they're in different cities.

In technical terms: Replication means keeping a copy of the same data on multiple machines connected via a network. It enables high availability (system keeps running when machines fail), geographic proximity (lower latency), and scalability (more machines can serve reads).

Why it matters: Modern applications can't tolerate single points of failure. Replication allows your database to survive hardware failures, network partitions, and even entire datacenter outages while continuing to serve traffic.


Replication means keeping a copy of the same data on multiple machines that are connected via a network. As discussed in "Distributed versus Single-Node Systems", there are several reasons why you might want to replicate data:

  • To keep data geographically close to your users (and thus reduce access latency)
  • To allow the system to continue working even if some of its parts have failed (and thus increase availability)
  • To scale out the number of machines that can serve read queries (and thus increase read throughput)

This chapter assumes:

  • Dataset small enough for each machine to hold entire copy
  • Chapter 7 covers sharding for larger datasets
  • Later chapters cover fault handling

The core challenge:

  • Static data is easy: copy once, done
  • Changing data is hard: propagating updates while maintaining consistency
  • Three algorithm families: single-leader, multi-leader, leaderless

💡 Insight

The fundamental challenge of replication isn't copying data—it's handling change. If data never changed, you'd copy it once and be done. But data does change, and propagating those changes while maintaining consistency across unreliable networks is where the complexity lies.

Key trade-offs in replication:

  • Synchronous vs asynchronous replication
  • How to handle failed replicas
  • These choices are often exposed as database configuration options
  • The principles are similar across implementations

Historical note:

  • Replication studied since 1970s
  • Fundamental constraints of networks haven't changed
  • Concepts like eventual consistency still cause confusion today

Backups and replication

Do you still need backups if you have replication? Yes—different purposes:

| Feature | Replication | Backup |
|---|---|---|
| Purpose | Reflect current writes across nodes | Store old snapshots for point-in-time recovery |
| Accidental delete | ❌ Deletion propagates to all replicas | ✅ Can restore from backup |
| Timing | Real-time | Periodic snapshots |

How they complement each other:

  • Backups are part of setting up replication (see "Setting Up New Followers")
  • Archiving replication logs can be part of backup process

Internal snapshots:

  • Some databases maintain immutable snapshots of past states
  • Trade-off: keeps old data on same (expensive) storage
  • Alternative: store current state in primary storage, backups in cheaper object storage

1. Single-Leader Replication

In plain English: Think of a classroom where the teacher writes on the board and students copy into their notebooks. The teacher is the leader, students are followers. Only the teacher can write new material (handle writes), but anyone can read from their notebook (handle reads). If students fall behind in copying, they have old information.

In technical terms: One replica is designated the leader (primary). All writes go to the leader, which forwards changes to followers via a replication log. Reads can come from any replica, but only the leader accepts writes.

Why it matters: This simple model is the foundation of most database replication. It's easy to reason about and provides strong consistency when configured synchronously, but introduces complexity around failover and replication lag.

Key terms:

  • Replica = node that stores a copy of the database
  • Core problem = ensuring all data ends up on all replicas
  • Solution = leader-based replication (also: primary-backup, active/passive)

[Diagram: single-leader replication. Clients send writes to the leader; the leader streams changes to Followers 1, 2, and 3 via the replication log; clients can read from any replica.]

How it works:

  1. One replica is designated the leader (also known as primary or source). When clients want to write to the database, they must send their requests to the leader, which first writes the new data to its local storage.

  2. The other replicas are known as followers (read replicas, secondaries, or hot standbys). Whenever the leader writes new data to its local storage, it also sends the data change to all of its followers as part of a replication log or change stream. Each follower takes the log from the leader and updates its local copy of the database accordingly, by applying all writes in the same order as they were processed on the leader.

  3. When a client wants to read from the database, it can query either the leader or any of the followers. However, writes are only accepted on the leader (the followers are read-only from the client's point of view).
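
To make the data flow concrete, here is a minimal in-memory sketch (all class and method names are illustrative, not any particular database's API): the leader appends each write to its replication log and streams it to the followers, which apply entries strictly in log order, while reads can be served by any replica.

```python
# Minimal single-leader replication sketch (illustrative only). The leader
# assigns each write a monotonically increasing log position; followers apply
# entries in exactly that order, and clients may read from any replica.

class Follower:
    def __init__(self):
        self.data = {}            # local copy of the database
        self.applied_up_to = 0    # position of the last log entry applied

    def apply(self, position, key, value):
        assert position == self.applied_up_to + 1, "must apply log entries in order"
        self.data[key] = value
        self.applied_up_to = position

    def read(self, key):
        return self.data.get(key)

class Leader:
    def __init__(self, followers):
        self.data = {}
        self.log = []             # replication log: (position, key, value)
        self.followers = followers

    def write(self, key, value):
        self.data[key] = value                    # 1. write to local storage
        position = len(self.log) + 1
        self.log.append((position, key, value))   # 2. append to the replication log
        for f in self.followers:                  # 3. stream the change (in a real
            f.apply(position, key, value)         #    system, asynchronously)

    def read(self, key):
        return self.data.get(key)

followers = [Follower(), Follower(), Follower()]
leader = Leader(followers)
leader.write("user:42:name", "Alice")         # writes go only to the leader
print(followers[0].read("user:42:name"))      # reads can go to any replica
```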

Sharding interaction:

  • If database is sharded (Chapter 7), each shard has one leader
  • Different shards may have leaders on different nodes
  • Multi-leader replication: multiple leaders for same shard (covered later)

Where single-leader is used:

| Category | Examples |
|---|---|
| Relational DBs | PostgreSQL, MySQL, Oracle Data Guard, SQL Server Always On |
| Document DBs | MongoDB, DynamoDB |
| Message brokers | Kafka |
| Block devices | DRBD |
| Consensus algorithms | Raft (CockroachDB, TiDB, etcd, RabbitMQ quorum queues) |

note

In older documents you may see the term master–slave replication. It means the same as leader-based replication, but the term should be avoided as it is widely considered offensive.

1.1. Synchronous Versus Asynchronous Replication

In plain English: Synchronous replication is like waiting for everyone on a group text to read your message before sending the next one—safe, but slow. Asynchronous is like sending messages and moving on—fast, but you don't know if everyone saw it.

In technical terms: Synchronous replication waits for follower acknowledgment before confirming the write to the client. Asynchronous sends the change to followers but doesn't wait for confirmation.

Why it matters: This choice determines your durability guarantees (will you lose data on failure?) and availability (can you keep writing if followers go down?). Most systems use semi-synchronous: one synchronous follower plus async followers.

Key configuration choice:

  • Relational databases: often configurable
  • Other systems: often hardcoded one way

Example: User updates profile image

  1. Client sends update request → leader
  2. Leader receives and processes
  3. Leader forwards to followers
  4. Leader confirms success to client

[Diagram: synchronous vs. asynchronous replication]

  1. Client sends write (user updates their profile image)
  2. Leader persists the write to its local storage
  3. Synchronous follower (Follower 1) acknowledges the write
  4. Leader confirms success to the client; the write is durable
  5. Asynchronous follower (Follower 2) eventually applies the change

In the diagram:

  • Follower 1 (sync) — leader waits for confirmation before reporting success
  • Follower 2 (async) — leader sends message, doesn't wait for response

Replication timing:

  • Normal case: < 1 second
  • No guarantees on maximum delay
  • Can fall behind by minutes when:
    • Follower recovering from failure
    • System at max capacity
    • Network problems between nodes

|  | Synchronous | Asynchronous |
|---|---|---|
| Durability | Guaranteed on follower | May be lost if leader fails |
| Latency | Slower (waits for follower) | Faster (no waiting) |
| Availability | Writes blocked if follower down | Writes always succeed |
| Use case | Critical data (transactions) | High-throughput reads |

💡 Insight

The synchronous vs. asynchronous choice is fundamentally about trading durability for availability. Synchronous means "don't lose my data even if the leader crashes immediately," but at the cost of "I can't write if a follower is down." Most production systems use semi-synchronous: one sync follower for durability, the rest async for availability.

Synchronous replication:

  • ✅ Follower guaranteed up-to-date, consistent with leader
  • ✅ If leader fails, data still available on follower
  • ❌ If sync follower doesn't respond → write blocked
  • ❌ Leader must wait until sync replica available

Why all-synchronous doesn't work:

  • Any single node outage → whole system halts
  • Solution: semi-synchronous (one sync follower, rest async)
  • If sync follower fails → promote an async follower to sync
  • Guarantees: up-to-date copy on at least 2 nodes (leader + 1 sync)
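
The semi-synchronous approach can be sketched as follows (a toy illustration with invented class names, not any database's real replication code): the leader confirms a write only after its one synchronous follower acknowledges it, while the remaining followers receive the change without the leader waiting for them.

```python
# Semi-synchronous replication sketch (illustrative): the leader waits for an
# acknowledgment from one designated sync follower before confirming the write,
# and merely hands the change to the async followers without waiting.

import threading

class SyncFollower:
    """Applies the write and signals an event when done (simulating an ack)."""
    def __init__(self):
        self.data = {}

    def replicate(self, key, value):
        done = threading.Event()
        def apply():
            self.data[key] = value
            done.set()
        threading.Thread(target=apply).start()
        return done

class AsyncFollower:
    def __init__(self):
        self.data = {}

    def enqueue(self, key, value):
        # In a real system this goes onto a replication stream; here we just
        # apply it in a background thread without waiting for it to finish.
        threading.Thread(target=self.data.__setitem__, args=(key, value)).start()

class SemiSyncLeader:
    def __init__(self, sync_follower, async_followers, timeout=5.0):
        self.sync_follower = sync_follower
        self.async_followers = async_followers
        self.timeout = timeout
        self.storage = {}

    def write(self, key, value):
        self.storage[key] = value                       # persist locally first
        ack = self.sync_follower.replicate(key, value)  # wait for the sync follower
        if not ack.wait(self.timeout):
            # Sync follower unreachable: block the write, or promote an
            # async follower to take over the synchronous role.
            raise TimeoutError("synchronous follower did not acknowledge")
        for f in self.async_followers:                  # fire and forget
            f.enqueue(key, value)
        return "ok"  # the write is now on at least two nodes

leader = SemiSyncLeader(SyncFollower(), [AsyncFollower(), AsyncFollower()])
print(leader.write("balance:alice", 100))
```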

Quorum-based replication:

  • Some systems: majority (e.g., 3/5) sync, minority async
  • Example of quorum (covered in "Quorums for reading and writing")
  • Often used with consensus protocols for auto leader election (Chapter 10)

Fully asynchronous:

  • If leader fails → uncommitted writes are lost
  • Writes not guaranteed durable even if confirmed to client
  • ✅ Leader continues processing even if all followers behind
  • Widely used when: many followers, geographically distributed

1.2. Setting Up New Followers

In plain English: Adding a new follower is like a new student joining a class mid-semester. You can't just copy someone's notebook (the data is constantly changing). Instead: take a snapshot of the board at a specific time, give it to the new student, then have them copy everything written after that point.

In technical terms: Setting up a follower requires: (1) taking a consistent snapshot of the leader, (2) copying it to the new follower, (3) connecting the follower to the leader and requesting all changes since the snapshot's position in the replication log, (4) catching up until fully synchronized.

Why it matters: You need to add followers without downtime—locking the database would violate high availability. The snapshot-plus-catchup approach enables adding replicas while the system continues serving traffic.

When do you need new followers?

  • Increase number of replicas
  • Replace failed nodes

Why not just copy files?

  • ❌ Clients constantly writing → data always in flux
  • ❌ Standard file copy sees different parts at different times
  • ❌ Result: inconsistent data

Why not lock the database?

  • Would make it unavailable for writes
  • Violates high availability goal

Solution: Snapshot + catch up (no downtime required)

  1. Take a consistent snapshot of the leader's database at a specific point in time
  2. Copy the snapshot files to the new follower node
  3. Connect to the leader and request all changes since the snapshot position
  4. Process the backlog, then continue with the live replication stream

Detailed steps:

  1. Take a consistent snapshot of the leader's database at some point in time—if possible, without taking a lock on the entire database. Most databases have this feature, as it is also required for backups. In some cases, third-party tools are needed, such as Percona XtraBackup for MySQL.

  2. Copy the snapshot to the new follower node.

  3. The follower connects to the leader and requests all the data changes that have happened since the snapshot was taken. This requires that the snapshot is associated with an exact position in the leader's replication log. That position has various names: for example, PostgreSQL calls it the log sequence number; MySQL has two mechanisms, binlog coordinates and global transaction identifiers (GTIDs).

  4. When the follower has processed the backlog of data changes since the snapshot, we say it has caught up. It can now continue to process data changes from the leader as they happen.
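
The same four steps, sketched in code (the Leader and Follower classes here are illustrative stubs, not a real database's tooling): the crucial detail is that the snapshot records the exact log position it corresponds to, so the follower knows where to resume.

```python
# New-follower bootstrap sketch (illustrative stubs, not a real database API).
# The snapshot records the replication-log position it corresponds to
# (PostgreSQL: log sequence number; MySQL: binlog coordinates or GTIDs).

class Leader:
    def __init__(self):
        self.data = {}
        self.log = []   # list of (position, key, value)

    def write(self, key, value):
        self.data[key] = value
        self.log.append((len(self.log) + 1, key, value))

    def take_consistent_snapshot(self):
        # Returns a copy of the data plus the log position it corresponds to.
        return dict(self.data), len(self.log)

    def changes_since(self, position):
        return self.log[position:]

class Follower:
    def __init__(self):
        self.data = {}
        self.applied_up_to = 0

    def restore_from_snapshot(self, snapshot, position):
        self.data = snapshot
        self.applied_up_to = position

    def apply(self, position, key, value):
        self.data[key] = value
        self.applied_up_to = position

def bootstrap_follower(leader, follower):
    snapshot, pos = leader.take_consistent_snapshot()        # step 1
    follower.restore_from_snapshot(snapshot, pos)            # step 2
    for position, key, value in leader.changes_since(pos):   # step 3: backlog
        follower.apply(position, key, value)
    # step 4: caught up; from here the follower consumes the live stream

leader = Leader()
leader.write("a", 1)
leader.write("b", 2)
new_follower = Follower()
bootstrap_follower(leader, new_follower)
assert new_follower.data == {"a": 1, "b": 2}
# In reality, writes keep arriving while the snapshot is being copied;
# the backlog requested in step 3 covers exactly those writes.
```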

Implementation varies:

  • Some databases: fully automated
  • Others: manual multi-step workflow by administrator

Object store integration:

  • Archive replication logs + periodic snapshots → disaster recovery
  • New follower setup: download snapshot/logs from object store
  • Tools: WAL-G (PostgreSQL, MySQL, SQL Server), Litestream (SQLite)

1.3. Databases backed by object storage

In plain English: Modern databases are moving data to cloud object stores (S3, GCS) instead of local disks—like using cloud storage instead of your laptop's hard drive. It's cheaper, more durable, and simplifies replication since the cloud provider handles copies. The tradeoff: higher latency accessing data.

In technical terms: Object storage (S3, GCS, Azure Blob) provides multi-region replication, high durability (99.999999999%), and compare-and-set operations. Databases use it for primary storage or tiered storage, accepting higher latency in exchange for simplified architecture and cost savings.

Why it matters: This architectural shift simplifies distributed databases by offloading replication, durability, and transactions to the object store layer. Systems like Snowflake and WarpStream use "zero-disk architecture" where all data lives in S3.

Object storage for live queries (not just archiving):

  • Amazon S3, Google Cloud Storage, Azure Blob Storage
  • Benefits:
    • 💰 Cost efficiency: object storage is inexpensive compared to other cloud storage options, enabling databases to store less-frequently queried data on cheaper, higher-latency storage.
    • 🔄 Built-in replication: object stores provide multi-zone, dual-region, or multi-region replication with very high durability guarantees, bypassing inter-zone network fees.
    • 🔐 Conditional writes: compare-and-set (CAS) operations enable databases to implement transactions and leadership election without custom coordination.
    • 🔗 Data integration: storing data from multiple databases in the same object store simplifies integration, especially with open formats like Parquet and Iceberg.

Key benefit: Shifts responsibility of transactions, leadership election, and replication to object storage.
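
As a rough illustration of the "leadership election via conditional writes" point, here is a sketch of lease-based leader election on top of a hypothetical object-store client with versioned compare-and-set (the client, its method names, and the lease format are all assumptions, not a specific provider's API):

```python
# Leader election using object-store compare-and-set (hypothetical client API;
# real object stores expose conditional writes in provider-specific ways).

import time

LEASE_KEY = "cluster/leader-lease"
LEASE_TTL = 30  # seconds

class InMemoryStore:
    """Tiny stand-in for an object store with versioned conditional writes."""
    def __init__(self):
        self.objects = {}   # key -> (value, version)

    def get(self, key):
        return self.objects.get(key)

    def put_if_version(self, key, value, expected_version):
        current = self.objects.get(key)
        current_version = current[1] if current else None
        if current_version != expected_version:
            return False                     # lost the race: someone else wrote first
        self.objects[key] = (value, (current_version or 0) + 1)
        return True

def try_become_leader(store, node_id):
    """Return True if node_id holds the leader lease after this call."""
    current = store.get(LEASE_KEY)             # -> (lease, version) or None
    if current is not None:
        lease, version = current
        if time.time() < lease["expires_at"]:
            return lease["node_id"] == node_id  # an unexpired lease already exists
    else:
        version = None                          # no lease object yet

    new_lease = {"node_id": node_id, "expires_at": time.time() + LEASE_TTL}
    # Conditional write: succeeds only if the lease object is still at `version`
    # (or still absent); if another node raced us and won, this returns False.
    return store.put_if_version(LEASE_KEY, new_lease, expected_version=version)

store = InMemoryStore()
print(try_become_leader(store, "node-a"))   # True: node-a acquires the lease
print(try_become_leader(store, "node-b"))   # False: node-a's lease is still valid
```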


Tradeoffs of object storage:

| Challenge | Description |
|---|---|
| Higher latency | Much slower than local disks or EBS |
| Per-API-call fees | Encourage batching of reads/writes, which adds further latency |
| No standard filesystem | Needs integration work or FUSE mounts |
| FUSE limitations | Lacks POSIX features (non-sequential writes, symlinks) |

Architecture approaches:

| Approach | Description | Examples |
|---|---|---|
| Tiered storage | Hot data on SSD/NVMe, cold data on object store | Many databases |
| Object storage + WAL | Object storage as primary store, low-latency system for the WAL | Neon (Safekeepers) |
| Zero-disk (ZDA) | All data on object store, disks for caching only | WarpStream, Confluent Freight, Bufstream, Redpanda Serverless, Snowflake, Turbopuffer, SlateDB |

Zero-disk benefit: Nodes have no persistent state → dramatically simplifies operations

1.4. Handling Node Outages

In plain English: Machines fail all the time—hardware breaks, software crashes, networks disconnect. The goal is to keep the system running despite individual failures, like a restaurant continuing service even if one waiter calls in sick.

In technical terms: Follower failures are handled via catch-up recovery: the follower reconnects and requests all writes it missed. Leader failures require failover: detect the failure, elect a new leader, and reconfigure clients—a complex process prone to edge cases and data loss.

Why it matters: Failover is one of the hardest problems in distributed systems. Getting it wrong can cause split brain (two leaders), data loss, or prolonged downtime. Many production outages stem from failed failover attempts.

Node failures:

  • Unexpected: hardware faults, software crashes, network issues
  • Planned: maintenance, kernel security patches

Goal: Keep system running despite individual failures, minimize impact

High availability with leader-based replication: Two cases below

Follower failure: Catch-up recovery

How it works:

  1. Each follower keeps local log of changes received from leader
  2. After crash/network interruption, follower knows last processed transaction
  3. Connects to leader, requests all changes since that point
  4. Once caught up → continues receiving live stream

Performance challenges:

  • High write throughput or long offline time → lots to catch up
  • High load on both follower (applying) and leader (sending)

Log retention dilemma:

  • Leader deletes log after all followers confirm
  • Long-unavailable follower creates a choice:
    • ❌ Retain log indefinitely → risk running out of disk
    • ❌ Delete unacknowledged log → follower can't recover, needs backup restore

Leader failure: Failover

Failover = handling leader failure:

  • Promote a follower to new leader
  • Reconfigure clients to write to new leader
  • Other followers consume from new leader

  1. Detect leader failure: timeout-based detection (e.g., no heartbeat for 30 seconds)
  2. Choose a new leader: by election or appointment by a controller node; select the most up-to-date replica
  3. Reconfigure the system: clients send writes to the new leader; the old leader becomes a follower

Failover types:

  • Manual: Administrator notified, takes steps to make new leader
  • Automatic: System handles failover (steps below)
  1. Determining that the leader has failed. There are many things that could potentially go wrong: crashes, power outages, network issues, and more. There is no foolproof way of detecting what has gone wrong, so most systems simply use a timeout: nodes frequently bounce messages back and forth between each other, and if a node doesn't respond for some period of time—say, 30 seconds—it is assumed to be dead. (If the leader is deliberately taken down for planned maintenance, this doesn't apply.)

  2. Choosing a new leader. This could be done through an election process (where the leader is chosen by a majority of the remaining replicas), or a new leader could be appointed by a previously established controller node. The best candidate for leadership is usually the replica with the most up-to-date data changes from the old leader (to minimize any data loss). Getting all the nodes to agree on a new leader is a consensus problem, discussed in detail in Chapter 10.

  3. Reconfiguring the system to use the new leader. Clients now need to send their write requests to the new leader (we discuss this in "Request Routing"). If the old leader comes back, it might still believe that it is the leader, not realizing that the other replicas have forced it to step down. The system needs to ensure that the old leader becomes a follower and recognizes the new leader.
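
A highly simplified sketch of the automatic path (the data structures are illustrative; real systems solve the election step with a consensus protocol and also need fencing to stop the old leader): declare the leader dead after a heartbeat timeout, promote the follower with the highest replicated log position to minimize data loss, and demote everyone else.

```python
# Simplified automatic failover sketch (illustrative; see Chapter 10 for how
# real systems agree on a new leader, and note that fencing is not shown).

import time

HEARTBEAT_TIMEOUT = 30  # seconds without a heartbeat before declaring the leader dead

def leader_is_dead(last_heartbeat_at, now=None):
    now = time.time() if now is None else now
    return now - last_heartbeat_at > HEARTBEAT_TIMEOUT

def choose_new_leader(followers):
    # Pick the replica with the most up-to-date data (highest log position)
    # to minimize the number of discarded writes.
    return max(followers, key=lambda f: f["log_position"])

def failover(followers, old_leader):
    new_leader = choose_new_leader(followers)
    for node in followers + [old_leader]:
        if node is not new_leader:
            node["role"] = "follower"
            node["replicates_from"] = new_leader["id"]
    new_leader["role"] = "leader"
    return new_leader

followers = [{"id": "f1", "log_position": 1041, "role": "follower"},
             {"id": "f2", "log_position": 1037, "role": "follower"}]
old_leader = {"id": "l0", "log_position": 1045, "role": "leader"}
print(failover(followers, old_leader)["id"])   # "f1": the most up-to-date follower
# Note: with async replication the old leader was at position 1045, so the
# writes at positions 1042-1045 are lost when it is demoted.
```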

💡 Insight

Failover is deceptively complex. The timeout must be tuned carefully: too short and you get unnecessary failovers during temporary slowdowns (making things worse), too long and you have extended downtime. Choosing the wrong replica as the new leader can cause data loss. And if two nodes both think they're the leader (split brain), you can corrupt data irreparably.

Failover is fraught with things that can go wrong:

  • ⚠️ Data loss with async replication: if the new leader hasn't received all writes from the old leader, those writes are discarded and durability promises are violated.
  • 🔗 Coordination with external systems: for example, the GitHub incident where a lagging MySQL follower's autoincrement counter reused primary keys that were already known to Redis, causing data disclosure.
  • 🧠 Split brain: two nodes both believe they're the leader. Without conflict resolution, data corruption is likely. Fencing mechanisms can help but must be carefully designed.
  • ⏱️ Timeout tuning: a longer timeout means longer recovery; a shorter timeout causes unnecessary failovers during load spikes or network glitches, making problems worse.

Detailed failure scenarios:

1. Unreplicated writes with async replication:

  • New leader missing writes from old leader
  • Old leader rejoins → what happens to those writes?
  • Common solution: discard old leader's unreplicated writes
  • Problem: writes you thought were durable actually weren't

2. External system coordination (GitHub incident):

  • Out-of-date MySQL follower promoted to leader
  • Autoincrement counter lagged → reused primary keys
  • Those keys also in Redis → MySQL/Redis inconsistency
  • Result: private data disclosed to wrong users

3. Split brain:

  • Two nodes both believe they're leader
  • Both accept writes → data loss/corruption without conflict resolution
  • Safety mechanisms can shut down one node, but:
    • Poorly designed → both nodes shut down
    • Detection too slow → corruption already happened

4. Timeout tuning:

  • Long timeout → slow recovery
  • Short timeout → unnecessary failovers
  • Load spikes or network glitches can trigger false failovers
  • Unnecessary failover during load → makes problems worse
note

Guarding against split brain by limiting or shutting down old leaders is known as fencing or, more emphatically, Shoot The Other Node In The Head (STONITH). We will discuss fencing in more detail in "Distributed Locks and Leases".

Practical advice:

  • No easy solutions → some teams prefer manual failover despite automatic support
  • Key priority: Pick the most up-to-date follower as new leader
    • Sync/semi-sync: follower that leader waited for before ACK
    • Async: follower with greatest log sequence number
  • Data loss trade-off: fraction of a second = tolerable; days behind = catastrophic

Looking ahead: These issues (node failures, unreliable networks, consistency/durability/availability/latency trade-offs) are fundamental distributed systems problems → Chapters 9 and 10

1.5. Implementation of Replication Logs

In plain English: Different databases use different techniques to transmit changes from leader to followers. Think of it like different ways to describe how to build a Lego set: you could describe each brick placement (statement-based), send a video of the building process (WAL shipping), or send photos of the result (logical log).

In technical terms: Replication logs can be implemented as SQL statements (statement-based), low-level write-ahead log bytes (WAL shipping), or row-level change descriptions (logical/row-based). Each has tradeoffs around determinism, coupling to storage engine, and upgradeability.

Why it matters: The replication log format determines whether you can do zero-downtime upgrades, how tightly coupled your replicas are, and whether external systems can consume your changes (change data capture).

How does leader-based replication work under the hood? Several different replication methods are used in practice, so let's look at each one briefly.

Statement-based replication

In the simplest case, the leader logs every write request (statement) that it executes and sends that statement log to its followers. For a relational database, this means that every INSERT, UPDATE, or DELETE statement is forwarded to followers, and each follower parses and executes that SQL statement as if it had been received from a client.

Although this may sound reasonable, there are various ways in which this approach to replication can break down:

  • Any statement that calls a nondeterministic function, such as NOW() to get the current date and time or RAND() to get a random number, is likely to generate a different value on each replica.

  • If statements use an autoincrementing column, or if they depend on the existing data in the database (e.g., UPDATE ... WHERE <some condition>), they must be executed in exactly the same order on each replica, or else they may have a different effect. This can be limiting when there are multiple concurrently executing transactions.

  • Statements that have side effects (e.g., triggers, stored procedures, user-defined functions) may result in different side effects occurring on each replica, unless the side effects are absolutely deterministic.

It is possible to work around those issues—for example, the leader can replace any nondeterministic function calls with a fixed return value when the statement is logged so that the followers all get the same value. The idea of executing deterministic statements in a fixed order is similar to the event sourcing model that we previously discussed in "Event Sourcing and CQRS". This approach is also known as state machine replication, and we will discuss the theory behind it in "Using shared logs".

Statement-based replication was used in MySQL before version 5.1. It is still sometimes used today, as it is quite compact, but by default MySQL now switches to row-based replication (discussed shortly) if there is any nondeterminism in a statement. VoltDB uses statement-based replication, and makes it safe by requiring transactions to be deterministic. However, determinism can be hard to guarantee in practice, so many databases prefer other replication methods.
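
As a toy illustration of that workaround (not how MySQL actually implements it), a leader might evaluate nondeterministic functions once and substitute the resulting literals before appending the statement to its log, so that every follower executes an identical, deterministic statement:

```python
# Toy illustration of making a statement deterministic before logging it
# (not any specific database's implementation). NOW() and RAND() are evaluated
# once on the leader and replaced with literal values, so every follower
# executes exactly the same statement.

import datetime
import random
import re

def make_deterministic(statement):
    now_literal = "'" + datetime.datetime.utcnow().isoformat() + "'"
    statement = re.sub(r"\bNOW\(\)", now_literal, statement, flags=re.IGNORECASE)
    statement = re.sub(r"\bRAND\(\)", repr(random.random()), statement,
                       flags=re.IGNORECASE)
    return statement

original = "UPDATE users SET last_login = NOW() WHERE id = 42"
print(make_deterministic(original))
# e.g. UPDATE users SET last_login = '2025-06-01T12:00:00.123456' WHERE id = 42
```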

Write-ahead log (WAL) shipping

In Chapter 4 we saw that a write-ahead log is needed to make B-tree storage engines robust: every modification is first written to the WAL so that the tree can be restored to a consistent state after a crash. Since the WAL contains all the information necessary to restore the indexes and heap into a consistent state, we can use the exact same log to build a replica on another node: besides writing the log to disk, the leader also sends it across the network to its followers. When the follower processes this log, it builds a copy of the exact same files as found on the leader.

This method of replication is used in PostgreSQL and Oracle, among others. The main disadvantage is that the log describes the data on a very low level: a WAL contains details of which bytes were changed in which disk blocks. This makes replication tightly coupled to the storage engine. If the database changes its storage format from one version to another, it is typically not possible to run different versions of the database software on the leader and the followers.

That may seem like a minor implementation detail, but it can have a big operational impact. If the replication protocol allows the follower to use a newer software version than the leader, you can perform a zero-downtime upgrade of the database software by first upgrading the followers and then performing a failover to make one of the upgraded nodes the new leader. If the replication protocol does not allow this version mismatch, as is often the case with WAL shipping, such upgrades require downtime.

Logical (row-based) log replication

An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals. This kind of replication log is called a logical log, to distinguish it from the storage engine's (physical) data representation.

A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:

  • For an inserted row, the log contains the new values of all columns.
  • For a deleted row, the log contains enough information to uniquely identify the row that was deleted. Typically this would be the primary key, but if there is no primary key on the table, the old values of all columns need to be logged.
  • For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least the new values of all columns that changed).

A transaction that modifies several rows generates several such log records, followed by a record indicating that the transaction was committed. MySQL keeps a separate logical replication log, called the binlog, in addition to the WAL (when configured to use row-based replication). PostgreSQL implements logical replication by decoding the physical WAL into row insertion/update/delete events.

Since a logical log is decoupled from the storage engine internals, it can more easily be kept backward compatible, allowing the leader and the follower to run different versions of the database software. This in turn enables upgrading to a new version with minimal downtime.

A logical log format is also easier for external applications to parse. This aspect is useful if you want to send the contents of a database to an external system, such as a data warehouse for offline analysis, or for building custom indexes and caches. This technique is called change data capture, and we will return to it in Chapter 12.
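
To make the record structure concrete, here is an illustrative sketch of what the logical log entries for one transaction might look like, together with a follower applying them only once the commit marker arrives (the field names and format are invented; MySQL's binlog and PostgreSQL's logical decoding differ in detail but carry equivalent row-level information):

```python
# Illustrative logical-log records for one transaction, plus a follower that
# applies them in log order and only acts once the commit marker arrives.

transaction_records = [
    {"op": "insert", "table": "orders",
     "new": {"order_id": 9001, "customer_id": 42, "total": 59.90}},
    {"op": "update", "table": "accounts",
     "key": {"customer_id": 42},            # enough to identify the row
     "new": {"balance": 140.10}},           # new values of the changed columns
    {"op": "commit", "txid": 7781},         # marks the transaction as committed
]

def apply_to_follower(db, records):
    """Apply row-level changes in log order, deferring until the commit record."""
    pending = []
    for record in records:
        if record["op"] == "commit":
            for op in pending:
                if op["op"] == "insert":
                    db.setdefault(op["table"], []).append(op["new"])
                elif op["op"] == "update":
                    for row in db.get(op["table"], []):
                        if all(row.get(k) == v for k, v in op["key"].items()):
                            row.update(op["new"])
            pending.clear()
        else:
            pending.append(record)

db = {"accounts": [{"customer_id": 42, "balance": 200.00}]}
apply_to_follower(db, transaction_records)
print(db["accounts"][0]["balance"])   # 140.1
```

Because records like these are self-describing, a change data capture consumer can parse them without knowing anything about the storage engine.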

|  | Statement-Based | Logical (Row-Based) |
|---|---|---|
| Format | SQL statements | Row-level changes |
| Determinism | Requires careful handling | Deterministic by design |
| Storage coupling | Low | Low |
| Compactness | Very compact | Larger (all columns) |
| External parsing | Complex (SQL dialect) | Easier (structured format) |
| Zero-downtime upgrade | Possible | Possible |

2. Problems with Replication Lag

In plain English: Async replication is like news spreading through word-of-mouth. The further from the source, the more out-of-date the information. Most of the time this is fine, but sometimes it causes weird experiences—like refreshing a page and seeing your own comment disappear.

In technical terms: Asynchronous replication introduces eventual consistency: replicas eventually converge to the same state, but at any moment may have different data. The delay is called replication lag. Under load or network issues, lag can reach seconds or minutes, causing anomalies.

Why it matters: Replication lag isn't just theoretical—it causes real user-facing bugs. Understanding the specific anomalies (read-your-writes violations, non-monotonic reads, causality violations) helps you build applications that degrade gracefully under lag.

Being able to tolerate node failures is just one reason for wanting replication. As mentioned in "Distributed versus Single-Node Systems", other reasons are scalability (processing more requests than a single machine can handle) and latency (placing replicas geographically closer to users).

Leader-based replication requires all writes to go through a single node, but read-only queries can go to any replica. For workloads that consist of mostly reads and only a small percentage of writes (which is often the case with online services), there is an attractive option: create many followers, and distribute the read requests across those followers. This removes load from the leader and allows read requests to be served by nearby replicas.

In this read-scaling architecture, you can increase the capacity for serving read-only requests simply by adding more followers. However, this approach only realistically works with asynchronous replication—if you tried to synchronously replicate to all followers, a single node failure or network outage would make the entire system unavailable for writing. And the more nodes you have, the likelier it is that one will be down, so a fully synchronous configuration would be very unreliable.

Unfortunately, if an application reads from an asynchronous follower, it may see outdated information if the follower has fallen behind. This leads to apparent inconsistencies in the database: if you run the same query on the leader and a follower at the same time, you may get different results, because not all writes have been reflected in the follower. This inconsistency is just a temporary state—if you stop writing to the database and wait a while, the followers will eventually catch up and become consistent with the leader. For that reason, this effect is known as eventual consistency.

💡 Insight

"Eventual consistency" is a deliberately vague guarantee—it means "the replicas will agree, eventually, maybe in a millisecond or maybe in an hour, we're not saying." For operability, you need to monitor replication lag and understand the specific anomalies that can occur, so you can design around them.

note

The term eventual consistency was coined by Douglas Terry et al., popularized by Werner Vogels, and became the battle cry of many NoSQL projects. However, not only NoSQL databases are eventually consistent: followers in an asynchronously replicated relational database have the same characteristics.

The term "eventually" is deliberately vague: in general, there is no limit to how far a replica can fall behind. In normal operation, the delay between a write happening on the leader and being reflected on a follower—the replication lag—may be only a fraction of a second, and not noticeable in practice. However, if the system is operating near capacity or if there is a problem in the network, the lag can easily increase to several seconds or even minutes.

When the lag is so large, the inconsistencies it introduces are not just a theoretical issue but a real problem for applications. In this section we will highlight three examples of problems that are likely to occur when there is replication lag. We'll also outline some approaches to solving them.

2.1. Reading Your Own Writes

In plain English: You post a comment, refresh the page, and... your comment is gone. This happens when you write to the leader but read from a lagging follower. It's like leaving a note on a bulletin board, walking to the other side of the building, and seeing an older version of the board without your note.

In technical terms: Read-after-write consistency (also called read-your-writes consistency) guarantees that if a user writes something, any subsequent reads by that same user will reflect that write. Other users' reads may lag, but a user always sees their own writes.

Why it matters: Without this guarantee, users lose trust in your application. "I just submitted this form and now it's gone—did it save?" To provide read-after-write consistency, route reads of user-modified data to the leader or synchronously updated followers.

Many applications let the user submit some data and then view what they have submitted. This might be a record in a customer database, or a comment on a discussion thread, or something else of that sort. When new data is submitted, it must be sent to the leader, but when the user views the data, it can be read from a follower. This is especially appropriate if data is frequently viewed but only occasionally written.

With asynchronous replication, there is a problem: if the user views the data shortly after making a write, the new data may not yet have reached the replica. To the user, it looks as though the data they submitted was lost, so they will be understandably unhappy.

[Diagram: read-after-write anomaly]

  1. User writes: the comment is sent to the leader
  2. Leader persists: the data is saved successfully
  3. User reads: the request is routed to a stale follower
  4. Data not found: the follower hasn't caught up yet

In this situation, we need read-after-write consistency, also known as read-your-writes consistency. This is a guarantee that if the user reloads the page, they will always see any updates they submitted themselves. It makes no promises about other users: other users' updates may not be visible until some later time. However, it reassures the user that their own input has been saved correctly.

How can we implement read-after-write consistency in a system with leader-based replication? There are various possible techniques:

  • 👤 Read own data from the leader: when reading data the user may have modified, read from the leader (e.g., always read the user's own profile from the leader, and other users' profiles from followers).
  • Time-based routing: track the time of the last update; for one minute after writing, read from the leader, or prevent queries on followers that are more than one minute behind.
  • 🔢 Logical timestamp: the client remembers the timestamp of its most recent write (a log sequence number or system clock reading); reads must go to replicas that are at least that current.
  • 📱 Cross-device consistency: if the user accesses the service from multiple devices, the last-write metadata must be centralized and requests routed to the same region.

In more detail:

  1. When reading something that the user may have modified, read it from the leader or a synchronously updated follower; otherwise, read it from an asynchronously updated follower. This requires that you have some way of knowing whether something might have been modified, without actually querying it. For example, user profile information on a social network is normally only editable by the owner of the profile, not by anybody else. Thus, a simple rule is: always read the user's own profile from the leader, and any other users' profiles from a follower.

  2. If most things in the application are potentially editable by the user, that approach won't be effective, as most things would have to be read from the leader (negating the benefit of read scaling). In that case, other criteria may be used to decide whether to read from the leader. For example, you could track the time of the last update and, for one minute after the last update, make all reads from the leader. You could also monitor the replication lag on followers and prevent queries on any follower that is more than one minute behind the leader.

  3. The client can remember the timestamp of its most recent write—then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp. If a replica is not sufficiently up to date, either the read can be handled by another replica or the query can wait until the replica has caught up. The timestamp could be a logical timestamp (something that indicates ordering of writes, such as the log sequence number) or the actual system clock (in which case clock synchronization becomes critical; see "Unreliable Clocks").

  4. If your replicas are distributed across regions (for geographical proximity to users or for availability), there is additional complexity. Any request that needs to be served by the leader must be routed to the region that contains the leader.
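
A sketch of the timestamp-tracking technique (number 3 above), with invented class names: the client remembers the log position of its most recent write, and a follower may serve the read only if it has applied at least that position; otherwise the read falls back to the leader.

```python
# Read-your-writes routing sketch (illustrative class names). The client
# remembers the log position of its most recent write; a follower may serve
# the read only if it has applied at least that position.

class Replica:
    def __init__(self):
        self.data = {}
        self.applied_up_to = 0   # highest replication-log position applied

    def read(self, key):
        return self.data.get(key)

class Leader(Replica):
    def __init__(self, followers):
        super().__init__()
        self.followers = followers

    def write(self, key, value):
        self.applied_up_to += 1
        self.data[key] = value
        # replication to followers happens asynchronously and may lag
        return self.applied_up_to   # log position of this write

class Client:
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers
        self.last_write_position = 0

    def write(self, key, value):
        self.last_write_position = self.leader.write(key, value)

    def read(self, key):
        for replica in self.followers:
            if replica.applied_up_to >= self.last_write_position:
                return replica.read(key)   # fresh enough for this user's writes
        return self.leader.read(key)       # no follower is caught up: use leader

followers = [Replica(), Replica()]
leader = Leader(followers)
client = Client(leader, followers)
client.write("profile:alice", "new photo")
print(client.read("profile:alice"))   # "new photo": served by the leader because
                                      # no follower has applied the write yet
```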

Another complication arises when the same user is accessing your service from multiple devices, for example a desktop web browser and a mobile app. In this case you may want to provide cross-device read-after-write consistency: if the user enters some information on one device and then views it on another device, they should see the information they just entered.

In this case, there are some additional issues to consider:

  • Approaches that require remembering the timestamp of the user's last update become more difficult, because the code running on one device doesn't know what updates have happened on the other device. This metadata will need to be centralized.

  • If your replicas are distributed across different regions, there is no guarantee that connections from different devices will be routed to the same region. (For example, if the user's desktop computer uses the home broadband connection and their mobile device uses the cellular data network, the devices' network routes may be completely different.) If your approach requires reading from the leader, you may first need to route requests from all of a user's devices to the same region.

Regions and Availability Zones

We use the term region to refer to one or more datacenters in a single geographic location. Cloud providers locate multiple datacenters in the same geographic region. Each datacenter is referred to as an availability zone or simply zone. Thus, a single cloud region is made up of multiple zones. Each zone is a separate datacenter located in a separate physical facility with its own power, cooling, and so on.

Zones in the same region are connected by very high speed network connections. Latency is low enough that most distributed systems can run with nodes spread across multiple zones in the same region as though they were in a single zone. Multi-zone configurations allow distributed systems to survive zonal outages where one zone goes offline, but they do not protect against regional outages where all zones in a region are unavailable. To survive a regional outage, a distributed system must be deployed across multiple regions, which can result in higher latencies, lower throughput, and increased cloud networking bills.

2.2. Monotonic Reads

In plain English: Monotonic reads prevent time from "going backward." Without this guarantee, you might see a new comment, refresh the page, and the comment is gone—not because it was deleted, but because you hit a more lagging replica the second time.

In technical terms: Monotonic reads guarantee that if a user makes multiple reads, they won't see data moving backward in time. If they've seen data at version V, subsequent reads must return version V or later, never earlier.

Why it matters: Non-monotonic reads are confusing and break user expectations. The fix is simple: route each user's reads to the same replica (e.g., hash user ID to pick replica). If that replica fails, reroute to another.

Our second example of an anomaly that can occur when reading from asynchronous followers is that it's possible for a user to see things moving backward in time.

This can happen if a user makes several reads from different replicas. For example, a user makes the same query twice, first to a follower with little lag, then to a follower with greater lag. (This scenario is quite likely if the user refreshes a web page, and each request is routed to a random server.) The first query returns a comment that was recently added by another user, but the second query doesn't return anything because the lagging follower has not yet picked up that write. In effect, the second query observes the system state at an earlier point in time than the first query. This wouldn't be so bad if the first query hadn't returned anything, because the user probably wouldn't know that another user had recently added a comment. However, it's very confusing for the user if they first see the comment appear, and then see it disappear again.

[Diagram: non-monotonic read anomaly. Query 1 hits a fresh follower (lag: 100 ms) and returns comment #42, so the user sees the comment; Query 2 (a page refresh) hits a stale follower (lag: 5 seconds) and returns nothing, so the comment appears to have vanished.]

Monotonic reads is a guarantee that this kind of anomaly does not happen. It's a lesser guarantee than strong consistency, but a stronger guarantee than eventual consistency. When you read data, you may see an old value; monotonic reads only means that if one user makes several reads in sequence, they will not see time go backward—i.e., they will not read older data after having previously read newer data.

One way of achieving monotonic reads is to make sure that each user always makes their reads from the same replica (different users can read from different replicas). For example, the replica can be chosen based on a hash of the user ID, rather than randomly. However, if that replica fails, the user's queries will need to be rerouted to another replica.
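
A minimal sketch of that hash-based routing (illustrative): hashing the user ID with a stable hash function pins each user to one replica, so their successive reads can never go backward in time unless that replica fails and their reads must be rerouted.

```python
# Monotonic reads by pinning each user to one replica (illustrative sketch).
# A stable hash of the user ID (not Python's built-in hash(), which varies
# between processes) means a given user's reads always hit the same replica.

import hashlib

def replica_for_user(user_id, replicas):
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    return replicas[int(digest, 16) % len(replicas)]

replicas = ["replica-1", "replica-2", "replica-3"]
print(replica_for_user("user-1234", replicas))  # always the same replica
print(replica_for_user("user-1234", replicas))  # ...for the same user
# If that replica fails, reads must be rerouted to another replica.
```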

2.3. Consistent Prefix Reads

In plain English: Imagine overhearing a phone conversation where you hear the answer before the question. That's what happens when causally related writes arrive at different replicas in the wrong order. "I'll see you there!" (wait, where?) "Let's meet at the park."

In technical terms: Consistent prefix reads guarantee that if a sequence of writes happens in a certain order, anyone reading those writes will see them in the same order. Causality is preserved.

Why it matters: This is especially a problem in sharded databases where different shards process writes independently. If Mr. Poons asks a question on shard A and Mrs. Cake answers on shard B, a reader might see the answer before the question. Solution: write causally related data to the same shard.

Our third example of replication lag anomalies concerns violation of causality. Imagine the following short dialog between Mr. Poons and Mrs. Cake:

Mr. Poons: How far into the future can you see, Mrs. Cake?

Mrs. Cake: About ten seconds usually, Mr. Poons.

There is a causal dependency between those two sentences: Mrs. Cake heard Mr. Poons's question and answered it.

Now, imagine a third person is listening to this conversation through followers. The things said by Mrs. Cake go through a follower with little lag, but the things said by Mr. Poons have a longer replication lag. This observer would hear the following:

Mrs. Cake: About ten seconds usually, Mr. Poons.

Mr. Poons: How far into the future can you see, Mrs. Cake?

To the observer it looks as though Mrs. Cake is answering the question before Mr. Poons has even asked it. Such psychic powers are impressive, but very confusing.

[Diagram: causality violation (consistent prefix reads). Correct order (reality): Mr. Poons asks "How far can you see?", then Mrs. Cake answers "About ten seconds usually." The observer, reading through a lagging follower, sees the answer before the question.]

Preventing this kind of anomaly requires another type of guarantee: consistent prefix reads. This guarantee says that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order.

This is a particular problem in sharded (partitioned) databases, which we will discuss in Chapter 7. If the database always applies writes in the same order, reads always see a consistent prefix, so this anomaly cannot happen. However, in many distributed databases, different shards operate independently, so there is no global ordering of writes: when a user reads from the database, they may see some parts of the database in an older state and some in a newer state.

One solution is to make sure that any writes that are causally related to each other are written to the same shard—but in some applications that cannot be done efficiently. There are also algorithms that explicitly keep track of causal dependencies, a topic that we will return to in "The "happens-before" relation and concurrency".
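
A sketch of the "same shard for causally related writes" idea (illustrative): sharding messages by conversation ID, rather than by sender, puts Mr. Poons's question and Mrs. Cake's answer on the same shard, which applies them in a single well-defined order.

```python
# Consistent prefix reads by co-locating causally related writes (illustrative).
# Sharding by conversation_id keeps a question and its reply on the same shard,
# so they are applied, and therefore read, in the same order.

import hashlib

NUM_SHARDS = 4

def stable_hash(value):
    return int(hashlib.md5(value.encode()).hexdigest(), 16)

def shard_for(conversation_id):
    return stable_hash(conversation_id) % NUM_SHARDS

question = {"conversation_id": "poons-cake", "sender": "Mr. Poons",
            "text": "How far into the future can you see, Mrs. Cake?"}
answer   = {"conversation_id": "poons-cake", "sender": "Mrs. Cake",
            "text": "About ten seconds usually, Mr. Poons."}

# Both messages map to the same shard, so a reader of that shard cannot see
# the answer without the earlier question also being visible.
assert shard_for(question["conversation_id"]) == shard_for(answer["conversation_id"])
```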

2.4. Solutions for Replication Lag

In plain English: When your application experiences replication lag, you have three choices: (1) accept it and live with anomalies, (2) implement workarounds in application code, or (3) use a database with stronger consistency guarantees. Option 3 is simplest for developers.

In technical terms: Application-level solutions (routing specific reads to leader, tracking timestamps) are complex and error-prone. Modern distributed databases (NewSQL systems) provide linearizability and transactions while maintaining scalability, offering the best of both worlds.

Why it matters: Weaker consistency can provide better availability and performance, but at the cost of application complexity. Choose based on your specific requirements: read-after-write for user-facing writes, eventual consistency for analytics.

When working with an eventually consistent system, it is worth thinking about how the application behaves if the replication lag increases to several minutes or even hours. If the answer is "no problem," that's great. However, if the result is a bad experience for users, it's important to design the system to provide a stronger guarantee, such as read-after-write. Pretending that replication is synchronous when in fact it is asynchronous is a recipe for problems down the line.

As discussed earlier, there are ways in which an application can provide a stronger guarantee than the underlying database—for example, by performing certain kinds of reads on the leader or a synchronously updated follower. However, dealing with these issues in application code is complex and easy to get wrong.

💡 Insight

The trend in database design is toward systems that provide strong consistency and scalability. Early NoSQL systems chose eventual consistency for scalability, but modern systems (NewSQL) prove you can have both. The CAP theorem isn't an either/or choice—clever designs can minimize the tradeoffs.

The simplest programming model for application developers is to choose a database that provides a strong consistency guarantee for replicas such as linearizability (see Chapter 10), and ACID transactions (see Chapter 8). This allows you to mostly ignore the challenges that arise from replication, and treat the database as if it had just a single node. In the early 2010s the NoSQL movement promoted the view that these features limited scalability, and that large-scale systems would have to embrace eventual consistency.

However, since then, a number of databases started providing strong consistency and transactions while also offering the fault tolerance, high availability, and scalability advantages of a distributed database. As mentioned in "Relational Model versus Document Model", this trend is known as NewSQL to contrast with NoSQL (although it's less about SQL specifically, and more about new approaches to scalable transaction management).

Even though scalable, strongly consistent distributed databases are now available, there are still good reasons why some applications choose to use different forms of replication that offer weaker consistency guarantees: they can offer stronger resilience in the face of network interruptions, and have lower overheads compared to transactional systems. We will explore such approaches in the rest of this chapter.

3. Multi-Leader Replication

In plain English: Instead of one teacher (leader) and many students (followers), imagine multiple teachers in different classrooms, each teaching and learning from each other. Everyone can write, but now you have to merge their notebooks when they differ. More complex, but each classroom can continue independently even if others are unreachable.

In technical terms: Multi-leader (active/active) replication allows multiple nodes to accept writes. Each leader replicates to others and to followers. Enables writes during network partitions, but requires conflict resolution when concurrent writes occur.

Why it matters: Multi-leader replication is essential for geo-distributed systems, offline-first applications, and collaborative editing. The tradeoff: simpler availability model, but complex conflict resolution and weaker consistency guarantees.

So far in this chapter we have only considered replication architectures using a single leader. Although that is a common approach, there are interesting alternatives.

Single-leader replication has one major downside: all writes must go through the one leader. If you can't connect to the leader for any reason, for example due to a network interruption between you and the leader, you can't write to the database.

A natural extension of the single-leader replication model is to allow more than one node to accept writes. Replication still happens in the same way: each node that processes a write must forward that data change to all the other nodes. We call this a multi-leader configuration (also known as active/active or bidirectional replication). In this setup, each leader simultaneously acts as a follower to the other leaders.

[Diagram: multi-leader replication architecture. Region 1 (US East) and Region 2 (EU West) each have their own leader with local followers; the two leaders replicate to each other bidirectionally.]

As with single-leader replication, there is a choice between making it synchronous or asynchronous. Let's say you have two leaders, A and B, and you're trying to write to A. If writes are synchronously replicated from A to B, and the network between the two nodes is interrupted, you can't write to A until the network comes back. Synchronous multi-leader replication thus gives you a model that is very similar to single-leader replication, as if you had made B the leader and A simply forwarded any write requests to B to be executed.

For that reason, we won't go further into synchronous multi-leader replication, and simply treat it as equivalent to single-leader replication. The rest of this section focusses on asynchronous multi-leader replication, in which any leader can process writes even when its connection to the other leaders is interrupted.

3.1. Geographically Distributed Operation

In plain English: If your users are spread across the globe, putting all writes through a single leader in one region means users on the other side of the planet have high latency. Multi-leader lets each region have its own leader, so writes are fast locally, and sync happens in the background.

In technical terms: In a geo-distributed multi-leader setup, each region has a leader that handles writes locally. Leaders replicate asynchronously between regions. Within each region, single-leader replication is used with followers.

Why it matters: Geo-distribution with multi-leader offers better performance (local writes), better fault tolerance (regions can operate independently), and better network resilience (tolerates inter-region link failures). The cost is weaker consistency—you can't guarantee constraints like uniqueness across leaders.

It rarely makes sense to use a multi-leader setup within a single region, because the benefits rarely outweigh the added complexity. However, there are some situations in which this configuration is reasonable.

Imagine you have a database with replicas in several different regions (perhaps so that you can tolerate the failure of an entire region, or perhaps in order to be closer to your users). This is known as a geographically distributed, geo-distributed or geo-replicated setup. With single-leader replication, the leader has to be in one of the regions, and all writes must go through that region.

In a multi-leader configuration, you can have a leader in each region. Within each region, regular leader–follower replication is used (with followers maybe in a different availability zone from the leader); between regions, each region's leader replicates its changes to the leaders in other regions.

Let's compare how the single-leader and multi-leader configurations fare in a multi-region deployment:

|  | Single-Leader (Multi-Region) | Multi-Leader (Multi-Region) |
|---|---|---|
| Performance | All writes go to one region; high latency for distant users | Writes processed locally; lower perceived latency |
| Regional outage tolerance | Failover needed to promote a follower in another region | Each region operates independently; replication catches up later |
| Network problem tolerance | Sensitive to inter-region link failures | Tolerates network interruptions well |
| Consistency | Can provide strong guarantees (serializability) | Weaker consistency; can't guarantee uniqueness or other constraints |

Performance: In a single-leader configuration, every write must go over the internet to the region with the leader. This can add significant latency to writes and might contravene the purpose of having multiple regions in the first place. In a multi-leader configuration, every write can be processed in the local region and is replicated asynchronously to the other regions. Thus, the inter-region network delay is hidden from users, which means the perceived performance may be better.

Tolerance of regional outages: In a single-leader configuration, if the region with the leader becomes unavailable, failover can promote a follower in another region to be leader. In a multi-leader configuration, each region can continue operating independently of the others, and replication catches up when the offline region comes back online.

Tolerance of network problems: Even with dedicated connections, traffic between regions can be less reliable than traffic between zones in the same region or within a single zone. A single-leader configuration is very sensitive to problems in this inter-region link, because when a client in one region wants to write to a leader in another region, it has to send its request over that link and wait for the response before it can complete. A multi-leader configuration with asynchronous replication can tolerate network problems better: during a temporary network interruption, each region's leader can continue independently processing writes.

Consistency: A single-leader system can provide strong consistency guarantees, such as serializable transactions, which we will discuss in Chapter 8. The biggest downside of multi-leader systems is that the consistency they can achieve is much weaker. For example, you can't guarantee that a bank account won't go negative or that a username is unique: it's always possible for different leaders to process writes that are individually fine (paying out some of the money in an account, registering a particular username), but which violate the constraint when taken together with another write on another leader.

💡 Insight

Multi-leader replication represents a fundamental tradeoff in distributed systems: you gain availability and partition tolerance at the expense of consistency. If you need to enforce constraints (uniqueness, referential integrity, account balances), you need coordination between leaders, which brings you back to single-leader performance characteristics.

This is simply a fundamental limitation of distributed systems. If you need to enforce such constraints, you're therefore better off with a single-leader system. However, as we will see in "Dealing with Conflicting Writes", multi-leader systems can still achieve consistency properties that are useful in a wide range of apps that don't need such constraints.

Multi-leader replication is less common than single-leader replication, but it is still supported by many databases, including MySQL, Oracle, SQL Server, and YugabyteDB. In some cases it is an external add-on feature, for example in Redis Enterprise, EDB Postgres Distributed, and pglogical.

As multi-leader replication is a somewhat retrofitted feature in many databases, there are often subtle configuration pitfalls and surprising interactions with other database features. For example, autoincrementing keys, triggers, and integrity constraints can be problematic. For this reason, multi-leader replication is often considered dangerous territory that should be avoided if possible.

Multi-leader replication topologies

A replication topology describes the communication paths along which writes are propagated from one node to another. If you have two leaders, there is only one plausible topology: leader 1 must send all of its writes to leader 2, and vice versa. With more than two leaders, various different topologies are possible.

Figure: Multi-leader replication topologies with three leaders: circular (each leader forwards writes to the next in a ring), star (a designated root leader exchanges writes with all the others), and all-to-all (every leader sends its writes to every other leader).

The most general topology is all-to-all, in which every leader sends its writes to every other leader. However, more restricted topologies are also used: for example a circular topology in which each node receives writes from one node and forwards those writes (plus any writes of its own) to one other node. Another popular topology has the shape of a star: one designated root node forwards writes to all of the other nodes. The star topology can be generalized to a tree.

note

Don't confuse a star-shaped network topology with a star schema (see "Stars and Snowflakes: Schemas for Analytics"), which describes the structure of a data model.

In circular and star topologies, a write may need to pass through several nodes before it reaches all replicas. Therefore, nodes need to forward data changes they receive from other nodes. To prevent infinite replication loops, each node is given a unique identifier, and in the replication log, each write is tagged with the identifiers of all the nodes it has passed through. When a node receives a data change that is tagged with its own identifier, that data change is ignored, because the node knows that it has already been processed.
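To make the loop-prevention rule concrete, here is a minimal Python sketch under assumed names (`ReplicatedWrite`, `Leader`, `receive`): each write carries the set of leader IDs it has already passed through, and a leader silently drops any change tagged with its own ID before applying and forwarding it.

```python
from dataclasses import dataclass, field

@dataclass
class ReplicatedWrite:
    key: str
    value: str
    seen_by: set = field(default_factory=set)  # IDs of leaders this write has passed through

class Leader:
    def __init__(self, node_id, peers=None):
        self.node_id = node_id
        self.peers = peers or []   # leaders this node forwards changes to (the topology)
        self.data = {}

    def receive(self, write: ReplicatedWrite):
        if self.node_id in write.seen_by:
            return  # already processed here: ignore, breaking the replication loop
        self.data[write.key] = write.value
        # Tag the write with our own ID before forwarding a copy to each peer.
        forwarded = write.seen_by | {self.node_id}
        for peer in self.peers:
            peer.receive(ReplicatedWrite(write.key, write.value, set(forwarded)))

# Circular topology: leader 1 -> leader 2 -> leader 3 -> leader 1
l1, l2, l3 = Leader("L1"), Leader("L2"), Leader("L3")
l1.peers, l2.peers, l3.peers = [l2], [l3], [l1]

l1.receive(ReplicatedWrite("title", "B"))   # a client write arriving at leader 1
assert l1.data == l2.data == l3.data == {"title": "B"}
```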

Problems with different topologies:

A problem with circular and star topologies is that if just one node fails, it can interrupt the flow of replication messages between other nodes, leaving them unable to communicate until the node is fixed. The topology could be reconfigured to work around the failed node, but in most deployments such reconfiguration would have to be done manually. The fault tolerance of a more densely connected topology (such as all-to-all) is better because it allows messages to travel along different paths, avoiding a single point of failure.

On the other hand, all-to-all topologies can have issues too. In particular, some network links may be faster than others (e.g., due to network congestion), with the result that some replication messages may "overtake" others.

Figure: Causality violation in an all-to-all topology. Client A's INSERT on leader 1 and client B's subsequent UPDATE on leader 3 arrive at leader 2 in the wrong order: the UPDATE first (referring to a row that doesn't exist there yet), and only then the INSERT.

In the example above, client A inserts a row into a table on leader 1, and client B updates that row on leader 3. However, leader 2 may receive the writes in a different order: it may first receive the update (which, from its point of view, is an update to a row that does not exist in the database) and only later receive the corresponding insert (which should have preceded the update).

This is a problem of causality, similar to the one we saw in "Consistent Prefix Reads": the update depends on the prior insert, so we need to make sure that all nodes process the insert first, and then the update. Simply attaching a timestamp to every write is not sufficient, because clocks cannot be trusted to be sufficiently in sync to correctly order these events at leader 2 (see Chapter 9).

To order these events correctly, a technique called version vectors can be used, which we will discuss later in this chapter (see "Detecting Concurrent Writes"). However, many multi-leader replication systems don't use good techniques for ordering updates, leaving them vulnerable to issues like the one above. If you are using multi-leader replication, it is worth being aware of these issues, carefully reading the documentation, and thoroughly testing your database to ensure that it really does provide the guarantees you believe it to have.

Sync Engines and Local-First Software

In plain English: Modern apps (Google Docs, Figma, calendar sync) need to work offline and show changes instantly without waiting for the server. This is multi-leader replication taken to the extreme: every device is a leader. Your phone, laptop, and tablet all have their own copy of the data and sync when they can.

In technical terms: A sync engine is a library that captures local changes, propagates them to collaborators (immediately if online, later if offline), merges concurrent changes, and resolves conflicts. Examples: CouchDB/PouchDB, Automerge, Yjs.

Why it matters: Sync engines enable a new class of applications—offline-first and local-first software—that work without constant server connectivity and can even survive the shutdown of the original service (like Git works without GitHub). This is the future of collaborative software.

Another situation in which multi-leader replication is appropriate is if you have an application that needs to continue to work while it is disconnected from the internet.

For example, consider the calendar apps on your mobile phone, your laptop, and other devices. You need to be able to see your meetings (make read requests) and enter new meetings (make write requests) at any time, regardless of whether your device currently has an internet connection. If you make any changes while you are offline, they need to be synced with a server and your other devices when the device is next online.

In this case, every device has a local database replica that acts as a leader (it accepts write requests), and there is an asynchronous multi-leader replication process (sync) between the replicas of your calendar on all of your devices. The replication lag may be hours or even days, depending on when you have internet access available.

From an architectural point of view, this setup is very similar to multi-leader replication between regions, taken to the extreme: each device is a "region," and the network connection between them is extremely unreliable.

Real-time collaboration, offline-first, and local-first apps:

Moreover, many modern web apps offer real-time collaboration features, such as Google Docs and Sheets for text documents and spreadsheets, Figma for graphics, and Linear for project management. What makes these apps so responsive is that user input is immediately reflected in the user interface, without waiting for a network round-trip to the server, and edits by one user are shown to their collaborators with low latency.

This again results in a multi-leader architecture: each web browser tab that has opened the shared file is a replica, and any updates that you make to the file are asynchronously replicated to the devices of the other users who have opened the same file. Even if the app does not allow you to continue editing a file while offline, the fact that multiple users can make edits without waiting for a response from the server already makes it multi-leader.

💡 Insight

The programming model of sync engines is fundamentally different from traditional server-centric apps. Instead of making service calls that can fail, you read and write local data (which almost never fails), and sync happens in the background. This leads to more declarative, reactive code and better user experience—no loading spinners or error handling for every action.

Both offline editing and real-time collaboration require a similar replication infrastructure: the application needs to capture any changes that the user makes to a file, and either send them to collaborators immediately (if online), or store them locally for sending later (if offline). Additionally, the application needs to receive changes from collaborators, merge them into the user's local copy of the file, and update the user interface to reflect the latest version. If multiple users have changed the file concurrently, conflict resolution logic may be needed to merge those changes.
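As a rough sketch of that loop, here is the shape of the client side of a sync engine in Python; everything here (`SyncClient`, `server.publish`, `server.fetch_new_changes`) is a made-up illustration rather than any particular library's API.

```python
import time
import uuid

class SyncClient:
    """Minimal sketch of the client side of a sync engine (hypothetical API)."""

    def __init__(self):
        self.doc = {}        # local replica of the shared document
        self.outbox = []     # local changes not yet sent to collaborators
        self.online = False

    def edit(self, key, value):
        # Local edits are applied immediately, without waiting for the network,
        # and queued for later propagation.
        change = {"id": str(uuid.uuid4()), "key": key, "value": value, "ts": time.time()}
        self.apply(change)
        self.outbox.append(change)

    def apply(self, change):
        # Merge a change into the local copy. A real engine would use a CRDT
        # or OT here rather than a plain overwrite, so that concurrent edits
        # can be merged without losing data.
        self.doc[change["key"]] = change["value"]

    def sync(self, server):
        # Background step: push pending local changes and pull changes made by
        # collaborators (how the server tracks what we've already seen is omitted).
        if not self.online:
            return
        while self.outbox:
            server.publish(self.outbox.pop(0))
        for change in server.fetch_new_changes():
            self.apply(change)
```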

A software library that supports this process is called a sync engine. Although the idea has existed for a long time, the term has recently gained attention. An application that allows a user to continue editing a file while offline (which may be implemented using a sync engine) is called offline-first. The term local-first software refers to collaborative apps that are not only offline-first, but are also designed to continue working even if the developer who made the software shuts down all of their online services. This can be achieved by using a sync engine with an open standard sync protocol for which multiple service providers are available. For example, Git is a local-first collaboration system (albeit one that doesn't support real-time collaboration) since you can sync via GitHub, GitLab, or any other repository hosting service.

Pros and cons of sync engines:

The dominant way of building web apps today is to keep very little persistent state on the client, and to rely on making requests to a server whenever a new piece of data needs to be displayed or some data needs to be updated. In contrast, when using a sync engine, you have persistent state on the client, and communication with the server is moved into a background process. The sync engine approach has a number of advantages:

  • Instant UI response: local data access means the UI can respond within a frame (about 16 ms, i.e. 60 fps) instead of waiting for a network round-trip.

  • Offline support: no separate offline mode is needed; being offline is just a very large network delay. This is critical for mobile apps.

  • Simpler programming model: no error handling for every action; you read and write local data, and the sync engine handles distribution in the background.

  • Real-time collaboration: combined with reactive programming, sync engines make real-time updates straightforward.

Sync engines work best when all the data that the user may need is downloaded in advance and stored persistently on the client. This means that the data is available for offline access when needed, but it also means that sync engines are not suitable if the user has access to a very large amount of data. For example, downloading all the files that the user themselves created is probably fine (one user generally doesn't generate that much data), but downloading the entire catalog of an e-commerce website probably doesn't make sense.

Sync engines were pioneered by Lotus Notes in the 1980s (though the term wasn't used at the time), and sync for specific apps such as calendars has also existed for a long time. Today there are a number of general-purpose sync engines: some rely on a proprietary backend service (e.g., Google Firestore, Realm, or Ditto), while others have an open source backend, making them suitable for creating local-first software (e.g., PouchDB/CouchDB, Automerge, or Yjs).

Multiplayer video games have a similar need to respond immediately to the user's local actions, and reconcile them with other players' actions received asynchronously over the network. In game development jargon the equivalent of a sync engine is called netcode. The techniques used in netcode are quite specific to the requirements of games, and don't directly carry over to other types of software, so we won't consider them further in this book.

3.2. Dealing with Conflicting Writes

In plain English: The fundamental problem with multi-leader: two people edit the same thing at the same time on different leaders. How do you merge their changes? Options: avoid conflicts entirely, pick one and throw away the other (last-write-wins), ask the user to merge manually, or use algorithms that automatically merge.

In technical terms: Conflict resolution strategies include: conflict avoidance (route updates to same leader), last-write-wins (LWW—discard concurrent writes based on timestamp), manual resolution (store siblings, let app/user merge), and automatic resolution (CRDTs, Operational Transformation).

Why it matters: The conflict resolution strategy determines your data loss characteristics and application complexity. LWW is simple but loses data. Manual resolution is flexible but burdens users. Automatic resolution (CRDTs) is the gold standard but requires choosing the right algorithm for your data type.

The biggest problem with multi-leader replication—both in a geo-distributed server-side database and a local-first sync engine on end user devices—is that concurrent writes on different leaders can lead to conflicts that need to be resolved.

For example, consider a wiki page that is simultaneously being edited by two users. User 1 changes the title of the page from A to B, and user 2 independently changes the title from A to C. Each user's change is successfully applied to their local leader. However, when the changes are asynchronously replicated, a conflict is detected. This problem does not occur in a single-leader database.

Figure: A write conflict in multi-leader replication. User 1 sets the title to B on leader 1 while user 2 concurrently sets it to C on leader 2; when the writes are replicated, the title is both B and C.
note

We say that the two writes are concurrent because neither was "aware" of the other at the time the write was originally made. It doesn't matter whether the writes literally happened at the same time; indeed, if the writes were made while offline, they might have actually happened some time apart. What matters is whether one write occurred in a state where the other write has already taken effect.

In "Detecting Concurrent Writes" we will tackle the question of how a database can determine whether two writes are concurrent. For now we will assume that we can detect conflicts, and we want to figure out the best way of resolving them.

Conflict avoidance

One strategy for conflicts is to avoid them occurring in the first place. For example, if the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur, even if the database as a whole is multi-leader. This approach is not possible in the case of a sync engine client being updated offline, but it is sometimes possible in geo-replicated server systems.

For example, in an application where a user can only edit their own data, you can ensure that requests from a particular user are always routed to the same region and use the leader in that region for reading and writing. Different users may have different "home" regions (perhaps picked based on geographic proximity to the user), but from any one user's point of view the configuration is essentially single-leader.

However, sometimes you might want to change the designated leader for a record—perhaps because one region is unavailable and you need to reroute traffic to another region, or perhaps because a user has moved to a different location and is now closer to a different region. There is now a risk that the user performs a write while the change of designated leader is in progress, leading to a conflict that would have to be resolved using one of the methods below. Thus, conflict avoidance breaks down if you allow the leader to be changed.

Another example of conflict avoidance: imagine you want to insert new records and generate unique IDs for them based on an auto-incrementing counter. If you have two leaders, you could set them up so that one leader only generates odd numbers and the other only generates even numbers. That way you can be sure that the two leaders won't concurrently assign the same ID to different records. We will discuss other ID assignment schemes in "ID Generators and Logical Clocks".
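A small sketch of that scheme, with a made-up `PartitionedIdGenerator` class: give each leader a distinct starting offset and have every leader step by the total number of leaders, so the sequences can never collide.

```python
class PartitionedIdGenerator:
    """Leader i of k generates IDs i, i+k, i+2k, ... so leaders never collide."""

    def __init__(self, leader_index: int, num_leaders: int):
        self.next_id = leader_index
        self.step = num_leaders

    def new_id(self) -> int:
        nid = self.next_id
        self.next_id += self.step
        return nid

odd_leader = PartitionedIdGenerator(leader_index=1, num_leaders=2)   # 1, 3, 5, ...
even_leader = PartitionedIdGenerator(leader_index=2, num_leaders=2)  # 2, 4, 6, ...
```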

Last write wins (discarding concurrent writes)

If conflicts can't be avoided, the simplest way of resolving them is to attach a timestamp to each write, and to always use the value with the greatest timestamp. For example, in the wiki example above, let's say that the timestamp of user 1's write is greater than the timestamp of user 2's write. In that case, both leaders will determine that the new title of the page should be B, and they discard the write that sets it to C. If the writes coincidentally have the same timestamp, the winner can be chosen by comparing the values (e.g., in the case of strings, taking the one that's earlier in the alphabet).

This approach is called last write wins (LWW) because the write with the greatest timestamp can be considered the "last" one. The term is misleading though, because when two writes are concurrent, which one is older and which is later is undefined, and so the timestamp order of concurrent writes is essentially random.

Therefore the real meaning of LWW is: when the same record is concurrently written on different leaders, one of those writes is randomly chosen to be the winner, and the other writes are silently discarded, even though they were successfully processed at their respective leaders. This achieves the goal that eventually all replicas end up in a consistent state, but at the cost of data loss.
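As a minimal sketch, an LWW register can be reduced to keeping a (timestamp, value) pair per key and merging by taking the larger pair; the function name and tie-breaking rule here are illustrative, not any particular database's behavior.

```python
def lww_merge(a, b):
    """a and b are (timestamp, value) pairs; the one with the greater timestamp
    wins, with ties broken deterministically by comparing the values. For
    concurrent writes the outcome is essentially arbitrary, and the loser is
    silently discarded."""
    return max(a, b)  # tuple comparison: timestamp first, then value

# Two leaders concurrently set the wiki page title:
leader1 = (1700000002, "B")
leader2 = (1700000001, "C")
assert lww_merge(leader1, leader2) == (1700000002, "B")  # the write of C is lost
```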

💡 Insight

Last-write-wins is deceptively dangerous. The name suggests temporal ordering, but for concurrent writes it's arbitrary—you're randomly throwing away writes that users believed were saved. Only use LWW if you can tolerate data loss, like for caches or append-only data with unique keys (UUIDs).

If you can avoid conflicts—for example, by only inserting records with a unique key such as a UUID, and never updating them—then LWW is no problem. But if you update existing records, or if different leaders may insert records with the same key, then you have to decide whether lost updates are a problem for your application. If lost updates are not acceptable, you need to use one of the conflict resolution approaches described below.

Another problem with LWW is that if a real-time clock (e.g. a Unix timestamp) is used as timestamp for the writes, the system becomes very sensitive to clock synchronization. If one node has a clock that is ahead of the others, and you try to overwrite a value written by that node, your write may be ignored as it may have a lower timestamp, even though it clearly occurred later. This problem can be solved by using a logical clock, which we will discuss in "ID Generators and Logical Clocks".

Manual conflict resolution

If randomly discarding some of your writes is not desirable, the next option is to resolve the conflict manually. You may be familiar with manual conflict resolution from Git and other version control systems: if commits on two different branches edit the same lines of the same file, and you try to merge those branches, you will get a merge conflict that needs to be resolved before the merge is complete.

In a database, it would be impractical for a conflict to stop the entire replication process until a human has resolved it. Instead, databases typically store all the concurrently written values for a given record—for example, both B and C in the wiki example. These values are sometimes called siblings. The next time you query that record, the database returns all those values, rather than just the latest one. You can then resolve those values in whatever way you want, either automatically in application code (for example, you could concatenate B and C into "B/C"), or by asking the user. You then write back a new value to the database to resolve the conflict.

This approach to conflict resolution is used in some systems, such as CouchDB. However, it also suffers from a number of problems:

  • The API of the database changes: for example, where previously the title of the wiki page was just a string, it now becomes a set of strings that usually contains one element, but may sometimes contain multiple elements if there is a conflict. This can make the data awkward to work with in application code.

  • Asking the user to manually merge the siblings is a lot of work, both for the app developer (who needs to build the user interface for conflict resolution) and for the user (who may be confused about what they are being asked to do, and why). In many cases, it's better to merge automatically than to bother the user.

  • Merging siblings automatically can lead to surprising behavior if it is not done carefully. For example, the shopping cart on Amazon used to allow concurrent updates, which were then merged by keeping all the shopping cart items that appeared in any of the siblings (i.e., taking the set union of the carts). This meant that if the customer had removed an item from their cart in one sibling, but another sibling still contained that old item, the removed item would unexpectedly reappear in the customer's cart.

    (Example: both devices start with the cart {Soap, Book, DVD}; device 1 removes Book, leaving {Soap, DVD}, while device 2 removes DVD, leaving {Soap, Book}; merging by set union yields {Soap, Book, DVD} again, and both removed items reappear.)
  • If multiple nodes observe the conflict and concurrently resolve it, the conflict resolution process can itself introduce a new conflict. Those resolutions could even be inconsistent: for example, one node may merge B and C into "B/C" and another may merge them into "C/B" if you are not careful to order them consistently. When the conflict between "B/C" and "C/B" is merged, it may result in "B/C/C/B" or something similarly surprising.

Automatic conflict resolution

For many applications, the best way of handling conflicts is to use an algorithm that automatically merges concurrent writes into a consistent state. Automatic conflict resolution ensures that all replicas converge to the same state—i.e., all replicas that have processed the same set of writes have the same state, regardless of the order in which the writes arrived.

LWW is a simple example of a conflict resolution algorithm. More sophisticated merge algorithms have been developed for different types of data, with the goal of preserving the intended effect of all updates as much as possible, and hence avoiding data loss:

  • Text (strings): detect character insertions and deletions; the merged result preserves all edits, and concurrent insertions at the same position are ordered deterministically.

  • Collections (lists, sets): track insertions and deletions explicitly. In the shopping cart example, the merge knows that Book and DVD were removed, so the result is {Soap}, not {Soap, Book, DVD} (a sketch of such a set follows this list).

  • Counters: combine the increments and decrements from each sibling so that the result neither double-counts nor drops updates.

  • Maps (key-value): apply conflict resolution to the values under each key; updates to different keys are independent.
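To show how tracking deletions fixes the shopping cart, here is a sketch of an observed-remove set, one of the simpler set CRDTs: every add gets a unique tag, a remove records the tags it has observed, and merging takes the union of both. An element is present only if some tag for it has not been removed. The class and method names are made up for illustration.

```python
import copy
import uuid

class ORSet:
    """Sketch of an observed-remove set CRDT."""

    def __init__(self):
        self.adds = set()      # (element, unique_tag) pairs
        self.removes = set()   # tags whose additions have been removed

    def add(self, element):
        self.adds.add((element, uuid.uuid4().hex))

    def remove(self, element):
        # Remove only the additions this replica has observed.
        self.removes |= {tag for (e, tag) in self.adds if e == element}

    def merge(self, other):
        self.adds |= other.adds
        self.removes |= other.removes

    def value(self):
        return {e for (e, tag) in self.adds if tag not in self.removes}

# Both devices start from the same cart state:
cart = ORSet()
for item in ["Soap", "Book", "DVD"]:
    cart.add(item)

device1, device2 = copy.deepcopy(cart), copy.deepcopy(cart)
device1.remove("Book")
device2.remove("DVD")

device1.merge(device2)
assert device1.value() == {"Soap"}   # removed items do not reappear
```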

There are limits to what is possible with conflict resolution. For example, if you want to enforce that a list contains no more than five items, and multiple users concurrently add items to the list so that there are more than five in total, your only option is to drop some of the items. Nevertheless, automatic conflict resolution is sufficient to build many useful apps. And if you start from the requirement of wanting to build a collaborative offline-first or local-first app, then conflict resolution is inevitable, and automating it is often the best approach.

CRDTs and Operational Transformation

Two families of algorithms are commonly used to implement automatic conflict resolution: Conflict-free replicated datatypes (CRDTs) and Operational Transformation (OT). They have different design philosophies and performance characteristics, but both are able to perform automatic merges for all the aforementioned types of data.

Figure: How OT and CRDTs merge concurrent text edits. Starting from "ice", replica 1 prepends "n" (making "nice") while replica 2 appends "!" (making "ice!"); after merging, both replicas converge on "nice!".

Assume you have two replicas that both start off with the text "ice". One replica prepends the letter "n" to make "nice", while concurrently the other replica appends an exclamation mark to make "ice!". The merged result "nice!" is achieved differently by both types of algorithms:

Operational Transformation (OT):

We record the index at which characters are inserted or deleted: "n" is inserted at index 0, and "!" at index 3. Next, the replicas exchange their operations. The insertion of "n" at 0 can be applied as-is, but if the insertion of "!" at 3 were applied to the state "nice" we would get "nic!e", which is incorrect. We therefore need to transform the index of each operation to account for concurrent operations that have already been applied; in this case, the insertion of "!" is transformed to index 4 to account for the insertion of "n" at an earlier index.
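A sketch of that index transformation for two concurrent insertions (a tiny slice of OT; real implementations also handle deletions and break ties between equal indexes deterministically):

```python
def transform_insert(index, other_index):
    """Adjust the index of an insertion to account for a concurrent
    insertion that has already been applied at other_index."""
    return index + 1 if other_index <= index else index

doc = list("ice")
# Replica 1 inserts "n" at index 0; replica 2 concurrently inserts "!" at index 3.
doc.insert(0, "n")                       # apply the remote "n" first -> "nice"
doc.insert(transform_insert(3, 0), "!")  # 3 is transformed to 4 -> "nice!"
assert "".join(doc) == "nice!"
```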

CRDT:

Most CRDTs give each character a unique, immutable ID and use those to determine the positions of insertions/deletions, instead of indexes. For example, we assign the ID 1A to "i", the ID 2A to "c", etc. When inserting the exclamation mark, we generate an operation containing the ID of the new character (4B) and the ID of the existing character after which we want to insert (3A). To insert at the beginning of the string we give "nil" as the preceding character ID. Concurrent insertions at the same position are ordered by the IDs of the characters. This ensures that replicas converge without performing any transformation.
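A sketch of ID-based insertion for this example; the ID 4B for "!" comes from the text above, the ID for "n" is made up, and the tie-breaking rule for concurrent inserts after the same predecessor is omitted. Applying the two operations in either order converges without any index transformation.

```python
def apply_insert(doc, op):
    """doc: ordered list of (char_id, char); op: (new_id, char, pred_id).
    pred_id None means 'insert at the beginning of the document'."""
    new_id, char, pred_id = op
    pos = 0
    if pred_id is not None:
        pos = next(i for i, (cid, _) in enumerate(doc) if cid == pred_id) + 1
    doc.insert(pos, (new_id, char))

initial = [("1A", "i"), ("2A", "c"), ("3A", "e")]   # the text "ice"
op_n    = ("4C", "n", None)    # replica 1: prepend "n" (the ID 4C is made up)
op_bang = ("4B", "!", "3A")    # replica 2: insert "!" after the character with ID 3A

replica1, replica2 = list(initial), list(initial)
for op in (op_n, op_bang):
    apply_insert(replica1, op)
for op in (op_bang, op_n):     # the other replica sees the operations in the opposite order
    apply_insert(replica2, op)

assert "".join(c for _, c in replica1) == "".join(c for _, c in replica2) == "nice!"
```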

💡 Insight

CRDTs and OT represent the state of the art in automatic conflict resolution. OT is commonly used for real-time collaborative text editing (Google Docs), while CRDTs are more common in distributed databases and sync engines for JSON data. Both can combine to form hybrid algorithms with the best of both worlds.

There are many algorithms based on variations of these ideas. Lists/arrays can be supported similarly, using list elements instead of characters, and other datatypes such as key-value maps can be added quite easily. There are some performance and functionality trade-offs between OT and CRDTs, but it's possible to combine the advantages of CRDTs and OT in one algorithm.

OT is most often used for real-time collaborative editing of text, e.g. in Google Docs, whereas CRDTs can be found in distributed databases such as Redis Enterprise, Riak, and Azure Cosmos DB. Sync engines for JSON data can be implemented both with CRDTs (e.g., Automerge or Yjs) and with OT (e.g., ShareDB).

What is a conflict?

Some kinds of conflict are obvious. In the example above, two writes concurrently modified the same field in the same record, setting it to two different values. There is little doubt that this is a conflict.

Other kinds of conflict can be more subtle to detect. For example, consider a meeting room booking system: it tracks which room is booked by which group of people at which time. This application needs to ensure that each room is only booked by one group of people at any one time (i.e., there must not be any overlapping bookings for the same room). In this case, a conflict may arise if two different bookings are created for the same room at the same time. Even if the application checks availability before allowing a user to make a booking, there can be a conflict if the two bookings are made on two different leaders.

There isn't a quick ready-made answer, but in the following chapters we will trace a path toward a good understanding of this problem. We will see some more examples of conflicts in Chapter 8, and in future chapters we will discuss scalable approaches for detecting and resolving conflicts in a replicated system.

4. Leaderless Replication

In plain English: Leaderless replication is democracy for databases—no single node is in charge. When you write, you send it to multiple nodes simultaneously. When you read, you ask multiple nodes and take the newest answer. It's like asking three friends for directions and trusting the two who agree.

In technical terms: Clients write to multiple replicas in parallel, requiring w acknowledgments for success. Reads query r replicas and take the most recent value (by version number). If w + r > n (total replicas), read and write sets overlap, guaranteeing freshness.

Why it matters: Leaderless replication (Dynamo-style) offers excellent availability—no failover needed. One slow or down node barely impacts performance. The tradeoff: weaker consistency (eventual), and you must tune quorum parameters (n, r, w) to balance consistency vs. availability.

The replication approaches we have discussed so far in this chapter—single-leader and multi-leader replication—are based on the idea that a client sends a write request to one node (the leader), and the database system takes care of copying that write to the other replicas. A leader determines the order in which writes should be processed, and followers apply the leader's writes in the same order.

Some data storage systems take a different approach, abandoning the concept of a leader and allowing any replica to directly accept writes from clients. Some of the earliest replicated data systems were leaderless, but the idea was mostly forgotten during the era of dominance of relational databases. It once again became a fashionable architecture for databases after Amazon used it for its in-house Dynamo system in 2007. Riak, Cassandra, and ScyllaDB are open source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as Dynamo-style.

note

The original Dynamo system was only described in a paper, but never released outside of Amazon. The similarly-named DynamoDB is a more recent cloud database from AWS, but it has a completely different architecture: it uses single-leader replication based on the Multi-Paxos consensus algorithm.

In some leaderless implementations, the client directly sends its writes to several replicas, while in others, a coordinator node does this on behalf of the client. However, unlike a leader database, that coordinator does not enforce a particular ordering of writes. As we shall see, this difference in design has profound consequences for the way the database is used.

4.1. Writing to the Database When a Node Is Down

In plain English: In a leaderless system, if one replica is down, you just ignore it and continue with the others. No failover drama. When it comes back online, it catches up via read repair (fixing stale values when detected during reads) or anti-entropy (background process that syncs replicas).

In technical terms: Write to all n replicas in parallel, succeed if w replicas acknowledge. Down replicas miss writes. When they recover, catch up via: (1) read repair—clients detect stale values during reads and write back fresh values, (2) hinted handoff—other replicas store writes on behalf of the down replica and forward them when it returns, (3) anti-entropy—background process compares replicas and copies missing data.

Why it matters: No failover means simpler operations and no split-brain risk. But stale replicas can serve outdated data indefinitely unless read repair or anti-entropy kicks in. Read repair only fixes frequently-read values; anti-entropy is slow and unordered.

Imagine you have a database with three replicas, and one of the replicas is currently unavailable, perhaps because it is being rebooted to install a system update. In a single-leader configuration, if you want to continue processing writes, you may need to perform a failover (see "Handling Node Outages").

On the other hand, in a leaderless configuration, failover does not exist. The client sends the write to all three replicas in parallel, and the two available replicas accept the write but the unavailable replica misses it. Let's say that it's sufficient for two out of three replicas to acknowledge the write: after the client has received two ok responses, we consider the write to be successful. The client simply ignores the fact that one of the replicas missed the write.

Figure: Quorum write and read repair. A client writes version 7 and gets acknowledgments from replicas 1 and 2 (w = 2) while replica 3 is down; a later read queries the replicas in parallel (r = 2) and sees v7 from replicas 1 and 2 but the stale v6 from replica 3, so the client writes v7 back to replica 3 (read repair).

Now imagine that the unavailable node comes back online, and clients start reading from it. Any writes that happened while the node was down are missing from that node. Thus, if you read from that node, you may get stale (outdated) values as responses.

To solve that problem, when a client reads from the database, it doesn't just send its request to one replica: read requests are also sent to several nodes in parallel. The client may get different responses from different nodes; for example, the up-to-date value from one node and a stale value from another.

In order to tell which responses are up-to-date and which are outdated, every value that is written needs to be tagged with a version number or timestamp, similarly to what we saw in "Last write wins (discarding concurrent writes)". When a client receives multiple values in response to a read, it uses the one with the greatest timestamp (even if that value was only returned by one replica, and several other replicas returned older values). See "Detecting Concurrent Writes" for more details.

Catching up on missed writes

The replication system should ensure that eventually all the data is copied to every replica. After an unavailable node comes back online, how does it catch up on the writes that it missed? Several mechanisms are used in Dynamo-style datastores:


Read repair: When a client makes a read from several nodes in parallel, it can detect any stale responses. For example, a client gets a version 6 value from replica 3 and a version 7 value from replicas 1 and 2. The client sees that replica 3 has a stale value and writes the newer value back to that replica. This approach works well for values that are frequently read.

Hinted handoff: If one replica is unavailable, another replica may store writes on its behalf in the form of hints. When the replica that was supposed to receive those writes comes back, the replica storing the hints sends them to the recovered replica, and then deletes the hints. This handoff process helps bring replicas up-to-date even for values that are never read, and therefore not handled by read repair.

Anti-entropy: In addition, there is a background process that periodically looks for differences in the data between replicas and copies any missing data from one replica to another. Unlike the replication log in leader-based replication, this anti-entropy process does not copy writes in any particular order, and there may be a significant delay before data is copied.
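A sketch of the read path with read repair, using hypothetical replica objects with `get`/`put` methods: query the replicas, take the value with the greatest version, and write that value back to any replica that returned something older. For simplicity this sketch waits for every replica rather than only the fastest r.

```python
def quorum_read_with_repair(replicas, key, r=2):
    """Read from all replicas, take the newest version, repair stale ones."""
    responses = []
    for replica in replicas:
        try:
            responses.append((replica, replica.get(key)))  # assumed to return (version, value)
        except Exception:
            pass  # unreachable replica: ignore it
    if len(responses) < r:
        raise RuntimeError("not enough replicas responded")

    latest_version, latest_value = max(resp for _, resp in responses)
    for replica, (version, _) in responses:
        if version < latest_version:
            replica.put(key, latest_version, latest_value)  # read repair
    return latest_value
```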

4.2. Quorums for reading and writing

In plain English: Quorums are about overlapping sets. If you write to 2 out of 3 servers and read from 2 out of 3 servers, at least one server will be in both sets—so you're guaranteed to read the latest write. It's like the pigeon-hole principle: 2 + 2 > 3, so there must be overlap.

In technical terms: With n replicas, require w acknowledgments for writes and r responses for reads. If w + r > n, the read and write sets must overlap, guaranteeing at least one node has the latest value. Common config: n=3, w=2, r=2 (tolerates 1 failure). For higher availability: n=5, w=3, r=3 (tolerates 2 failures).

Why it matters: Quorum parameters give you a tunable consistency-availability tradeoff. Higher w means fewer lost writes but lower write availability. Higher r means fresher reads but lower read availability. Tuning these is key to Dynamo-style systems.

In the example above, we considered the write to be successful even though it was only processed on two out of three replicas. What if only one out of three replicas accepted the write? How far can we push this?

If we know that every successful write is guaranteed to be present on at least two out of three replicas, that means at most one replica can be stale. Thus, if we read from at least two replicas, we can be sure that at least one of the two is up to date. If the third replica is down or slow to respond, reads can nevertheless continue returning an up-to-date value.

More generally, if there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. (In our example, n = 3, w = 2, r = 2.) As long as w + r > n, we expect to get an up-to-date value when reading, because at least one of the r nodes we're reading from must be up to date. Reads and writes that obey these r and w values are called quorum reads and writes. You can think of r and w as the minimum number of votes required for the read or write to be valid.
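The quorum arithmetic itself is tiny; a sketch, just for checking configurations:

```python
import math

def is_quorum(n, w, r):
    """True if every read set must overlap every successful write set."""
    return w + r > n

def default_quorum(n):
    """A common choice: w = r = ceil((n + 1) / 2)."""
    return math.ceil((n + 1) / 2)

assert is_quorum(n=3, w=2, r=2)       # tolerates one unavailable replica
assert is_quorum(n=5, w=3, r=3)       # tolerates two unavailable replicas
assert not is_quorum(n=3, w=1, r=1)   # reads may miss the latest write
assert default_quorum(3) == 2 and default_quorum(5) == 3
```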

Figure: Quorum reads and writes with n = 5, w = 3, r = 3. The write is acknowledged by replicas 1, 2, and 3; the read queries replicas 3, 4, and 5; the two sets overlap at replica 3, which is guaranteed to have the latest value.

In Dynamo-style databases, the parameters n, w, and r are typically configurable. A common choice is to make n an odd number (typically 3 or 5) and to set w = r = (n + 1) / 2 (rounded up). However, you can vary the numbers as you see fit. For example, a workload with few writes and many reads may benefit from setting w = n and r = 1. This makes reads faster, but has the disadvantage that just one failed node causes all database writes to fail.

note

There may be more than n nodes in the cluster, but any given value is stored only on n nodes. This allows the dataset to be sharded, supporting datasets that are larger than you can fit on one node. We will return to sharding in Chapter 7.

The quorum condition, w + r > n, allows the system to tolerate unavailable nodes as follows:

  • If w < n, we can still process writes if a node is unavailable.
  • If r < n, we can still process reads if a node is unavailable.
  • With n = 3, w = 2, r = 2 we can tolerate one unavailable node.
  • With n = 5, w = 3, r = 3 we can tolerate two unavailable nodes.

Normally, reads and writes are always sent to all n replicas in parallel. The parameters w and r determine how many nodes we wait for—i.e., how many of the n nodes need to report success before we consider the read or write to be successful.

If fewer than the required w or r nodes are available, writes or reads return an error. A node could be unavailable for many reasons: because the node is down (crashed, powered down), due to an error executing the operation (can't write because the disk is full), due to a network interruption between the client and the node, or for any number of other reasons. We only care whether the node returned a successful response and don't need to distinguish between different kinds of fault.

4.3. Limitations of Quorum Consistency

In plain English: Quorums sound great in theory—w + r > n guarantees freshness, right? In practice, edge cases abound: nodes can fail with partial writes, rebalancing can break quorum overlap, concurrent writes introduce ambiguity, and clock skew can cause writes to be silently dropped. Quorums give you high probability of consistency, not certainty.

In technical terms: Even with w + r > n, stale reads can occur due to: (1) node failures breaking quorum, (2) rebalancing changing which nodes hold data, (3) concurrent writes (read may see old or new), (4) partial write failures (fewer than w successes), (5) clock skew causing timestamp-based conflicts (LWW with Cassandra).

Why it matters: Don't treat quorum parameters as strict consistency guarantees—they're tuning knobs for probabilistic freshness. For strong consistency, use linearizable systems (Chapter 10). For eventual consistency, monitor replication lag and design your application to tolerate stale reads.

If you have n replicas, and you choose w and r such that w + r > n, you can generally expect every read to return the most recent value written for a key. This is the case because the set of nodes to which you've written and the set of nodes from which you've read must overlap. That is, among the nodes you read there must be at least one node with the latest value.

Often, r and w are chosen to be a majority (more than n/2) of nodes, because that ensures w + r > n while still tolerating up to n/2 (rounded down) node failures. But quorums are not necessarily majorities—it only matters that the sets of nodes used by the read and write operations overlap in at least one node. Other quorum assignments are possible, which allows some flexibility in the design of distributed algorithms.

You may also set w and r to smaller numbers, so that w + r ≤ n (i.e., the quorum condition is not satisfied). In this case, reads and writes will still be sent to n nodes, but a smaller number of successful responses is required for the operation to succeed.

With a smaller w and r you are more likely to read stale values, because it's more likely that your read didn't include the node with the latest value. On the upside, this configuration allows lower latency and higher availability: if there is a network interruption and many replicas become unreachable, there's a higher chance that you can continue processing reads and writes. Only after the number of reachable replicas falls below w or r does the database become unavailable for writing or reading, respectively.

However, even with w + r > n, there are edge cases in which the consistency properties can be confusing. Some scenarios include:

  • Node failure with data restore: if a node carrying a new value fails and is restored from a backup containing an old value, the number of replicas storing the new value may fall below w, breaking the quorum.

  • Rebalancing in progress: while data is being moved between nodes, replicas may have inconsistent views of which nodes should hold the data, breaking the quorum overlap.

  • Reads concurrent with writes: if a read happens concurrently with a write, it may or may not see the new value; one read may see the new value and the next read the old one.

  • Partial write failures: if a write succeeded on fewer than w replicas and was reported as failed, it is not rolled back on the replicas where it did succeed, so subsequent reads may or may not see it.

  • Clock skew with last-write-wins: with timestamp-based conflict resolution (as in Cassandra), writes may be silently dropped if another node has a faster clock.

  • Concurrent write conflicts: two concurrent writes may be processed in different orders on different replicas, leading to conflicts that require resolution.

Thus, although quorums appear to guarantee that a read returns the latest written value, in practice it is not so simple. Dynamo-style databases are generally optimized for use cases that can tolerate eventual consistency. The parameters w and r allow you to adjust the probability of stale values being read, but it's wise to not take them as absolute guarantees.

💡 Insight

Quorums are probabilistic, not deterministic. Think of w and r as tuning knobs that adjust the likelihood of seeing fresh data, not as hard guarantees. For truly consistent reads, you need linearizability (Chapter 10), which requires coordination and hurts availability.

Monitoring staleness

From an operational perspective, it's important to monitor whether your databases are returning up-to-date results. Even if your application can tolerate stale reads, you need to be aware of the health of your replication. If it falls behind significantly, it should alert you so that you can investigate the cause (for example, a problem in the network or an overloaded node).

For leader-based replication, the database typically exposes metrics for the replication lag, which you can feed into a monitoring system. This is possible because writes are applied to the leader and to followers in the same order, and each node has a position in the replication log (the number of writes it has applied locally). By subtracting a follower's current position from the leader's current position, you can measure the amount of replication lag.

However, in systems with leaderless replication, there is no fixed order in which writes are applied, which makes monitoring more difficult. The number of hints that a replica stores for handoff can be one measure of system health, but it's difficult to interpret usefully. Eventual consistency is a deliberately vague guarantee, but for operability it's important to be able to quantify "eventual."

Single-Leader vs. Leaderless Replication Performance

A replication system based on a single leader can provide strong consistency guarantees that are difficult or impossible to achieve in a leaderless system. However, as we have seen in "Problems with Replication Lag", reads in a leader-based replicated system can also return stale values if you make them on an asynchronously updated follower.

Reading from the leader ensures up-to-date responses, but it suffers from performance problems:

  • Read throughput is limited by the leader's capacity to handle requests (in contrast with read scaling, which distributes reads across asynchronously updated replicas that may return stale values).
  • If the leader fails, you have to wait for the fault to be detected, and for the failover to complete before you can continue handling requests. Even if the failover process is very quick, users will notice it because of the temporarily increased response times; if failover takes a long time, the system is unavailable for its duration.
  • The system is very sensitive to performance problems on the leader: if the leader is slow to respond, e.g. due to overload or some resource contention, the increased response times immediately affect users as well.

A big advantage of a leaderless architecture is that it is more resilient against such issues. Because there is no failover, and requests go to multiple replicas in parallel anyway, one replica becoming slow or unavailable has very little impact on response times: the client simply uses the responses from the other replicas that are faster to respond. Using the fastest responses is called request hedging, and it can significantly reduce tail latency.
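A sketch of request hedging with a thread pool, where `replica.get` stands in for a network call and is assumed to return a (version, value) pair: send the read to every replica in parallel and return as soon as enough responses have arrived, so one slow replica doesn't drag out the whole request.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def hedged_read(replicas, key, required=2):
    """Query all replicas in parallel; return once `required` have answered,
    without waiting for the slower replicas."""
    pool = ThreadPoolExecutor(max_workers=len(replicas))
    futures = [pool.submit(replica.get, key) for replica in replicas]
    responses = []
    try:
        for future in as_completed(futures):
            try:
                responses.append(future.result())   # assumed (version, value)
            except Exception:
                continue  # failed or unreachable replica: it just doesn't count
            if len(responses) >= required:
                break
        if len(responses) < required:
            raise RuntimeError("not enough replicas responded")
        return max(responses)[1]   # value with the greatest version wins
    finally:
        pool.shutdown(wait=False, cancel_futures=True)  # don't wait for stragglers
```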

At its core, the resilience of a leaderless system comes from the fact that it doesn't distinguish between the normal case and the failure case. This is especially helpful when handling so-called gray failures, in which a node isn't completely down, but running in a degraded state where it is unusually slow to handle requests, or when a node is simply overloaded (for example, if a node has been offline for a while, recovery via hinted handoff can cause a lot of additional load). A leader-based system has to decide whether the situation is bad enough to warrant a failover (which can itself cause further disruption), whereas in a leaderless system that question doesn't even arise.

That said, leaderless systems can have performance problems as well:

  • Even though the system doesn't need to perform failover, one replica does need to detect when another replica is unavailable so that it can store hints about writes that the unavailable replica missed. When the unavailable replica comes back, the handoff process needs to send it those hints. This puts additional load on the replicas at a time when the system is already under strain.

  • The more replicas you have, the bigger the size of your quorums, and the more responses you have to wait for before a request can complete. Even if you wait only for the fastest r or w replicas to respond, and even if you make the requests in parallel, a bigger r or w increases the chance that you hit a slow replica, increasing the overall response time (see "Use of Response Time Metrics").

  • A large-scale network interruption that disconnects a client from a large number of replicas can make it impossible to form a quorum. Some leaderless databases offer a configuration option that allows any reachable replica to accept writes, even if it's not one of the usual replicas for that key (Riak and Dynamo call this a sloppy quorum; Cassandra and ScyllaDB call it consistency level ANY). There is no guarantee that subsequent reads will see the written value, but depending on the application it may still be better than having the write fail.

Multi-leader replication can offer even greater resilience against network interruptions than leaderless replication, since reads and writes only require communication with one leader, which can be co-located with the client. However, since a write on one leader is propagated asynchronously to the others, reads can be arbitrarily out-of-date. Quorum reads and writes provide a compromise: good fault tolerance while also having a high likelihood of reading up-to-date data.

Multi-region operation

We previously discussed cross-region replication as a use case for multi-leader replication (see "Multi-Leader Replication"). Leaderless replication is also suitable for multi-region operation, since it is designed to tolerate conflicting concurrent writes, network interruptions, and latency spikes.

Cassandra and ScyllaDB implement their multi-region support within the normal leaderless model: the client sends its writes directly to the replicas in all regions, and you can choose from a variety of consistency levels that determine how many responses are required for a request to be successful. For example, you can request a quorum across the replicas in all the regions, a separate quorum in each of the regions, or a quorum only in the client's local region. A local quorum avoids having to wait for slow requests to other regions, but it is also more likely to return stale results.

Riak keeps all communication between clients and database nodes local to one region, so n describes the number of replicas within one region. Cross-region replication between database clusters happens asynchronously in the background, in a style that is similar to multi-leader replication.

4.4. Detecting Concurrent Writes

In plain English: When two people edit the same shopping cart simultaneously on different replicas, how do you know which edits came first, and which should win? You can't rely on timestamps (clocks are unreliable). Instead, use version numbers: each replica tracks what version it has seen, and the database can detect when two writes are concurrent (neither knew about the other).

In technical terms: Use version vectors (one version number per replica) to capture causality. When a write includes version numbers from prior reads, it indicates which state the write is based on. Concurrent writes (those with incomparable version vectors) need conflict resolution via CRDTs, LWW, or manual merging.

Why it matters: Detecting concurrency correctly is essential for avoiding lost updates. Version vectors enable the database to distinguish "A happened before B" (B overwrites A) from "A and B are concurrent" (conflict resolution needed). This is the foundation of Dynamo-style conflict handling.

Like with multi-leader replication, leaderless databases allow concurrent writes to the same key, resulting in conflicts that need to be resolved. Such conflicts may occur as the writes happen, but not always: they could also be detected later during read repair, hinted handoff, or anti-entropy.

The problem is that events may arrive in a different order at different nodes, due to variable network delays and partial failures. For example, two clients, A and B, simultaneously write to a key X in a three-node datastore:

  • Node 1 receives the write from A, but never receives the write from B due to a transient outage.
  • Node 2 first receives the write from A, then the write from B.
  • Node 3 first receives the write from B, then the write from A.

Figure: Concurrent writes with no well-defined ordering. Node 1 receives only A's write (final value A), node 2 receives A then B (final value B), node 3 receives B then A (final value A); without conflict resolution, the nodes remain permanently inconsistent.

If each node simply overwrote the value for a key whenever it received a write request from a client, the nodes would become permanently inconsistent, as the final values in the figure above show: node 2 thinks that the final value of X is B, whereas the other nodes think that the value is A.

In order to become eventually consistent, the replicas should converge toward the same value. For this, we can use any of the conflict resolution mechanisms we previously discussed in "Dealing with Conflicting Writes", such as last-write-wins (used by Cassandra and ScyllaDB), manual resolution, or CRDTs (described in "CRDTs and Operational Transformation", and used by Riak).

Last-write-wins is easy to implement: each write is tagged with a timestamp, and a value with a higher timestamp always overwrites a value with a lower timestamp. However, a timestamp doesn't tell you whether two values are actually conflicting (i.e., they were written concurrently) or not (they were written one after another). If you want to resolve conflicts explicitly, the system needs to take more care to detect concurrent writes.

The "happens-before" relation and concurrency

How do we decide whether two operations are concurrent or not? To develop an intuition, let's look at some examples:

  • In a previous example, the two writes are not concurrent: A's insert happens before B's increment, because the value incremented by B is the value inserted by A. In other words, B's operation builds upon A's operation, so B's operation must have happened later. We also say that B is causally dependent on A.

  • On the other hand, the two writes in the diagram above are concurrent: when each client starts the operation, it does not know that another client is also performing an operation on the same key. Thus, there is no causal dependency between the operations.

An operation A happens before another operation B if B knows about A, or depends on A, or builds upon A in some way. Whether one operation happens before another operation is the key to defining what concurrency means. In fact, we can simply say that two operations are concurrent if neither happens before the other (i.e., neither knows about the other).

Thus, whenever you have two operations A and B, there are three possibilities: either A happened before B, or B happened before A, or A and B are concurrent. What we need is an algorithm to tell us whether two operations are concurrent or not. If one operation happened before another, the later operation should overwrite the earlier operation, but if the operations are concurrent, we have a conflict that needs to be resolved.

Concurrency, Time, and Relativity

It may seem that two operations should be called concurrent if they occur "at the same time"—but in fact, it is not important whether they literally overlap in time. Because of problems with clocks in distributed systems, it is actually quite difficult to tell whether two things happened at exactly the same time—an issue we will discuss in more detail in Chapter 9.

For defining concurrency, exact time doesn't matter: we simply call two operations concurrent if they are both unaware of each other, regardless of the physical time at which they occurred. People sometimes make a connection between this principle and the special theory of relativity in physics, which introduced the idea that information cannot travel faster than the speed of light. Consequently, two events that occur some distance apart cannot possibly affect each other if the time between the events is shorter than the time it takes light to travel the distance between them.

In computer systems, two operations might be concurrent even though the speed of light would in principle have allowed one operation to affect the other. For example, if the network was slow or interrupted at the time, two operations can occur some time apart and still be concurrent, because the network problems prevented one operation from being able to know about the other.

Capturing the happens-before relationship

Let's look at an algorithm that determines whether two operations are concurrent, or whether one happened before another. To keep things simple, let's start with a database that has only one replica. Once we have worked out how to do this on a single replica, we can generalize the approach to a leaderless database with multiple replicas.

The following example shows two clients concurrently adding items to the same shopping cart. (If that example strikes you as too inane, imagine instead two air traffic controllers concurrently adding aircraft to the sector they are tracking.) Initially, the cart is empty. Between them, the clients make five writes to the database:

  1. Client 1 adds milk to the cart. This is the first write to the key, so the server stores [milk] and assigns version 1.

  2. Client 2 adds eggs, not knowing that client 1 concurrently added milk. The server assigns version 2 and keeps [milk] and [eggs] as two separate values (siblings).

  3. Client 1 adds flour, still knowing only about [milk] from its read at version 1. The server assigns version 3, overwrites the version 1 value [milk], and keeps [eggs] as a sibling of the new [milk, flour].

  4. Client 2 adds ham, having merged the siblings [milk] and [eggs] from its read at version 2. The server assigns version 4, overwrites [eggs], and keeps [milk, flour] as a sibling of the new [eggs, milk, ham].

  5. Client 1 adds bacon, having merged [milk, flour] and [eggs] from its read at version 3. The server overwrites [milk, flour], leaving [milk, flour, eggs, bacon] and [eggs, milk, ham] as the final siblings.
The dataflow between these operations can be visualized as a directed acyclic graph (DAG), in which an arrow indicates that the later operation knew about or depended on the earlier one. In this example, the clients are never fully up to date with the data on the server, since there is always another operation going on concurrently. But old versions of the value do get overwritten eventually, and no writes are lost.

Note that the server can determine whether two operations are concurrent by looking at the version numbers—it does not need to interpret the value itself (so the value could be any data structure). The algorithm works as follows:

  1. The server maintains a version number for every key, increments the version number every time that key is written, and stores the new version number along with the value written.

  2. When a client reads a key, the server returns all siblings, i.e., all values that have not been overwritten, as well as the latest version number. A client must read a key before writing.

  3. When a client writes a key, it must include the version number from the prior read, and it must merge together all of the values that it received in that read (e.g., using a CRDT or by asking the user). The response to a write request is like that of a read, returning all current siblings, which allows us to chain several writes as in the shopping cart example.

  4. When the server receives a write with a particular version number, it can overwrite all values with that version number or below (since it knows that they have been merged into the new value), but it must keep all values with a higher version number (because those values are concurrent with the incoming write).

When a write includes the version number from a prior read, that tells us which previous state the write is based on. If you make a write without including a version number, it is concurrent with all other writes, so it will not overwrite anything—it will just be returned as one of the values on subsequent reads.
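
To make the four steps above concrete, here is a minimal Python sketch of the server side of the algorithm, replaying the shopping cart example; the class and method names are illustrative assumptions, not any particular database's API. Using version 0 to mean "no prior read" is also an assumption of this sketch.

```python
# Sketch of the single-replica versioning algorithm described above
# (illustrative names). Values are opaque to the server; it only compares
# version numbers to decide what to overwrite and what to keep as siblings.
class VersionedStore:
    def __init__(self):
        # key -> (latest_version, {version: value, ...} of surviving siblings)
        self.data = {}

    def read(self, key):
        version, siblings = self.data.get(key, (0, {}))
        # Return all siblings plus the latest version number.
        return version, list(siblings.values())

    def write(self, key, value, based_on_version):
        version, siblings = self.data.get(key, (0, {}))
        new_version = version + 1
        # Overwrite siblings the client has merged (version <= based_on_version);
        # keep siblings written concurrently (version > based_on_version).
        surviving = {v: val for v, val in siblings.items() if v > based_on_version}
        surviving[new_version] = value
        self.data[key] = (new_version, surviving)
        return new_version, list(surviving.values())

# Replaying the shopping cart example:
store = VersionedStore()
store.write("cart", ["milk"], 0)                      # client 1, no prior read
store.write("cart", ["eggs"], 0)                      # client 2, unaware of milk
store.write("cart", ["milk", "flour"], 1)             # client 1, based on version 1
store.write("cart", ["eggs", "milk", "ham"], 2)       # client 2, merged v2 siblings
_, siblings = store.write("cart", ["milk", "flour", "eggs", "bacon"], 3)
print(siblings)  # [['eggs', 'milk', 'ham'], ['milk', 'flour', 'eggs', 'bacon']]
```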

Version vectors

The example above used only a single replica. How does the algorithm change when there are multiple replicas, but no leader?

The single-replica example uses a single version number to capture dependencies between operations, but that is not sufficient when there are multiple replicas accepting writes concurrently. Instead, we need to use a version number per replica as well as per key. Each replica increments its own version number when processing a write, and also keeps track of the version numbers it has seen from each of the other replicas. This information indicates which values to overwrite and which values to keep as siblings.

The collection of version numbers from all the replicas is called a version vector. A few variants of this idea are in use, but the most interesting is probably the dotted version vector, which is used in Riak 2.0. We won't go into the details, but the way it works is quite similar to what we saw in our cart example.

Like the version numbers in the single-replica example, version vectors are sent from the database replicas to clients when values are read, and need to be sent back to the database when a value is subsequently written. (Riak encodes the version vector as a string that it calls causal context.) The version vector allows the database to distinguish between overwrites and concurrent writes.

The version vector also ensures that it is safe to read from one replica and subsequently write back to another replica. Doing so may result in siblings being created, but no data is lost as long as siblings are merged correctly.
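
As a hedged illustration of how a replica could compare two version vectors, here is a small Python sketch; the dict-of-counters representation and function names are assumptions made for clarity, not Riak's causal-context encoding.

```python
# Illustrative sketch: deciding whether one version vector "happened before"
# another, or whether the corresponding writes are concurrent.
def happened_before(a: dict, b: dict) -> bool:
    """True if b has seen every write that a has seen, plus at least one more."""
    replicas = set(a) | set(b)
    return (all(a.get(r, 0) <= b.get(r, 0) for r in replicas)
            and any(a.get(r, 0) < b.get(r, 0) for r in replicas))

def concurrent(a: dict, b: dict) -> bool:
    # Neither write knew about the other: keep both values as siblings.
    return not happened_before(a, b) and not happened_before(b, a)

# Example: both writes build on the same old value, but on different replicas.
v_old = {"r1": 1}
v_a = {"r1": 2}            # builds on v_old via replica r1
v_b = {"r1": 1, "r2": 1}   # builds on v_old via replica r2, concurrently with v_a

assert happened_before(v_old, v_a)  # v_a may overwrite v_old
assert concurrent(v_a, v_b)         # v_a and v_b must be kept as siblings
```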

Version vectors and vector clocks

A version vector is sometimes also called a vector clock, even though they are not quite the same. The difference is subtle—please see the references for details. In brief, when comparing the state of replicas, version vectors are the right data structure to use.

5. Summary

In this chapter we looked at the issue of replication. Replication can serve several purposes:

  • 🔧 High Availability: keeping the system running, even when one machine (or several machines, a zone, or even an entire region) goes down

  • 📴 Disconnected Operation: allowing an application to continue working when there is a network interruption

  • Latency: placing data geographically close to users, so that users can interact with it faster

  • 📈 Scalability: being able to handle a higher volume of reads than a single machine could handle, by performing reads on replicas

Despite being a simple goal—keeping a copy of the same data on several machines—replication turns out to be a remarkably tricky problem. It requires carefully thinking about concurrency and about all the things that can go wrong, and dealing with the consequences of those faults. At a minimum, we need to deal with unavailable nodes and network interruptions (and that's not even considering the more insidious kinds of fault, such as silent data corruption due to software bugs or hardware errors).

We discussed three main approaches to replication:

|                | Single-Leader                          | Multi-Leader / Leaderless                          |
|----------------|----------------------------------------|----------------------------------------------------|
| Write handling | All writes to one leader node          | Multiple leaders or all replicas accept writes     |
| Consistency    | Strong consistency possible            | Eventual consistency, conflicts likely             |
| Availability   | Failover needed if leader fails        | No failover; continues with available nodes        |
| Use cases      | Most databases, easiest to understand  | Geo-distribution, offline apps, high availability  |

Single-leader replication: Clients send all writes to a single node (the leader), which sends a stream of data change events to the other replicas (followers). Reads can be performed on any replica, but reads from followers might be stale.

Multi-leader replication: Clients send each write to one of several leader nodes, any of which can accept writes. The leaders send streams of data change events to each other and to any follower nodes.

Leaderless replication: Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.

Each approach has advantages and disadvantages. Single-leader replication is popular because it is fairly easy to understand and it offers strong consistency. Multi-leader and leaderless replication can be more robust in the presence of faulty nodes, network interruptions, and latency spikes—at the cost of requiring conflict resolution and providing weaker consistency guarantees.

Replication can be synchronous or asynchronous, which has a profound effect on the system behavior when there is a fault. Although asynchronous replication can be fast when the system is running smoothly, it's important to figure out what happens when replication lag increases and servers fail. If a leader fails and you promote an asynchronously updated follower to be the new leader, recently committed data may be lost.

We looked at some strange effects that can be caused by replication lag, and we discussed a few consistency models which are helpful for deciding how an application should behave under replication lag:

  1. Read-after-write consistency: users should always see data that they submitted themselves.

  2. Monotonic reads: after users have seen data at one point in time, they shouldn't later see data from some earlier point in time.

  3. Consistent prefix reads: users should see data in a state that makes causal sense, such as seeing a question and its reply in the correct order.

Finally, we discussed how multi-leader and leaderless replication ensure that all replicas eventually converge to a consistent state: by using a version vector or similar algorithm to detect which writes are concurrent, and by using a conflict resolution algorithm such as a CRDT to merge the concurrently written values. Last-write-wins and manual conflict resolution are also possible.

💡 Insight

Replication is fundamentally about trading off consistency for availability and performance. Single-leader offers the simplest consistency model but limits availability. Multi-leader and leaderless offer better availability but require sophisticated conflict resolution. Choose based on your application's tolerance for stale reads and your operational complexity budget.

This chapter has assumed that every replica stores a full copy of the whole database, which is unrealistic for large datasets. In the next chapter we will look at sharding, which allows each machine to store only a subset of the data.

