System Design Interview Questions (2026)

100 real interview questions with in-depth answers — 30 basic, 40 intermediate, 30 advanced. Updated April 2026.

Preparing for a Software Engineer (System Design) role?

A system design interview evaluates your ability to translate ambiguous, open-ended requirements into a concrete, scalable architecture. Interviewers look for requirement clarification (functional vs. non-functional), capacity estimation (QPS, storage, bandwidth), component selection with trade-off reasoning, and awareness of failure modes. Unlike a coding round, there is rarely one correct answer — the process of reasoning matters as much as the final diagram.

Vertical scaling (scaling up) means adding more CPU, RAM, or disk to a single machine. Horizontal scaling (scaling out) means adding more machines and distributing load across them. Vertical scaling hits a hard ceiling and creates a single point of failure; horizontal scaling is theoretically unbounded but requires statelessness or shared state management (sticky sessions, distributed caches). Most modern architectures scale horizontally using commodity servers behind a load balancer.

A load balancer distributes incoming requests across a pool of backend servers to maximize throughput, minimize latency, and avoid overwhelming any single server. It performs health checks and removes unhealthy nodes from the rotation. Layer 4 load balancers route by TCP/UDP headers; Layer 7 load balancers (like Nginx or AWS ALB) inspect HTTP headers, cookies, and URL paths to make smarter routing decisions such as sending /api traffic to API servers and /static to storage.

Round-robin distributes requests sequentially to each server in turn; it is simple but ignores server capacity and current load. Least-connections sends each new request to the server with the fewest active connections, which handles heterogeneous workloads better. IP-hash routes requests from the same client IP to the same server, providing sticky sessions without server-side session sharing. Weighted variants of each allow heavier servers to receive proportionally more traffic. Nginx exposes all three in its upstream configuration (round-robin by default, plus the `least_conn` and `ip_hash` directives); managed load balancers such as AWS ALB and Google Cloud Load Balancing offer their own subsets and variants of these algorithms.

A Content Delivery Network (CDN) is a geographically distributed network of edge servers that caches and serves static assets (images, JS, CSS, videos) from the location closest to the user. CDNs reduce origin-server load, cut latency by hundreds of milliseconds for global users, and absorb DDoS traffic at the edge. Use a CDN whenever you serve static or semi-static content to a global audience. Popular options include Cloudflare, AWS CloudFront, and Fastly. Dynamic content can also benefit from CDN via edge caching with short TTLs or Vary headers.

Browser cache stores responses locally according to HTTP Cache-Control headers (RFC 9111). CDN cache sits at edge nodes, serving static assets without hitting origin. Application-layer cache (Redis, Memcached) stores expensive DB query results or serialized objects in memory. Database query cache (removed in MySQL 8.0, but present in some engines) caches result sets. Finally, CPU L1/L2/L3 caches are managed by the hardware, transparently to the OS and the application. The architecture flows: Client → Browser Cache → CDN Edge → Load Balancer → App Server → Redis → Database. Each layer trades freshness for speed.
Cache hit rate is the percentage of requests served from the cache rather than the origin. A 99% hit rate means only 1% of requests reach the DB — a 100× reduction in DB load. Hit rate depends on working-set size (what fraction of data fits in memory), access skew (Zipfian distributions concentrate hits on popular keys), TTL configuration, and eviction policy. Monitoring cache hit rate in Redis via INFO stats (keyspace_hits / (keyspace_hits + keyspace_misses)) is essential for diagnosing performance regressions.
TTL (time-to-live) expires entries after a fixed duration — simple but can serve stale data. LRU (Least Recently Used) evicts the least recently accessed key when memory is full. Write-through updates cache and DB synchronously on every write, keeping them consistent but adding write latency. Write-behind (write-back) writes to cache first and asynchronously flushes to DB, improving write throughput at the risk of data loss on crash. Write-around bypasses cache on writes, letting the cache fill on reads — good for write-heavy, rarely re-read data. Cache-aside (lazy loading) is the most common: on miss, the app reads from DB, populates cache, and serves the result.
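
The cache-aside flow with a TTL can be sketched as follows. This is a minimal illustration, not a production client: a plain dict stands in for Redis, and `load_from_db`, `CacheAside`, and the injectable `clock` are hypothetical names introduced here.

```python
import time


class CacheAside:
    """Cache-aside with a per-entry TTL; a dict stands in for Redis (assumption)."""

    def __init__(self, load_from_db, ttl_seconds=60.0, clock=time.monotonic):
        self._load = load_from_db   # hypothetical loader: key -> value
        self._ttl = ttl_seconds
        self._clock = clock         # injectable so expiry can be tested
        self._store = {}            # key -> (value, expires_at)

    def get(self, key):
        entry = self._store.get(key)
        if entry is not None and entry[1] > self._clock():
            return entry[0]         # cache hit
        value = self._load(key)     # miss or expired: read from the DB
        self._store[key] = (value, self._clock() + self._ttl)
        return value

    def invalidate(self, key):
        # Call after writing to the DB so the next read repopulates the entry.
        self._store.pop(key, None)
```

Invalidating on write instead of updating the cached value keeps the write path simple; the cost is one extra miss on the next read.
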
An index is a separate data structure (typically a B-tree or LSM-tree) that maps column values to the rows containing them, allowing the database engine to find rows without a full table scan. A B-tree index reduces lookup from O(n) to O(log n). The trade-off: indexes speed up reads but slow down writes because each INSERT/UPDATE/DELETE must also update the index. Over-indexing bloats storage and degrades write performance. Composite indexes support multi-column queries but must be used in left-prefix order.
Choose SQL (PostgreSQL, MySQL) when you need ACID transactions, complex joins, a well-defined schema, or strong consistency — e.g., financial ledgers, e-commerce orders. Choose NoSQL when you need flexible schemas, massive horizontal write scaling, or specialized access patterns: document stores (MongoDB) for nested data, key-value stores (Redis, DynamoDB) for high-speed lookups, wide-column stores (Cassandra) for time-series or write-heavy workloads, and graph databases (Neo4j) for relationship traversal. The false dichotomy: many systems use both — PostgreSQL for authoritative records and Redis for caching.
A primary key uniquely identifies a row. In distributed systems, auto-increment integers are problematic because multiple nodes cannot coordinate without a central sequence generator. UUID v4 solves this but sacrifices B-tree locality. Snowflake IDs (Twitter) pack a millisecond timestamp, a machine ID, and a per-millisecond sequence number into 64 bits; ULIDs combine a 48-bit timestamp with 80 random bits. Both are globally unique and roughly time-sortable. A foreign key enforces referential integrity in a single database, but across microservices or sharded DBs you cannot have cross-node foreign keys, so you manage consistency at the application layer (eventual consistency, sagas).
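
A Snowflake-style generator might look like the sketch below. It assumes the commonly described 64-bit layout (41-bit millisecond timestamp, 10-bit machine ID, 12-bit sequence); the class name, the custom epoch value, and the injectable `clock_ms` are illustrative choices, not Twitter's actual code.

```python
import threading
import time


class SnowflakeGenerator:
    """Time-sortable 64-bit IDs: timestamp | machine ID | sequence (assumed layout)."""

    def __init__(self, machine_id, epoch_ms=1_600_000_000_000, clock_ms=None):
        if not 0 <= machine_id < 1024:
            raise ValueError("machine_id must fit in 10 bits")
        self.machine_id = machine_id
        self.epoch_ms = epoch_ms    # arbitrary custom epoch (assumption)
        self._clock_ms = clock_ms or (lambda: time.time_ns() // 1_000_000)
        self._last_ms = -1
        self._seq = 0
        self._lock = threading.Lock()

    def next_id(self):
        with self._lock:
            now = self._clock_ms()
            if now < self._last_ms:
                raise RuntimeError("clock moved backwards")
            if now == self._last_ms:
                self._seq = (self._seq + 1) & 0xFFF
                if self._seq == 0:
                    # 4096 IDs issued this millisecond: spin until the next one.
                    while now <= self._last_ms:
                        now = self._clock_ms()
            else:
                self._seq = 0
            self._last_ms = now
            return ((now - self.epoch_ms) << 22) | (self.machine_id << 12) | self._seq
```

Because the timestamp occupies the high bits, IDs from one generator sort by creation time, which keeps B-tree inserts mostly append-only.
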
ACID stands for Atomicity (a transaction fully commits or fully rolls back — no partial writes), Consistency (every transaction brings the DB from one valid state to another, respecting all constraints), Isolation (concurrent transactions execute as if they were sequential — enforced via locks or MVCC), and Durability (committed data survives crashes, enforced by write-ahead logs flushed to disk). Full ACID across distributed nodes is expensive — two-phase commit is required — which is why many distributed systems relax isolation to read-committed or even offer only eventual consistency.
Eventual consistency is a consistency model where, given no new writes, all replicas will converge to the same value — but reads may temporarily return stale data. It is used in systems like DynamoDB, Cassandra, and DNS because it allows higher availability and lower latency than strong consistency (which requires coordination). The trade-off is that applications must tolerate temporary divergence and use techniques like vector clocks, conflict resolution (last-write-wins, merge functions), or read-repair to reconcile differences.
CAP theorem (Brewer, 2000) states that a distributed system can guarantee at most two of three properties: Consistency (every read sees the most recent write), Availability (every request gets a non-error response), and Partition Tolerance (the system operates despite network partitions). Since network partitions are unavoidable in practice, real systems choose between CP (consistent under partition — e.g., HBase, Zookeeper) and AP (available under partition — e.g., Cassandra, DynamoDB). The limitation: CAP over-simplifies; PACELC extends it to address latency vs. consistency trade-offs even when there is no partition.
A message queue is a broker that stores messages from producers until consumers process them, decoupling the two sides in time and speed. It provides backpressure handling (if consumers are slow, messages queue up instead of crashing the producer), retry semantics, and durable delivery. Examples: RabbitMQ (AMQP, push-based), SQS (managed, at-least-once), Kafka (log-based, replayable). Use queues when you want to smooth traffic spikes, offload long-running tasks to background workers, or guarantee processing of every event.
In point-to-point, a message is delivered to exactly one consumer from a queue — typical for task distribution (SQS, RabbitMQ default queues). In pub/sub, a message published to a topic is delivered to all active subscribers — typical for event broadcast (Kafka topics, SNS, Redis Pub/Sub). Kafka blends both: topics fan-out to multiple consumer groups (pub/sub), but within each group only one consumer receives each partition's message (point-to-point). Choose point-to-point for workload distribution; choose pub/sub for event-driven pipelines where multiple downstream systems need the same event.
A microservice is a small, independently deployable service that owns a single business capability and its data. It communicates with other services over a network (REST, gRPC, or events). Each service can be developed, deployed, and scaled independently. Examples: an Order Service, a Payment Service, a Notification Service. The defining principle is bounded context from Domain-Driven Design — each service has its own schema and does not share a database with another service.
A monolith is a single deployable unit with all modules in one process. Advantages: simpler deployment, no network overhead between modules, easier local development, ACID transactions across the whole domain. Disadvantages: long build/test cycles, tight coupling, risky large deployments, harder to scale individual hot-spots. Microservices offer independent scaling, independent deployment, and team autonomy, but introduce distributed systems complexity: network latency, partial failures, distributed tracing, eventual consistency, and operational overhead (service discovery, multiple CI/CD pipelines). Start with a monolith; extract services when a bottleneck or team boundary justifies the complexity.
An API gateway sits at the edge of a microservice architecture and handles cross-cutting concerns: request routing to the correct downstream service, SSL termination, authentication/authorization (JWT validation, API key checks), rate limiting, request/response transformation, and observability (logging, tracing). Examples: AWS API Gateway, Kong, Nginx, Traefik. Without a gateway, every client must know all service addresses and every service must re-implement auth and rate limiting — the gateway centralizes that logic.
Rate limiting caps how many requests a client can make in a time window to prevent abuse, protect downstream services from overload, and enforce fair usage. Without it, a single misbehaving client (or DDoS) can exhaust your server capacity and degrade service for everyone. Common algorithms: fixed window counter (simple, suffers boundary bursting), sliding window, token bucket (smooth bursts), and leaky bucket (enforces constant rate). Implementation typically happens at the API gateway or load balancer and relies on a shared store like Redis to track counters across multiple app server instances.
A reverse proxy sits in front of backend servers and forwards incoming client requests to them. Clients talk only to the proxy and never directly to backend nodes — this hides topology, enables SSL offload, caching, compression, and load balancing. Nginx and HAProxy are classic reverse proxies. Cloudflare acts as a reverse proxy at the network edge. The key distinction: a reverse proxy represents the server side; a forward proxy represents the client side.
A forward proxy acts on behalf of clients, forwarding their requests to external servers. Clients configure the proxy explicitly (or it is intercepted transparently). Uses: corporate firewalls that filter employee internet traffic, anonymizing proxies (like Tor exit nodes), and caching proxies that reduce repeated fetches of the same external resource. The key distinction from a reverse proxy: a forward proxy protects and serves clients; a reverse proxy protects and serves servers.
Sharding is a form of horizontal partitioning where data is split across multiple database instances (shards) based on a shard key. Each shard holds a subset of rows. For example, user IDs below 9M live on shard 1 and IDs from 9M up to (but not including) 18M live on shard 2. Benefits: each shard handles a fraction of reads/writes, and storage scales linearly. Challenges: cross-shard queries (joins, aggregations) are expensive, shard rebalancing is painful, and you lose cross-shard ACID transactions. Consistent hashing minimizes data movement when adding/removing shards.
Replication keeps copies of data on multiple nodes. In single-leader (primary-replica) replication, all writes go to the primary, which streams a replication log to replicas. Replicas serve read traffic. In multi-leader (active-active) replication, multiple nodes accept writes — useful for multi-region setups but requires conflict resolution. Leaderless (Dynamo-style) replication lets any node accept writes using quorum rules. Replication provides fault tolerance (a replica can be promoted if the primary fails) and read scaling (more replicas → more read capacity).
A single point of failure is any component whose failure causes the entire system to stop working. Examples: a single DNS server, a non-replicated primary database, a monolithic load balancer with no standby. Eliminate SPOFs by introducing redundancy at every layer: multiple load balancers in active-passive or active-active pairs, database primary + replicas, multi-AZ deployments, and health checks with automatic failover. Chaos engineering (Netflix Chaos Monkey) deliberately kills random instances to find hidden SPOFs before they cause production outages.
A heartbeat is a periodic signal sent by a node to indicate it is still alive and healthy. In a cluster, nodes send heartbeats to a coordinator (like Zookeeper) or to peer nodes at regular intervals (e.g., every 1–2 seconds). If a node misses N consecutive heartbeats (e.g., 3), it is declared dead and the system triggers failover — promoting a replica to primary or reassigning partitions. Heartbeats are the foundation of failure detection in Kafka brokers, Kubernetes pods, and Cassandra gossip protocols.
A health check endpoint (typically GET /health or /healthz) returns the current status of a service — usually HTTP 200 if healthy, 503 if degraded. Load balancers and orchestrators (Kubernetes liveness/readiness probes) poll this endpoint every few seconds. If a node fails its health check, traffic is routed away from it automatically. A liveness probe determines if the container should be restarted; a readiness probe determines if it should receive traffic. Health checks should verify all critical dependencies (DB connectivity, cache reachability) and return fast (under 200ms).
Synchronous communication means the caller blocks and waits for the response before continuing — e.g., a REST HTTP call or a gRPC unary RPC. It is simpler to reason about but couples caller and callee in time: if the callee is slow or down, the caller stalls or errors. Asynchronous communication means the caller sends a message (to a queue like Kafka or SQS) and continues without waiting; a separate consumer processes the message later. Async decouples services in time and allows independent scaling and failure isolation, but adds complexity around eventual consistency and observability.
A webhook is an HTTP callback: when an event occurs in system A (e.g., a payment completes in Stripe), system A sends a POST request to a URL registered by system B, delivering a JSON payload describing the event. This is push-based vs. polling-based integration. Webhooks are efficient because system B does not need to repeatedly ask "did anything happen?" They require system B to expose a public HTTPS endpoint, handle retries (Stripe retries failed deliveries with exponential backoff), verify signatures (HMAC on the payload), and respond quickly with HTTP 200 — offloading heavy processing to a background queue.
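
Signature verification on the receiving side can be sketched with the standard library. This assumes a generic scheme in which the sender puts a hex-encoded HMAC-SHA256 of the raw request body in a header; real providers differ in details (for example, including a timestamp in the signed content), so the function names and scheme here are illustrative.

```python
import hashlib
import hmac


def sign_payload(secret: bytes, payload: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body, as the sender would compute it."""
    return hmac.new(secret, payload, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, payload: bytes, signature_hex: str) -> bool:
    """Recompute the HMAC and compare in constant time to avoid timing leaks."""
    expected = sign_payload(secret, payload)
    return hmac.compare_digest(expected, signature_hex)
```

The check must run against the raw bytes received, before any JSON parsing or re-serialization, because re-encoding can change the bytes and invalidate the signature.
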
Latency is the time a single request takes from initiation to completion (e.g., p50 = 20ms, p99 = 400ms). Throughput is the number of requests a system handles per unit of time (e.g., 50,000 QPS). They are related but not the same: a system can have high throughput with acceptable latency under normal load but spike latency at high load due to queuing (Little's Law: L = λW). Optimizing one can hurt the other — batching increases throughput but adds latency. System design interviews expect you to state both targets in the requirements phase.
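
Little's Law gives a quick capacity check. The sketch below uses the example figures from the paragraph (50,000 QPS, 20 ms) plus a degraded 400 ms case; the function name is illustrative.

```python
def requests_in_flight(arrival_rate_per_s: float, latency_s: float) -> float:
    """Little's Law: L = lambda * W (average number of requests in the system)."""
    return arrival_rate_per_s * latency_s


# At 50,000 QPS and 20 ms per request, about 1,000 requests are in flight.
normal = requests_in_flight(50_000, 0.020)
# If latency rises to 400 ms at the same arrival rate, about 20,000 are in flight.
degraded = requests_in_flight(50_000, 0.400)
```

This is why a latency spike at constant traffic can exhaust thread or connection pools: the number of concurrent requests grows in proportion to latency.
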
Functional requirements: shorten a long URL to a 6–8 character code; redirect the short URL to the original. Non-functional: low read latency (< 20ms), high availability, 100M new URLs/day, 10B redirects/day. Schema: table `urls(short_code VARCHAR(8) PK, long_url TEXT, user_id, created_at, expires_at)`. Hash the long URL with MD5 and take the first 6 Base62 characters — ~56 billion possible codes. On collision (rare), increment a counter or append a character. Redirect performance: cache `short_code → long_url` in Redis with a 24-hour TTL; the app server hits Redis first, falls back to the DB only on a miss. Use a 301 (permanent, browser-cached) redirect for normal links and 302 (temporary) for analytics tracking. CDN at the edge can cache 301s to reduce origin hits further. For custom slugs, store them separately and check for conflicts.
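
The hash-then-truncate step can be sketched as below. The alphabet ordering, the `attempt` suffix used to re-hash on collision, and the function names are assumptions for illustration; the paragraph only specifies MD5 and six Base62 characters.

```python
import hashlib

ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"


def base62_encode(n: int) -> str:
    """Encode a non-negative integer using the 62-character alphabet above."""
    if n == 0:
        return ALPHABET[0]
    digits = []
    while n > 0:
        n, rem = divmod(n, 62)
        digits.append(ALPHABET[rem])
    return "".join(reversed(digits))


def short_code(long_url: str, length: int = 6, attempt: int = 0) -> str:
    """First `length` Base62 characters of the MD5 of the URL.

    On a collision the caller retries with attempt=1, 2, ... (hypothetical
    scheme), which changes the hashed material and therefore the code.
    """
    material = long_url if attempt == 0 else f"{long_url}#{attempt}"
    digest = hashlib.md5(material.encode("utf-8")).digest()
    encoded = base62_encode(int.from_bytes(digest, "big"))
    return encoded.rjust(length, ALPHABET[0])[:length]
```

The code space for six characters is 62**6 = 56,800,235,584, matching the "~56 billion" figure. The database primary key on `short_code` is what actually detects a collision; the hash only proposes a candidate.
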
Token bucket: a bucket refills at R tokens/second up to capacity B. Each request consumes one token; requests are rejected if the bucket is empty. Allows short bursts up to B. Leaky bucket: requests enter a fixed-capacity queue and are processed at a constant rate R — it smooths bursty traffic but can drop requests if the queue fills. Sliding window log: store a timestamp for every request in the past window (e.g., 1 minute) in Redis sorted sets; reject if count exceeds limit. Accurate but memory-intensive (O(requests) per user). Sliding window counter: a hybrid that approximates the sliding window using two fixed-window buckets weighted by how far into the current window you are — far more memory-efficient. In production, Redis + Lua scripts execute the check-and-decrement atomically across distributed app servers. The `INCR` + `EXPIRE` pattern in Redis implements fixed-window; `ZADD` + `ZREMRANGEBYSCORE` implements log-based sliding window.
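
A single-process token bucket can be sketched as follows. It illustrates the refill arithmetic only: the class name and the injectable `clock` are assumptions, and a distributed deployment would keep this state in a shared store such as Redis and update it atomically.

```python
import time


class TokenBucket:
    """Refills at `rate` tokens/second up to `capacity`; each request costs one token."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock             # injectable for deterministic tests
        self._tokens = float(capacity)  # start full so an initial burst is allowed
        self._last = clock()

    def allow(self, cost=1.0):
        now = self._clock()
        # Lazily add the tokens earned since the last call, capped at capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

Refilling lazily on each call avoids a background timer: the bucket only needs the timestamp of the previous request.
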
Start with requirements: GET(key) and PUT(key, value), targets of 10ms p99, millions of keys, multi-node deployment. Use consistent hashing to map keys to nodes so that adding or removing one node moves only about 1/n of the keys (n = number of nodes). Replicate each key to N replicas for fault tolerance. For reads, use a quorum: wait for R of the N replicas to respond, where R + W > N ensures that every read set overlaps the most recent write set. Writes are acknowledged once W replicas have them; the rest catch up asynchronously. Use a write-ahead log (WAL) on each node for durability: on crash, replay the log. Add a gossip protocol (like Cassandra's) for ring membership. Vector clocks detect write conflicts across replicas. Bloom filters on each node avoid disk reads for missing keys. This is essentially the Dynamo architecture.
Consistent hashing maps both cache nodes and keys onto a ring of 2^32 positions using a hash function. A key is assigned to the first node clockwise from its position on the ring. When a node is added or removed, only about K/N keys need to be remapped (K = total keys, N = nodes) rather than nearly all keys, as with `hash(key) mod N`. This is critical for preserving cache hit rates and for minimal rebalancing in sharded databases. Virtual nodes (vnodes) assign each physical node multiple positions on the ring, smoothing out uneven key distribution that occurs with few nodes. Cassandra and other Dynamo-style stores distribute data this way. Redis Cluster takes a related but different approach, mapping keys to 16,384 fixed hash slots that are assigned to nodes, and Kafka's default partitioner hashes the key modulo the partition count rather than using a ring.
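
A ring with virtual nodes can be sketched in a few lines. The class name, the use of MD5 truncated to 32 bits, and the `node#i` vnode labels are illustrative assumptions; node names are assumed to be strings.

```python
import bisect
import hashlib


class HashRing:
    """Consistent hash ring with virtual nodes on a 2^32 position space."""

    def __init__(self, nodes=(), vnodes=100):
        self._vnodes = vnodes
        self._ring = []  # sorted list of (position, node_name)
        for node in nodes:
            self.add_node(node)

    @staticmethod
    def _position(label: str) -> int:
        # First 4 bytes of MD5 give a position in [0, 2^32).
        return int.from_bytes(hashlib.md5(label.encode("utf-8")).digest()[:4], "big")

    def add_node(self, node: str) -> None:
        for i in range(self._vnodes):
            bisect.insort(self._ring, (self._position(f"{node}#{i}"), node))

    def remove_node(self, node: str) -> None:
        self._ring = [entry for entry in self._ring if entry[1] != node]

    def get_node(self, key: str) -> str:
        if not self._ring:
            raise LookupError("ring is empty")
        # First vnode at or after the key's position, wrapping around the ring.
        idx = bisect.bisect_left(self._ring, (self._position(key), ""))
        return self._ring[idx % len(self._ring)][1]
```

Adding a fourth node to a three-node ring should move roughly a quarter of the keys, and every moved key lands on the new node; all other assignments are unchanged.
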
Read replicas are copies of the primary database that receive a streamed replication log (binlog in MySQL, WAL in PostgreSQL) and apply changes asynchronously. Application code directs all writes to the primary and reads to one or more replicas. This scales read throughput linearly with the number of replicas and offloads reporting/analytics queries from the primary. The caveat is replication lag: replicas may be seconds behind the primary. Read-after-write consistency requires routing the immediate read after a write back to the primary or waiting for replica sync. AWS RDS/Aurora supports up to 15 read replicas; PostgreSQL with Patroni enables automatic failover of the primary.
A write-ahead log is an append-only file on disk where every change is recorded before it is applied to the main data structure (B-tree pages, heap files). On crash, the DB replays the WAL from the last checkpoint to reconstruct any unflushed changes — guaranteeing that committed transactions are durable. PostgreSQL's WAL is also the mechanism for streaming replication: standby servers receive WAL segments from the primary and apply them in order. Kafka uses a similar append-only log as its core data structure, enabling replays and long retention. The WAL pattern separates durability (sequential write to log) from performance (random-access data structure updates).
When the primary fails, a new leader must be elected from the remaining replicas. In database systems like PostgreSQL with Patroni, Etcd or Zookeeper acts as the distributed coordination service. Each node attempts to acquire an ephemeral lease (a Zookeeper ephemeral znode or an Etcd lease with a TTL). The node that acquires the lease becomes the new leader and starts accepting writes; others become followers and consume from the new leader. The critical problem is split-brain: if the old leader is slow (not dead), two nodes may believe they are leader. Two mechanisms prevent conflicting writes: epoch fencing, where every request carries a monotonically increasing epoch number and storage rejects requests with stale epochs, and STONITH (Shoot The Other Node In The Head), which forcibly powers off or isolates the old leader.
Raft is a consensus algorithm designed to be understandable (unlike Paxos). A cluster of N nodes elects one leader via randomized election timeouts: the first node that times out requests votes and becomes leader if it wins a majority. The leader handles all client writes by appending entries to its log and replicating them to followers. A log entry is committed once it is stored on a majority of nodes, that is floor(N/2) + 1 including the leader. If the leader fails, a new election is triggered. Raft guarantees that any newly elected leader holds every committed entry, because voters refuse candidates whose logs are less up to date than their own. Used by Etcd (Kubernetes' backing store), CockroachDB, and TiKV.
Two-phase commit coordinates a distributed transaction across multiple participants. Phase 1 (prepare): the coordinator sends PREPARE to all participants; each writes the transaction to its WAL and responds YES or NO. Phase 2 (commit): if all replied YES, the coordinator sends COMMIT to all; otherwise it sends ABORT. 2PC guarantees atomic commits across nodes but has significant failure modes: if the coordinator crashes between phases, participants are left in a blocking "prepared" state waiting indefinitely — this is the protocol's fundamental weakness. Three-phase commit (3PC) adds a pre-commit phase to reduce blocking, but network partitions can still cause uncertainty. In practice, systems use Sagas or idempotent retries rather than 2PC for long-lived distributed transactions.
A Saga is a sequence of local transactions, each publishing an event or message that triggers the next step. If any step fails, the Saga executes compensating transactions to undo prior steps (e.g., a refund transaction compensates a charge). Two coordination styles: choreography (each service reacts to events from the previous service — loose coupling, hard to visualize) and orchestration (a central Saga orchestrator sends commands and listens for replies — easier to trace but a coordination bottleneck). Sagas do not provide isolation — other transactions can see intermediate states. The trade-off vs. 2PC: Sagas are non-blocking and resilient but require compensation logic and only provide eventual consistency.
In CRUD, you store only the current state of an entity and overwrite it on every update. In event sourcing, you store an immutable, append-only log of every state-change event (e.g., OrderPlaced, OrderShipped, OrderCancelled). Current state is derived by replaying events from the beginning or from the last snapshot. Benefits: complete audit trail, ability to replay events to rebuild projections, debugging by replaying to any point in time, and natural event publishing for downstream consumers. Drawbacks: querying current state requires building read models (projections), events are immutable (corrections require compensating events), and the event log grows forever. Event sourcing pairs naturally with CQRS.
Command Query Responsibility Segregation (CQRS) separates the write model (commands that mutate state) from the read model (queries that return data). Commands go to a write store optimized for transactional integrity (e.g., PostgreSQL with normalized schema); queries go to a read store optimized for fast reads (e.g., Elasticsearch, Redis, denormalized PostgreSQL views). An event bus (Kafka) propagates changes from the write side to update the read side asynchronously. Benefits: read and write stores can be scaled, indexed, and optimized independently. Cost: eventual consistency between write and read models, more moving parts, and projection rebuilding when the read model schema changes.
Opening a new database connection is expensive: TCP handshake, TLS negotiation, and PostgreSQL forking a backend process (~5–10ms, ~5 MB RAM each). A connection pool maintains a fixed set of pre-opened connections (e.g., 20 per app server) and reuses them across requests. PgBouncer is a popular external PostgreSQL pooler that multiplexes thousands of client connections into a small pool of server connections, which becomes critical when running 50+ app server pods each wanting 20 direct connections. Without pooling, 100 pods × 20 connections = 2,000 backend processes, enough to overwhelm the DB. PostgreSQL's `max_connections` defaults to 100; production deployments often raise it to 200–500 and put PgBouncer in transaction mode in front.
The N+1 problem occurs when code fetches a list of N parent records and then issues 1 additional query per record to fetch related data — resulting in N+1 round trips to the DB. For example: fetch 100 posts (1 query), then for each post fetch its author (100 queries) = 101 total. Fixes: eager loading with SQL JOINs or ORM include (e.g., Sequelize `include`, ActiveRecord `includes`); batching with DataLoader (groups multiple lookups into a single IN query per tick); or pre-joining data into a denormalized read model. At scale the N+1 pattern can take a page load from 10ms to 10 seconds and cause cascading DB overload under traffic spikes.
An idempotency key is a unique client-generated UUID sent in the request header (e.g., `Idempotency-Key: uuid`). The server stores the key and the result of the first execution. On duplicate requests (retries after network errors), the server detects the existing key and returns the stored result instead of reprocessing — preventing double charges or duplicate fulfillments. Stripe, PayPal, and Braintree all implement this. On the server side: store `(idempotency_key, response_body, expires_at)` in a Redis hash or Postgres table. Lock on the key before processing to prevent concurrent duplicate requests racing. Keys typically expire after 24 hours.
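
The store-and-replay logic can be sketched in memory. This is a simplification: a dict stands in for the Redis hash or Postgres table, a single process-wide lock replaces per-key locking, and expiry is omitted; `IdempotentProcessor` and `handler` are hypothetical names.

```python
import threading


class IdempotentProcessor:
    """Runs `handler` at most once per idempotency key and replays the stored result."""

    def __init__(self, handler):
        self._handler = handler  # hypothetical side-effecting operation, e.g. a charge
        self._results = {}       # idempotency_key -> stored response
        self._lock = threading.Lock()

    def process(self, idempotency_key, request):
        # Coarse lock for simplicity (assumption); it prevents two concurrent
        # retries with the same key from both executing the handler.
        with self._lock:
            if idempotency_key in self._results:
                return self._results[idempotency_key]
            result = self._handler(request)
            self._results[idempotency_key] = result
            return result
```

A retry with the same key returns the first response without touching the handler again, so a network timeout followed by a resend cannot charge twice.
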
The core challenge is fan-out: one event (e.g., "new follower") may need to notify millions of users. Decouple notification generation from delivery: producers publish notification events to a Kafka topic. A fan-out service reads events and resolves recipient lists (single user for DMs, all followers for broadcast). Fan-out-on-write pre-computes and queues delivery tasks per user at event time — fast reads but expensive for celebrities with millions of followers. Fan-out-on-read computes the recipient list at read time — cheaper writes but slow reads. Hybrid: fan-out-on-write for normal users (<10K followers), fan-out-on-read for celebrities. Separate delivery queues per channel (email via Resend/SendGrid, SMS via Twilio, push via APNs/FCM) with per-channel retry logic and rate limiting.
Two main strategies: push (fan-out-on-write) and pull (fan-out-on-read). Push: when a user tweets, pre-populate the tweet ID into each follower's feed cache (a Redis sorted set keyed by `feed:{user_id}`, scored by timestamp). Feed reads are cheap: a single sorted-set range query against Redis returns the newest IDs. Push cost: a celebrity with 50M followers requires 50M Redis writes per tweet. Pull: to render a timeline, fetch a list of followed users, then merge their N most recent tweets from DB/cache. Correct but slow for users following thousands of people. Twitter uses hybrid: push for non-celebrities, pull for accounts with >1M followers. Tweets are stored in a separate store (Manhattan at Twitter); the feed contains only tweet IDs, and a hydration layer fetches full tweet objects in batch.
Requirements: sub-100ms latency for prefix suggestions, top-K results by frequency/relevance. Architecture: offline aggregation job (Spark or Flink) computes top-K queries per prefix daily and stores them in a prefix trie or inverted index. Serve from Redis: store each prefix as a key mapping to a sorted set of (suggestion, score). `ZREVRANGE key 0 K-1` (or `ZRANGE key 0 K-1 REV` on Redis 6.2+) returns the top K by score in O(log N + K). For freshness, apply a small real-time update stream on top of the daily batch. Limit the prefix table to prefixes of length 1–10 characters to bound storage. Cache at the CDN edge for the 1-character and 2-character prefixes (hottest keys). Debounce client-side requests to fire only after 150ms of inactivity.
Encode each user's location as a Geohash (a Base32 string where common prefixes indicate proximity) or an S2 cell. Store `user_id → geohash` in Redis using the GEO commands (GEOADD, GEODIST, GEORADIUS / GEOSEARCH). GEORADIUS returns users within a radius in O(N+log M) time. For large-scale platforms: shard the Redis cluster by city/region to keep each shard small. Location updates: clients send GPS updates every 30 seconds to a location service that writes to Redis (volatile, TTL-expired after 10 minutes of inactivity) and optionally logs to a time-series store. WebSocket or SSE pushes real-time position updates to friends viewing a map. Uber uses a combination of Geohash + quadtrees and H3 (Uber's hexagonal grid) for driver matching.
A Bloom filter is a space-efficient probabilistic data structure that answers "is this item possibly in the set?" with no false negatives but a controllable false positive rate. It uses a bit array of M bits and K independent hash functions. To add an item, set the K bit positions produced by the hash functions; to query, check if all K positions are set. Bloom filters never return false negatives, so a negative answer is definitive. In system design, they prevent unnecessary DB reads: before fetching a URL from disk, check the Bloom filter — if absent, skip the read. Google Bigtable uses a per-SSTable Bloom filter to avoid disk seeks for keys not in that SSTable. Cassandra uses them to skip SSTables during reads.
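
A small Bloom filter can be sketched as below. Instead of K truly independent hash functions, this version derives the K positions from two halves of one SHA-256 digest (the double-hashing technique), a common substitution that is an assumption here; the class and method names are illustrative.

```python
import hashlib


class BloomFilter:
    """Bit array of m bits with k derived hash positions per item."""

    def __init__(self, m_bits: int, k_hashes: int):
        self.m = m_bits
        self.k = k_hashes
        self._bits = bytearray((m_bits + 7) // 8)

    def _positions(self, item: str):
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd, so the stride is never zero
        # Double hashing: position_i = h1 + i * h2 (mod m).
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item: str) -> None:
        for p in self._positions(item):
            self._bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, item: str) -> bool:
        # False means definitely absent; True means possibly present.
        return all(self._bits[p // 8] & (1 << (p % 8)) for p in self._positions(item))
```

Sizing is the main design choice: more bits per stored item and a matching number of hashes lower the false positive rate, at the cost of memory and per-lookup work.
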
HyperLogLog (HLL) is a probabilistic cardinality estimation algorithm that can count the number of distinct elements in a stream using only ~1.5KB of memory with ~0.81% error, regardless of how many billions of unique items have been seen. It works by hashing each item and observing the maximum number of leading zeros in the hash — a high maximum implies many distinct items. Redis implements PFADD and PFCOUNT natively. Use cases: counting daily active users (DAU), unique visitors per page, unique queries per day — any cardinality problem where exact counts are not required and memory is constrained. For exact counts you would need to store every distinct value, which is O(N) memory.
Time-series data (metrics, events, sensor readings) is append-only, queried by time range, and often aggregated. Naive SQL tables work at small scale but suffer from large row counts and slow range scans. Dedicated time-series databases partition by time: InfluxDB stores data in shards by time window and drops old shards atomically. TimescaleDB extends PostgreSQL with automatic time-based partitioning (hypertables). Cassandra handles time-series well with a wide-column model: row key = `(metric_name, day)`, column names = timestamps, so rows grow to the right and each row stays bounded to one day of samples. Prometheus scrapes metrics into a local TSDB and uses remote write to Thanos/Mimir for long-term storage. Key optimizations: pre-compute roll-ups (1-min, 1-hour, 1-day aggregates), and use delta-of-delta timestamp encoding plus XOR float compression (Facebook's Gorilla paper reports an average of about 1.37 bytes per sample, down from 16 bytes uncompressed).
The circuit breaker pattern prevents cascading failures by short-circuiting calls to a failing downstream service instead of letting threads pile up waiting for timeouts. States: Closed (calls pass through normally, failures tracked); Open (calls immediately fail-fast without reaching the downstream service, after a threshold of failures in a window); Half-Open (a small probe of requests is allowed through to test recovery — if they succeed, circuit closes; if they fail, it reopens). Libraries: Resilience4j (Java), Polly (.NET), pybreaker (Python). Netflix Hystrix popularized the pattern (now in maintenance; Resilience4j is the successor). Without circuit breakers, a 5-second timeout × 100 concurrent threads = 500 thread-seconds of blocking that can exhaust a thread pool.
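The three states can be sketched as a small state machine. This is a simplified illustration rather than the behavior of any particular library: it counts consecutive failures instead of failures within a sliding window, lets every call through while half-open, and takes an injectable `clock` so the timeout can be tested. All names and default values are assumptions.

```python
import time


class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without trying it."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("failing fast; downstream not called")
            self.state = "half_open"  # timeout elapsed: allow a probe
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self):
        self.failures += 1
        # A failed probe reopens immediately; otherwise wait for the threshold.
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()

    def _on_success(self):
        self.failures = 0
        self.state = "closed"
```

Callers would wrap each downstream call in `breaker.call(...)` and treat `CircuitOpenError` as a signal to serve a fallback.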
The bulkhead pattern isolates different parts of a system into resource pools (like watertight compartments in a ship) so that failures in one pool do not starve others. In practice: use separate thread pools or connection pools for different downstream services — if the payment service is slow and exhausts its thread pool, the catalog service's separate pool is unaffected. In Kubernetes, this maps to resource limits per pod and separate node pools per workload. Hystrix implements bulkheads via semaphore or thread-pool isolation per command group. The pattern is complementary to circuit breakers: the circuit breaker stops sending requests; the bulkhead limits how many can queue up.
Naïve retries immediately re-send failed requests and can cause a thundering herd: if 1,000 clients all retry at the same second after a brief outage, they simultaneously overwhelm the recovering server. Exponential backoff spaces retries geometrically: wait = base * 2^attempt (e.g., 100ms, 200ms, 400ms, 800ms). Jitter adds randomness — full jitter: wait = random(0, base * 2^attempt) — to desynchronize retries across clients. AWS SDK uses full jitter by default. Apply a maximum cap (e.g., 30 seconds) and a maximum retry count (e.g., 5). Use idempotency keys on retried requests to prevent duplicate side effects. Only retry on transient errors (429, 503, network timeouts) — not on 400/401/404 which will never succeed.
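The full-jitter formula and the retry rules can be sketched as follows. This is a minimal example under stated assumptions: `TransientError` is a hypothetical stand-in for retryable failures (429, 503, timeouts), and the defaults mirror the illustrative numbers in the text rather than any SDK's actual settings.

```python
import random
import time


class TransientError(Exception):
    """Stand-in for retryable failures such as HTTP 429/503 or timeouts."""


def backoff_delay(attempt: int, base: float = 0.1, cap: float = 30.0) -> float:
    """Full jitter: a uniform random wait in [0, min(cap, base * 2^attempt)]."""
    return random.uniform(0.0, min(cap, base * 2 ** attempt))


def call_with_retries(fn, max_attempts=5, base=0.1, cap=30.0, sleep=time.sleep):
    """Call fn, retrying only on TransientError, up to max_attempts tries."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # retry budget exhausted: surface the error
            sleep(backoff_delay(attempt, base, cap))
```

Any other exception type propagates immediately, which corresponds to not retrying 400/401/404 responses.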
Requirements: schedule jobs (cron or one-off), guarantee at-least-once execution, support millions of jobs, and tolerate node failures. Core components: a persistent job store (PostgreSQL or Redis sorted set, keyed by next-run timestamp), a scheduler daemon that polls for due jobs, and a worker pool that executes them. To avoid multiple workers picking up the same job, use SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) or Redis SET NX with a short TTL (optimistic lock). Distribute work across workers via a task queue (Celery + Redis, Sidekiq + Redis, or BullMQ). For cron jobs, compute the next-run timestamp from the cron expression (using a library like croniter) and re-enqueue after execution. Failure detection: if a worker crashes mid-job, a heartbeat timeout re-enqueues the job for another worker.
Kafka is a distributed, append-only commit log that acts as a high-throughput, fault-tolerant message broker and event stream. Data is organized into topics, each split into partitions stored durably on disk. Producers append to the end of a partition; consumers read at their own pace using an offset pointer. A consumer group is a logical subscriber: Kafka assigns each partition to exactly one consumer in the group, enabling parallel processing while guaranteeing ordered delivery within a partition. Add consumers to scale throughput (up to the number of partitions). Multiple independent consumer groups can each read the same topic from offset 0 — e.g., an analytics pipeline and a fraud detection service both consume the same payment events. Kafka retains messages by time or size, enabling replay.
Exactly-once means every message is processed exactly one time — neither dropped (at-most-once) nor duplicated (at-least-once). It is the hardest guarantee to achieve in distributed systems. Kafka achieves producer exactly-once via idempotent producers (each message has a sequence number; duplicates are detected and discarded by the broker) and transactional APIs (atomic writes across multiple partitions). End-to-end exactly-once (Kafka + consumer) requires the consumer to commit the Kafka offset and apply the side effect atomically — achievable with Kafka Streams' built-in transactional processing but hard in custom consumers. In practice, teams often implement at-least-once delivery and make consumers idempotent (detect and discard duplicate processing) rather than rely on exactly-once broker guarantees.
REST over HTTP/1.1 with JSON is human-readable, universally supported, and trivially debuggable with curl, which makes it ideal for public APIs, browser clients, and external integrations. gRPC uses HTTP/2 (multiplexing, header compression), Protocol Buffers (a binary encoding that is typically several times smaller and faster to parse than equivalent JSON, though the exact gain depends on the payload), and generates strongly-typed client/server stubs from .proto files. For internal service-to-service calls, gRPC wins on throughput (lower CPU for serialization), latency (binary + HTTP/2), and contract enforcement (schema-first with backward-compatible evolution). gRPC supports four call types: unary, server-streaming, client-streaming, and bidirectional streaming. Drawbacks: harder to debug without tooling, less browser-native support (gRPC-Web bridges required), and steeper learning curve. Google, Netflix, and Uber use gRPC extensively for internal RPCs.
Cache invalidation across data centers is a well-known hard problem. Strategies: TTL-based invalidation accepts temporary staleness — set short TTLs (seconds to minutes) and let caches expire naturally; simple but data can be stale for the full TTL duration. Event-driven invalidation: when a write occurs, publish an invalidation event to Kafka; each DC's cache consumer deletes the affected key — near-real-time but adds complexity. Write-through with regional replication: the write path updates the cache in all DCs synchronously or via async replication. Facebook's Memcache paper describes "lease" tokens to handle thundering herds on invalidation: the first cache miss gets a lease token, others wait — preventing stampedes when a popular key expires. Redis Enterprise's Active-Active geo-distribution uses CRDTs to resolve conflicts.
The dual-write problem: when a service must both update a DB row and publish an event to Kafka, it cannot do both atomically, because there is no practical two-phase commit spanning the database and the broker. If the DB write succeeds but the Kafka write fails (or vice versa), the two systems become inconsistent. The outbox pattern solves this: instead of writing to Kafka directly, the service writes the event into an `outbox` table in the same DB transaction as the business entity update. A separate relay process (a Debezium CDC connector tailing the transaction log, or a poller that reads unpublished rows) publishes new outbox rows to Kafka and then marks them as published. Because the event row commits or rolls back together with the business data, every committed change is eventually published and no event is published for a rolled-back change. Delivery is at-least-once rather than exactly-once: if the relay crashes after publishing but before recording its progress, it publishes the same row again, so events should carry a unique ID and consumers should be idempotent.
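A polling relay can be sketched with SQLite standing in for the service database. This is a minimal illustration under assumptions: the table layout, the function names, and the `publish` callback (a stand-in for a Kafka producer) are all hypothetical. Note that a crash between `publish` and the `UPDATE` would cause the row to be published again on the next pass, so consumers should deduplicate.

```python
import json
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id TEXT PRIMARY KEY, amount_cents INTEGER NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "payload TEXT NOT NULL, published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def place_order(conn, order_id: str, amount_cents: int) -> None:
    # One transaction: the order row and its event commit or roll back together.
    with conn:
        conn.execute(
            "INSERT INTO orders (id, amount_cents) VALUES (?, ?)",
            (order_id, amount_cents),
        )
        event = {"type": "OrderPlaced", "order_id": order_id}
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),))


def relay_once(conn, publish) -> int:
    """Publish unpublished outbox rows in insertion order; return the count."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        publish(payload)  # stand-in for producing to Kafka
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)
```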
Sharding splits data across N database instances using a shard key (e.g., user_id mod N). Queries that include the shard key are routed to a single shard, which is fast. Queries without the shard key must scatter-gather across all shards and merge results, which is expensive and slow. Secondary index lookups across shards require a global secondary index (a separate lookup table mapping the indexed value to the shard) or a full scatter-gather. Avoid shard keys that create hot spots: on a social network where 1% of users generate 90% of writes, sharding by user_id can overload the shards that hold those users. Monotonically increasing keys (timestamps, auto-increment IDs) under range-based sharding are another common hot spot, because all new writes land on the newest shard. Prefer hash-based shard keys for even distribution, and reserve time-based partitioning for data that is mostly queried or expired by time range. Resharding (splitting a shard) is painful, so plan shard key cardinality (number of possible values) to be much larger than your maximum shard count.
A traditional message queue (RabbitMQ, SQS) delivers each message to exactly one consumer (per queue) and deletes it after acknowledgement. It is designed for task distribution: each job is processed once. An event stream (Kafka) is an immutable, retained log: messages (events) are not deleted after consumption and can be replayed. Multiple independent consumer groups each read from offset 0. This makes Kafka ideal for event-driven architectures where multiple systems need the same event stream (analytics, fraud detection, notifications), and for event sourcing where replaying history is required. The trade-off: queues are simpler for simple task workers; Kafka is more powerful but operationally heavier and requires managing consumer offsets and lag.
A distributed cache (Redis Cluster, Memcached) partitions keys across N shards. Redis Cluster maps each key to one of 16,384 hash slots (CRC16 of the key modulo 16384) and assigns slot ranges to nodes; Memcached clients typically apply consistent hashing (for example, ketama) on the client side. Each shard stores data in memory with a configured maxmemory limit. When memory is full, Redis applies an eviction policy: noeviction (returns error on new writes), allkeys-lru (evict the least recently used key across all keys), volatile-lru (LRU only among keys with TTL set), allkeys-lfu (Least Frequently Used, better than LRU for skewed access), allkeys-random. LRU is the most common general policy; LFU is better when a few hot keys dominate. Redis approximates LRU by sampling a few random keys (5 by default, configurable via maxmemory-samples) and evicting the best candidate among them rather than maintaining a true LRU list, trading accuracy for performance. Monitor eviction counts (evicted_keys) and hit rate (keyspace_hits vs. keyspace_misses) from Redis INFO stats continuously.
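For intuition, an exact LRU policy can be written in a few lines. This sketch is not how Redis works internally (Redis samples keys instead of keeping an ordered list); it only illustrates the eviction rule, and the class name and capacity-in-entries limit are assumptions for the example.

```python
from collections import OrderedDict


class LRUCache:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._data = OrderedDict()  # oldest entry first, newest last

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # a read counts as a use
        return self._data[key]

    def put(self, key, value) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the least recently used
```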
JSON Web Tokens (JWTs) encode user identity and claims as a Base64url-encoded JSON payload, signed with HMAC-SHA256 (symmetric) or RSA/ECDSA (asymmetric). The server issues a short-lived access token (15 minutes) and a longer-lived refresh token (7 days). The client stores the access token in memory (never in localStorage — XSS risk) and the refresh token in an HttpOnly, Secure cookie (CSRF-protected). Every API request carries the access token in the Authorization header; the server verifies the signature and expiry locally with no DB lookup — this is the stateless benefit. On expiry, the client calls /auth/refresh with the cookie; the server rotates the refresh token (invalidating the old one) and issues a new access token. Token revocation before expiry requires a blocklist (Redis set of revoked JTIs) — adding stateful lookup back into the equation. For logout, delete the refresh token from the DB and blocklist the access token JTI.
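To make the "verify locally with no DB lookup" step concrete, here is a minimal HS256 sketch using only the standard library. It is for illustration and is not a substitute for a maintained JWT library; the function names, the 15-minute default, and the injectable `now` parameter are assumptions of this example.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def issue_token(claims: dict, secret: bytes, ttl_seconds: int = 900, now=None) -> str:
    now = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    payload = dict(claims, iat=now, exp=now + ttl_seconds)
    parts = [
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    return signing_input + "." + _b64url(_sign(signing_input, secret))


def verify_token(token: str, secret: bytes, now=None) -> dict:
    """Return the claims if signature and expiry are valid, else raise ValueError."""
    now = int(time.time()) if now is None else now
    header_b64, payload_b64, sig_b64 = token.split(".")
    expected = _sign(header_b64 + "." + payload_b64, secret)
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    if now >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

Pinning the accepted algorithm on the server side, as the check above does, avoids trusting whatever `alg` value the token itself declares.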
A service mesh is an infrastructure layer that handles all inter-service communication via a sidecar proxy (Envoy) injected alongside each service pod. The data plane (Envoy proxies) handles mTLS encryption, retries, circuit breaking, load balancing, and distributed tracing header injection — transparently, without application code changes. The control plane (Istio istiod, Linkerd control plane) pushes routing rules, certificates, and policies to each Envoy sidecar via xDS APIs. Key capabilities: automatic mTLS between all services (zero-trust internal network), fine-grained traffic splitting (canary deployments by routing 5% of traffic to new version), observability (automatic golden signals: latency, error rate, throughput per route), and policy enforcement (rate limits, authorization policies). Trade-offs: sidecar proxies add ~1–3ms latency per hop and ~50MB RAM per pod; at 1,000 pods that is 50 GB of overhead. For simpler needs, consider a library-based approach (Resilience4j) instead.
In Content-Addressable Storage, objects are stored and retrieved by the cryptographic hash of their content (e.g., SHA-256) rather than by an assigned name or location. If the content changes, the hash changes — meaning you automatically detect tampering. If the same content is uploaded twice, it maps to the same hash — enabling automatic deduplication. Git uses CAS for all objects (blobs, trees, commits) in .git/objects. Docker image layers are CAS-addressed in a registry. Venti (Plan 9), Tahoe-LAFS, and IPFS are distributed CAS systems. S3 can approximate CAS by using the SHA-256 hash as the object key. The key property: CAS objects are immutable by definition — you never "update" an object, you create a new one with a new hash and update the pointer. This simplifies caching (an object at a given hash never changes) and enables infinite CDN cache TTLs.
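The deduplication and tamper-detection properties follow directly from using the hash as the key, as this minimal in-memory sketch suggests. The `ContentStore` class is a hypothetical illustration, with a dictionary standing in for object storage.

```python
import hashlib


class ContentStore:
    def __init__(self) -> None:
        self._objects = {}  # hex digest -> content bytes

    def put(self, content: bytes) -> str:
        """Store content under its SHA-256 digest and return that address."""
        key = hashlib.sha256(content).hexdigest()
        self._objects.setdefault(key, content)  # identical content is stored once
        return key

    def get(self, key: str) -> bytes:
        content = self._objects[key]
        # Re-hashing on read detects corruption or tampering.
        if hashlib.sha256(content).hexdigest() != key:
            raise ValueError("stored content does not match its address")
        return content

    def __len__(self) -> int:
        return len(self._objects)
```

"Updating" a file means calling `put` with the new bytes and repointing a mutable reference (a branch name, a manifest entry) at the returned address.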
The expand/contract pattern (also called parallel change) is the standard approach. Phase 1, Expand: add new columns, tables, or indexes as optional/nullable while keeping the old schema intact; deploy application code that writes to both old and new columns. Phase 2, Migrate: run a background job that backfills existing rows in small batches, paginating by primary key range (keyset pagination) rather than LIMIT/OFFSET, which rescans the skipped rows and slows down as the offset grows; commit each batch separately to keep locks short. Phase 3, Contract: once the backfill is complete and the new code is stable, switch reads to the new columns, stop writing the old ones, and drop them in a separate migration. Critical rules: in PostgreSQL before version 11, adding a column with a DEFAULT rewrites the whole table under an exclusive lock, so add the column as nullable, backfill, and only then add the NOT NULL constraint (adding NOT NULL without a DEFAULT fails outright on a non-empty table); use `CREATE INDEX CONCURRENTLY` so that index builds do not block writes. Tools: Flyway and Liquibase version migrations; gh-ost (GitHub) performs online MySQL schema changes by copying rows into a shadow table and tailing the binlog. Always test rollback: can the old code still run against the new schema during a canary deploy?
In a dynamic container environment (Kubernetes, ECS), service instances come and go — IPs change with every restart. Service discovery solves the problem of how services find each other. Client-side discovery: the service queries a service registry (Consul, Eureka, etcd) and selects an instance itself, applying client-side load balancing (Ribbon in Spring). Server-side discovery: the client queries a load balancer (AWS ELB, Kubernetes ClusterIP) that queries the registry internally. Kubernetes uses DNS-based service discovery: a Service object gets a stable DNS name (my-service.my-namespace.svc.cluster.local) pointing to a virtual IP; kube-proxy rewrites packets to one of the healthy pod IPs using iptables/IPVS rules. Consul supports health check integration and multi-datacenter federation. Zookeeper provides a distributed key-value store for service registration but is heavier than Consul for pure service discovery.
Backpressure is a mechanism for a slow consumer to signal a fast producer to slow down, preventing unbounded memory growth and cascading failures. In reactive streams (Project Reactor, RxJava, Akka Streams), backpressure is explicit: consumers request N items, and producers emit at most N before waiting for the next request. In a Kafka consumer, backpressure is implicit: the consumer pulls at its own pace and the broker retains unread messages, so the consumer is never overwhelmed; only its consumer lag grows. In HTTP services, backpressure manifests as queuing and rejection: Nginx can cap per-client request rate and concurrency (limit_req_zone/limit_req with a bounded burst queue, limit_conn_zone/limit_conn), and requests beyond the limit are rejected with 503 by default, or 429 if limit_req_status is set accordingly. Design strategies: bounded queues (reject when full, fail fast), load shedding (drop lowest-priority requests under load), throttling (slow down producers via feedback), and elastic scaling (add consumers). The key is to propagate backpressure up the call chain rather than silently dropping or buffering unboundedly.
Scale targets: 500M users, 100M tweets/day (~1,200 writes/s), 300B timeline reads/day (~3.5M reads/s). Write path: user → API server → tweet service → write to Tweet DB (sharded by tweet_id using Snowflake IDs) → publish TweetCreated event to Kafka. Fan-out service consumes TweetCreated: for each follower, insert tweet_id into the follower's timeline cache (Redis sorted set `timeline:{user_id}` scored by time). Timeline read: fetch top-100 tweet IDs from Redis → hydrate full tweet objects in a single mget → merge with profile data. Celebrity problem: accounts with 50M+ followers would require 50M Redis writes per tweet, which is too slow and expensive. Solution: hybrid fan-out. For users with fewer than 10K followers, push tweet_id into each follower's timeline at write time. For celebrities, skip the push; instead, when a user opens their timeline, inject celebrity tweets by querying each followed celebrity's tweet list and merging in memory. This "lazy celebrity injection" is similar to the hybrid approach Twitter has described publicly for its timeline infrastructure (built on Finagle, Manhattan, and FlockDB).
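The hybrid push/pull logic can be sketched with in-memory dictionaries standing in for Redis and the tweet store. Everything here is a simplified assumption for illustration: the `TimelineService` class, its method names, and the choice to classify an author as a celebrity at post/read time. Authors who cross the threshold mid-life (and would be both pushed and pulled) are not handled.

```python
from collections import defaultdict


class TimelineService:
    def __init__(self, celebrity_threshold: int = 10_000) -> None:
        self.celebrity_threshold = celebrity_threshold
        self.followers = defaultdict(set)          # author -> follower ids
        self.following = defaultdict(set)          # user -> followed author ids
        self.tweets_by_author = defaultdict(list)  # author -> [(ts, tweet_id)]
        self.timelines = defaultdict(list)         # user -> [(ts, tweet_id)] pushed

    def follow(self, user: str, author: str) -> None:
        self.followers[author].add(user)
        self.following[user].add(author)

    def _is_celebrity(self, author: str) -> bool:
        return len(self.followers[author]) >= self.celebrity_threshold

    def post(self, author: str, tweet_id: str, ts: int) -> None:
        self.tweets_by_author[author].append((ts, tweet_id))
        if not self._is_celebrity(author):
            # Fan-out on write: push into every follower's timeline.
            for follower in self.followers[author]:
                self.timelines[follower].append((ts, tweet_id))

    def read_timeline(self, user: str, limit: int = 100) -> list:
        merged = list(self.timelines[user])
        # Fan-out on read: pull tweets from followed celebrities.
        for author in self.following[user]:
            if self._is_celebrity(author):
                merged.extend(self.tweets_by_author[author])
        merged.sort(reverse=True)  # newest first
        return [tweet_id for _, tweet_id in merged[:limit]]
```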
Upload pipeline: client uploads raw video to a pre-signed S3 URL via resumable upload (chunks of 5–10 MB). On completion, an UploadCompleted event triggers the transcoding pipeline. Transcoding: a job coordinator (SQS + Lambda or Kubernetes batch jobs) splits the video into segments (HLS chunks), spawns parallel FFmpeg workers that transcode each segment into multiple resolutions (360p, 720p, 1080p, 4K) and bitrates, then re-assembles into adaptive bitrate (ABR) manifests (HLS .m3u8 or DASH .mpd). Store transcoded segments in S3. CDN delivery: CloudFront (or Akamai) caches segments at edge PoPs globally; the player (video.js, Shaka Player) uses ABR to select the highest bitrate that fits the current network speed. Metadata (title, description, duration, thumbnail) stored in PostgreSQL; video search uses Elasticsearch. Thumbnail generation: FFmpeg extracts frames at intervals; a separate job scores frames for aesthetic quality and selects the best three as thumbnail options. View counts are incremented in Redis counters (INCR) and flushed to the DB in batches every 60 seconds; a per-video HyperLogLog (PFADD of viewer IDs) can estimate unique viewers, but it cannot serve as a raw view counter because repeat views by the same viewer are not counted.
Core challenge: efficient sync of large files with minimal bandwidth. File chunking: split files into content-defined chunks (Rabin fingerprinting) of ~4 MB average. Content-defined chunking (CDC) means identical content in different files produces identical chunks — enabling cross-user deduplication. Storage: each chunk is stored in object storage (S3) keyed by its SHA-256 hash. A `blocks` table maps `hash → S3_key`. A `file_versions` table maps `(user_id, file_path, version) → ordered list of chunk hashes`. Deduplication: before uploading a chunk, the client sends only the hash; if the server already has it, skip the upload — similar to rsync block hashing. Sync protocol: client watches the filesystem with inotify/FSEvents, computes changed chunks, calls `/api/files/diff` with current chunk hashes, uploads only missing chunks, then atomically updates the file version record. Conflict resolution: store both versions (like Dropbox) or use CRDTs for text files. WebSocket/SSE notifies other clients of changes. Metadata in PostgreSQL; blobs in S3 with versioning.
The challenge: rate limit a user across 50 app servers sharing no local state. Redis is the shared state store. Sliding window counter in Redis: use a sorted set per user — key `rl:{user_id}`, members are request timestamps (epoch ms), score = timestamp. On each request: ZADD the current timestamp, ZREMRANGEBYSCORE to remove entries older than the window (e.g., 60 seconds ago), ZCARD to count remaining entries. If count > limit, reject. Execute all three commands atomically using a Lua script (`EVAL`) to prevent TOCTOU races between check and increment. This gives O(log N) per request. For very high traffic, use a Redis Cluster and hash `{user_id}` to co-locate all keys for a user on one shard (hash tags: `rl:{user_id}`). For 1M users each with 100 req/min, the sorted sets stay small (~100 members) and Redis handles 1M+ ops/s. Add a local in-memory token bucket cache per app server for the first layer, falling back to Redis — this reduces Redis load by ~90% for popular clients.
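The sliding-window logic itself is independent of Redis and can be sketched in memory, with a deque per user standing in for the sorted set. This is an illustrative single-process sketch: the class name is an assumption, and unlike the ZADD-then-count order described above it checks the count before recording the request, so rejected requests do not consume quota.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    def __init__(self, limit: int, window_ms: int) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self._logs = defaultdict(deque)  # user_id -> request timestamps (ms)

    def allow(self, user_id: str, now_ms: int) -> bool:
        log = self._logs[user_id]
        cutoff = now_ms - self.window_ms
        while log and log[0] <= cutoff:  # like ZREMRANGEBYSCORE: drop old entries
            log.popleft()
        if len(log) >= self.limit:       # like ZCARD: count what remains
            return False
        log.append(now_ms)               # like ZADD: record this request
        return True
```

In the Redis version the same three steps would run inside one Lua script so that concurrent app servers cannot interleave between the count and the insert.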
A single-instance Redis lock uses SET key value NX PX ttl, where value is a random token unique to the client: the NX flag ensures only one setter wins, and PX sets an expiry so the lock is released if the holder crashes. This is insufficient for high availability: if the Redis node fails, the lock is lost. Redlock (proposed by Salvatore Sanfilippo) uses N independent Redis masters (typically N = 5). The client attempts SET NX PX on all N nodes with a short per-node timeout, measuring elapsed time. The lock is considered acquired if (a) a majority (N/2+1) of nodes accepted it and (b) the elapsed time is less than the lock TTL; the remaining validity is the TTL minus the elapsed time. Release: on every node, run a Lua script that deletes the key only if it still holds the client's token, so that a client cannot delete a lock that has since been acquired by someone else. Controversy (Martin Kleppmann critique): if a process pauses (GC, page fault) after acquiring the lock and before using it, the TTL can expire, another client acquires the lock, and the two clients operate concurrently; Redlock also depends on bounded clock drift, so it does not provide safety guarantees under arbitrary pauses or clock jumps. Fencing tokens (monotonically increasing numbers checked by the protected resource) provide the safety that Redlock alone cannot. When correctness depends on the lock, use Zookeeper ephemeral nodes or etcd leases, which are backed by consensus and can supply such tokens.
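The fencing-token check lives in the protected resource, not in the lock. A minimal sketch, assuming a hypothetical `LockService` that returns a strictly increasing token on each grant and a `FencedStorage` that remembers the highest token it has seen:

```python
class StaleTokenError(Exception):
    """Raised when a write carries a token older than one already seen."""


class LockService:
    """Hypothetical lock service: every grant returns a larger token."""

    def __init__(self) -> None:
        self._counter = 0

    def acquire(self) -> int:
        self._counter += 1
        return self._counter


class FencedStorage:
    def __init__(self) -> None:
        self.highest_token = 0
        self.data = {}

    def write(self, token: int, key: str, value: str) -> None:
        # Reject writers whose lock was superseded while they were paused.
        if token < self.highest_token:
            raise StaleTokenError(
                "token %d is older than %d" % (token, self.highest_token)
            )
        self.highest_token = token
        self.data[key] = value
```

If client A pauses after acquiring token 1 and client B then acquires token 2 and writes, A's delayed write is rejected instead of silently overwriting B's.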
Requirements: process payments globally, exactly-once semantics, support 150+ currencies, < 200ms p99. Idempotency: every payment request carries a client-generated idempotency key stored in PostgreSQL with the outcome. On duplicate submission, return the stored response. State machine: a payment record transitions through PENDING → AUTHORIZED → CAPTURED → SETTLED (or FAILED, REFUNDED) — only append new states, never mutate. Double-spend prevention: acquire a distributed lock (Redlock or DB row lock) on the payment source account before deducting; use database-level CHECK constraints (`balance >= 0`) to prevent overdraft races. Multi-currency: store amounts as integers in the smallest unit (cents for USD, pence for GBP) to avoid floating-point errors. Use ISO 4217 currency codes. FX conversion: integrate with an FX rate service (e.g., Open Exchange Rates) and cache rates for 60 seconds. Settlement: partner with acquiring banks via ISO 8583 or modern APIs (Stripe Connect). Multi-region: deploy in US, EU, APAC; use a global DB (CockroachDB, Spanner) for cross-region ACID transactions on payment records. Reconciliation job runs nightly comparing ledger entries against bank statements.
Core flows: rider requests ride → system finds nearby drivers → matches rider to optimal driver → tracks ride → charges on completion. Location indexing: drivers send GPS pings every 4 seconds to a location service that stores `driver_id → (lat, lng, timestamp)` in Redis GEO (underlying sorted set with Geohash scores) or a quadtree. Nearby driver query: GEORADIUS of 2km returns candidate drivers. Matching algorithm: score candidates by ETA (computed via routing graph, OSRM or Google Maps Distance Matrix) and driver rating; assign the rider to the lowest-ETA available driver. Lock driver as "assigned" atomically with SET NX in Redis to prevent double-assignment. ETA computation: pre-build a road graph; use Contraction Hierarchies (CH) for sub-millisecond shortest-path queries. Surge pricing: compute supply/demand ratio per geohex (H3 hexagon) every 30 seconds; apply a surge multiplier if demand > N × supply. Real-time tracking: driver app maintains a WebSocket connection; location updates streamed to rider via WebSocket (pub/sub on a Redis channel per trip). Trip data persisted to PostgreSQL partitioned by date.
Crawl → Index → Serve pipeline. Crawl: distributed crawler (Scrapy Cluster) fetches pages, respects robots.txt, deduplicates URLs via Bloom filter, stores raw HTML in object storage. Index building: NLP pipeline (tokenization, stemming, stop-word removal) builds an inverted index: `term → [(doc_id, tf, positions)]`. TF-IDF or BM25 scoring computes term importance. Index sharding: partition the index across N shards, either by term hash (each shard owns the full posting list for its terms) or by document (each shard indexes a subset of documents); document partitioning is the more common choice in practice. Serving: a query is parsed, terms are looked up in parallel across shards (scatter-gather), results are merged by score, then re-ranked using a learning-to-rank model (features: BM25 score, PageRank, freshness, click-through rate). Index shards are replicated for read scaling. Near-real-time indexing: changes flow through Kafka into incremental index updates using a mutable segment (similar to Lucene's NRT indexing) that merges periodically. Elasticsearch wraps Lucene and follows a similar design (distributing shards across nodes, replicating them, and exposing a REST query API), with one difference from term-hash sharding: it partitions by document, so each shard holds a complete inverted index for its subset of documents and every query fans out to all shards of the index.
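A single-shard inverted index with BM25 scoring can be sketched compactly. This is a minimal illustration with a naive whitespace tokenizer (no stemming or stop-word removal), the commonly used defaults k1 = 1.2 and b = 0.75, and a Lucene-style smoothed IDF; the class name and sample documents are assumptions.

```python
import math
from collections import Counter, defaultdict


class InvertedIndex:
    def __init__(self, k1: float = 1.2, b: float = 0.75) -> None:
        self.k1, self.b = k1, b
        self.postings = defaultdict(dict)  # term -> {doc_id: term frequency}
        self.doc_len = {}                  # doc_id -> number of terms

    def add(self, doc_id: str, text: str) -> None:
        terms = text.lower().split()  # naive tokenizer
        self.doc_len[doc_id] = len(terms)
        for term, tf in Counter(terms).items():
            self.postings[term][doc_id] = tf

    def search(self, query: str, top_k: int = 10) -> list:
        n_docs = len(self.doc_len)
        if n_docs == 0:
            return []
        avg_len = sum(self.doc_len.values()) / n_docs
        scores = defaultdict(float)
        for term in query.lower().split():
            docs = self.postings.get(term, {})
            if not docs:
                continue
            # Rare terms get a higher inverse document frequency.
            idf = math.log(1 + (n_docs - len(docs) + 0.5) / (len(docs) + 0.5))
            for doc_id, tf in docs.items():
                length_norm = 1 - self.b + self.b * self.doc_len[doc_id] / avg_len
                scores[doc_id] += idf * tf * (self.k1 + 1) / (tf + self.k1 * length_norm)
        return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_k]
```

In a sharded deployment each shard would run `search` locally and a coordinator would merge the per-shard top-K lists by score.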
Two paradigms: content-based filtering (recommend items similar to what a user liked) and collaborative filtering (recommend items liked by users with similar behavior). Matrix factorization (ALS, SVD) decomposes the user-item interaction matrix into user embeddings and item embeddings of dimension D (typically 64–256). The dot product of user and item vectors approximates the predicted rating. Batch pipeline: run ALS in Spark/MLflow weekly, store embeddings in a vector store (Faiss, Milvus, Pinecone). Serving: on request, fetch the user's embedding → run approximate nearest-neighbor (ANN) search (HNSW algorithm, < 5ms) to find top-K items → re-rank with a pointwise XGBoost model using context features (time of day, device, recency). Real-time signals: stream click/view events through Kafka into a feature store (Feast, Tecton), updated every few seconds to capture session context for real-time re-ranking. Cold-start problem: new users get popularity-based recommendations; new items are bootstrapped with content embeddings from their metadata.
Collection: agents (Prometheus exporters, StatsD, OpenTelemetry collectors) scrape or receive metrics and forward to an ingestion layer. Prometheus scrapes targets every 15 seconds via HTTP /metrics endpoints exposing plain text. At large scale, Prometheus remote_write streams to Thanos or Cortex (horizontally scalable Prometheus). Storage: Prometheus TSDB uses Gorilla-style delta-of-delta timestamp encoding and XOR value compression, averaging roughly 1 to 2 bytes per sample (the Gorilla paper reports 1.37 bytes per data point). Thanos stores long-term data in S3 as block files (2-hour Prometheus TSDB blocks). Aggregation: Prometheus recording rules pre-compute expensive aggregations (e.g., 5-minute rate()) and store results as new metrics, reducing query time at read. Alerting pipeline: Prometheus evaluates alerting rules on each evaluation interval (15 seconds here) and sends firing alerts to Alertmanager, which deduplicates, groups, and silences them and routes notifications to PagerDuty/OpsGenie/Slack. Dashboards: Grafana queries Thanos via PromQL. Key metrics to track: RED (Rate, Errors, Duration per service) and USE (Utilization, Saturation, Errors per resource).
Amazon Dynamo (2007 paper) designed for always-writeable, highly available storage with eventual consistency. Data placement: consistent hashing ring with 2^32 positions; each physical node is assigned 150–200 virtual nodes (vnodes) spread around the ring for even distribution. Replication: each key is replicated to the N clockwise nodes on the ring (preference list, skipping virtual nodes of the same physical node). Quorum: read quorum R + write quorum W > N ensures at least one overlapping node has the latest version. Typical: N=3, W=2, R=2. Write path: coordinator receives write, forwards to all N replicas; waits for W acknowledgements. Read path: coordinator fetches from R replicas; if versions differ, uses vector clocks to determine order or flags conflict for application-level resolution (siblings in Riak). Sloppy quorum + hinted handoff: if a target replica is down, write to the next healthy node which stores a hint and forwards the data when the original node recovers. Merkle trees: each replica maintains a Merkle tree of its key ranges so background anti-entropy processes can efficiently sync diverged replicas.
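The ring, virtual nodes, and preference list can be sketched as follows. This is an illustrative model rather than Dynamo's actual implementation: it truncates SHA-256 to 32 bits to obtain the 2^32 ring positions mentioned above, and the names `HashRing`, `preference_list`, and `quorum_overlaps` are assumptions.

```python
import bisect
import hashlib


def ring_position(value: str) -> int:
    """Map a string to one of 2^32 ring positions."""
    return int.from_bytes(hashlib.sha256(value.encode("utf-8")).digest()[:4], "big")


class HashRing:
    def __init__(self, nodes, vnodes: int = 150) -> None:
        # Each physical node appears at `vnodes` pseudo-random ring positions.
        self._ring = sorted(
            (ring_position("%s#%d" % (node, i)), node)
            for node in nodes
            for i in range(vnodes)
        )
        self._positions = [pos for pos, _ in self._ring]

    def preference_list(self, key: str, n: int = 3) -> list:
        """Walk clockwise from the key, collecting n distinct physical nodes."""
        start = bisect.bisect_right(self._positions, ring_position(key))
        owners = []
        for offset in range(len(self._ring)):
            node = self._ring[(start + offset) % len(self._ring)][1]
            if node not in owners:  # skip further vnodes of the same machine
                owners.append(node)
                if len(owners) == n:
                    break
        return owners


def quorum_overlaps(n: int, r: int, w: int) -> bool:
    """True when every read quorum intersects every write quorum."""
    return r + w > n
```

Removing a node that is not on a key's preference list leaves that list unchanged, which is the property that keeps data movement small when membership changes.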
Bigtable (Google, 2006) is a sparse, sorted, multi-dimensional map indexed by (row key, column family:column qualifier, timestamp). Rows are sorted lexicographically by row key — choose the row key for access pattern (e.g., reverse-domain URL "com.google.www" to co-locate all pages from the same domain). Data is split into ~200MB tablets (shards) managed by tablet servers. Each tablet is an LSM-tree: writes go to a memtable (in-memory sorted tree), which is flushed to SSTables (sorted string tables) on disk, periodically compacted. Bloom filters on each SSTable avoid disk reads for absent keys. Column families are stored in separate files — project only the families you need. HBase is the open-source equivalent running on HDFS; it uses Zookeeper for master election and tablet server coordination. Primary use cases: web indexing (URL → page content), analytics (wide sparse rows per user), time-series (row key = reversed timestamp for sequential scans).
Spanner (Google, 2012) is a globally distributed, externally consistent relational database that supports SQL, ACID transactions, and horizontal sharding across datacenters. It achieves external consistency (stronger than serializable — effectively "real-time" ordering) using TrueTime: a GPS + atomic-clock-based API that returns a time interval [earliest, latest] with a bounded uncertainty (ε ≈ 1–7ms). For a commit timestamp, Spanner waits out the uncertainty interval ("commit wait") before making the transaction visible — guaranteeing any subsequent transaction starts after the commit. This eliminates the need for lock-based timestamp ordering across regions. Spanner uses Paxos consensus within each shard group for replication. Schema changes are versioned and rolled out slowly to avoid downtime. Cloud Spanner is the managed version; CockroachDB and YugabyteDB are open-source analogs using hybrid logical clocks (HLC) as a TrueTime approximation.
CockroachDB is a distributed, ACID-compliant SQL database inspired by Spanner but using hybrid logical clocks instead of TrueTime. Architecture: each row belongs to a range (64 MB by default in early versions, 512 MB since v20.1); each range is replicated via Raft to N nodes (default 3). The SQL layer parses queries, generates a distributed execution plan, and pushes computation to the nodes owning the relevant ranges (DistSQL). Transactions use a multi-version concurrency control (MVCC) model with optimistic locking: reads are timestamp-consistent snapshots; writes create new versions. Write-write conflicts detected at commit time retry automatically. Global transactions use a two-phase protocol with a centralized transaction record stored alongside the data it modifies. Range splits (when a range grows beyond the configured maximum size) and rebalancing (moving ranges to equalize load) are automatic. CockroachDB provides read-your-own-write consistency and serializable isolation by default, which are strong guarantees that come at the cost of cross-range transaction latency (10–100ms for multi-region).
Vitess (YouTube's open-source project, graduated CNCF) is a sharding middleware for MySQL that sits between the application and MySQL instances. It presents a single MySQL-compatible endpoint while routing queries to the correct shard. Key components: VTGate (stateless query router, parses SQL, routes based on vindex/vschema), VTTablet (per-MySQL sidecar managing connection pooling, query rewriting, metrics), and a topology service (Zookeeper or etcd) for shard map metadata. Sharding is defined by a vindex (hash of a column) mapped to a keyspace range. Resharding (splitting a shard) is online: Vitess streams rows to the new shards while still serving traffic, then atomically cuts over. Vitess handles cross-shard queries with scatter-gather but discourages full-table scans. YouTube uses Vitess with thousands of MySQL instances; PlanetScale is the managed Vitess cloud product.
Cassandra's write path is optimized for high throughput. Write → Commit log (WAL on disk, sequential write for durability) → Memtable (in-memory sorted data structure per column family). When the memtable reaches a threshold (~32 MB), it is flushed to an immutable SSTable on disk. SSTables are never modified in place — Cassandra only appends. Over time, many SSTables accumulate, so compaction merges and deduplicates them: STCS (Size-Tiered Compaction) merges SSTables of similar size; LCS (Leveled Compaction) organizes SSTables into levels with fixed file sizes, bounding read amplification to one SSTable per level but requiring more I/O. Read path: check memtable → check bloom filter on each SSTable (skip if key absent) → check partition index → read row from SSTable data file. Tombstones mark deletes and are removed during compaction. Cassandra achieves 100K+ writes/s per node because every write is sequential (commit log + memtable), never random.
A Kafka topic is divided into P partitions, each an ordered, immutable sequence of records stored as a log on disk (split into segment files of ~1 GB each). Each record has a monotonically increasing offset within its partition. Producers append to the tail; consumers read from any offset and commit their position. Consumer lag is the difference between the last produced offset and the last committed consumer offset; lag that keeps growing signals the consumer cannot keep up. Replication: each partition has one leader and R-1 follower replicas on different brokers. Producers write to the leader; followers fetch and replicate. ISR (In-Sync Replicas) is the set of replicas caught up to within replica.lag.time.max.ms. Delivery semantics: at-most-once (commit offsets before processing), at-least-once (commit after processing; the usual default, safe with idempotent consumers), exactly-once (idempotent producer + transactional API that atomically writes to the output topic and commits offsets). Kafka brokers were historically coordinated by ZooKeeper; KRaft (built-in Raft) became production-ready in Kafka 3.3, and ZooKeeper mode was removed in Kafka 4.0.
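Consumer lag is simple arithmetic over per-partition offsets. The sketch below uses plain dictionaries rather than a Kafka client; the function names and the "strictly growing across samples" rule are illustrative assumptions, not part of any Kafka API.

```python
from typing import Dict, List


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log end offset minus the group's committed offset."""
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def is_falling_behind(total_lag_samples: List[int]) -> bool:
    """True if total lag grew between every pair of consecutive samples."""
    pairs = zip(total_lag_samples, total_lag_samples[1:])
    return len(total_lag_samples) > 1 and all(b > a for a, b in pairs)


lag = consumer_lag(end_offsets={0: 120, 1: 300}, committed={0: 100, 1: 300})
total = sum(lag.values())
```

A single large lag value is not necessarily a problem (a batch consumer may have just started); the trend over several samples is the more useful alerting signal.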
Spark Structured Streaming (Apache Spark's streaming engine) uses a micro-batch model by default: it collects events for a short interval (typically around 100 ms to a few seconds), processes each batch as a distributed SQL/DataFrame job, and outputs results, providing near-real-time processing with high throughput and exactly-once semantics via checkpointing (given replayable sources and idempotent sinks). It excels at unified batch + stream pipelines and is familiar to data engineers. Apache Flink is a true stream processor: events are processed one at a time with sub-10ms latency, genuine event-time windows, stateful operators backed by RocksDB with incremental checkpoints, and exactly-once guarantees via distributed snapshots (a variant of the Chandy-Lamport algorithm). Choose Flink for low-latency fraud detection, real-time feature engineering for ML, CEP (complex event processing), or anything needing accurate event-time windowing (handling late arrivals). Choose Spark Structured Streaming for teams already using Spark for batch, or when latency > 1 second is acceptable and unified batch/stream APIs are more valuable than raw latency.
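Event-time windowing with late arrivals is easier to reason about with a small model. The sketch below is plain Python, not the Flink or Spark API: it assigns events to tumbling windows by their event timestamp and tracks a watermark as "largest event time seen minus a bounded out-of-orderness". The function name and parameters are hypothetical.

```python
from collections import defaultdict


def tumbling_window_counts(events, window_ms, max_out_of_orderness_ms=0):
    """Count events per (window_start, key) using event time.

    events: iterable of (event_time_ms, key) in arrival order.
    Returns (counts, dropped), where dropped lists events that arrived
    after the watermark had already passed the end of their window.
    """
    counts = defaultdict(int)
    dropped = []
    watermark = float("-inf")
    for event_time, key in events:
        watermark = max(watermark, event_time - max_out_of_orderness_ms)
        window_start = event_time - event_time % window_ms
        if window_start + window_ms <= watermark:
            dropped.append((event_time, key))  # window is already closed
            continue
        counts[(window_start, key)] += 1
    return dict(counts), dropped


arrivals = [(1000, "a"), (1500, "a"), (2100, "a"), (1900, "a"), (4000, "a"), (1200, "a")]
counts, dropped = tumbling_window_counts(arrivals, window_ms=1000, max_out_of_orderness_ms=500)
```

The event at 1900 arrives after 2100 but is still counted in the 1000 to 2000 window because the watermark has not passed 2000 yet; the event at 1200 arrives after the watermark reached 3500 and is dropped.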
Distributed tracing connects operations across multiple services into a single trace to diagnose latency and errors. A trace is a DAG of spans; each span represents a unit of work (HTTP call, DB query) with a start/end timestamp, service name, and key-value attributes. Context propagation: when service A calls service B, it injects a trace context (trace ID + parent span ID) into outgoing headers using the W3C Trace Context standard (`traceparent: 00-trace_id-span_id-flags`). Service B extracts the context, creates a child span, and propagates it further. OpenTelemetry provides vendor-neutral SDK instrumentation for traces, metrics, and logs with exporters for Jaeger, Tempo, Honeycomb, and Datadog. Sampling: at 100% trace volume for high-throughput services (10K QPS) you generate millions of spans/second. Head-based sampling (sample at trace entry point, e.g., 1%) is simple; tail-based sampling (buffer spans and decide after the trace completes based on latency/errors) captures 100% of slow/erroneous traces while sampling routine ones.
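Context propagation comes down to parsing the incoming `traceparent` header and emitting a new one that keeps the trace ID but carries a fresh span ID. The sketch below is a hand-rolled illustration with hypothetical function names; in practice the OpenTelemetry SDK's propagators do this, and the all-zero ID checks required by the W3C specification are omitted here.

```python
import re
import secrets

# version - 32 hex trace id - 16 hex parent span id - 2 hex flags
TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def new_span_id():
    return secrets.token_hex(8)  # 8 random bytes -> 16 hex characters


def start_trace(sampled=True):
    """Create a traceparent for a brand-new trace at the entry point."""
    flags = "01" if sampled else "00"
    return "00-%s-%s-%s" % (secrets.token_hex(16), new_span_id(), flags)


def parse_traceparent(header):
    match = TRACEPARENT_RE.match(header.strip())
    if not match:
        raise ValueError("malformed traceparent: %r" % header)
    version, trace_id, parent_id, flags = match.groups()
    return {"version": version, "trace_id": trace_id,
            "parent_id": parent_id, "flags": flags}


def child_traceparent(incoming):
    """Header to send downstream: same trace ID and flags, new span ID."""
    ctx = parse_traceparent(incoming)
    return "%s-%s-%s-%s" % (ctx["version"], ctx["trace_id"], new_span_id(), ctx["flags"])


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
outgoing = child_traceparent(incoming)
```

With head-based sampling, the sampled flag set at the entry point travels unchanged in the last field, so every downstream service makes the same keep-or-drop decision.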
Active-active means multiple regions simultaneously accept writes — no region is a passive standby. The core challenge is conflict resolution across regions. Architecture: each region has a full stack (app servers, databases, caches); a global load balancer (Cloudflare, Route 53 latency routing) directs users to the nearest region. Data replication: use a globally distributed database (CockroachDB, Spanner, DynamoDB Global Tables) or asynchronous replication with CRDTs for conflict-free merges. For user data, route each user's writes to their home region (sticky routing by user shard) to minimize cross-region conflicts. DNS TTLs: set low (30–60s) to enable fast failover. Circuit breakers on cross-region calls prevent a regional failure from propagating. Operational complexity: multi-region deployments require coordinated deploy pipelines (progressive rollout per region), global secrets management, and cross-region monitoring. SLA target: < 100ms p99 within a region, failover < 30 seconds on regional outage.
Blue-green deployment maintains two identical environments (blue = live, green = new version). Deploy to green, run smoke tests, then switch the load balancer to send 100% of traffic to green. Rollback is instant: flip back to blue. Limitation: requires 2× infrastructure cost. Canary release: gradually shift a small percentage of traffic (1% → 10% → 50% → 100%) to the new version while monitoring error rates, latency, and business metrics. Automated canary analysis (Spinnaker, Argo Rollouts with Prometheus metrics) auto-promotes or rolls back based on thresholds. Feature flags (LaunchDarkly, Unleash, custom Redis-backed flags) decouple deployment from release: ship code dark, then turn on features for 1% of users via flag config without redeploying. Database schema changes require backward-compatible migrations: add columns before deploying code that uses them, and drop old columns only after the new code is stable (the expand/contract, or parallel change, pattern). Kubernetes rolling updates replace pods incrementally, at a pace bounded by the configurable maxSurge and maxUnavailable settings (each defaults to 25%).
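A percentage rollout behind a feature flag is usually implemented by hashing the user into a stable bucket. The sketch below is one common approach, not the algorithm of any particular flag vendor; the function names and the 10,000-bucket granularity are assumptions.

```python
import hashlib


def rollout_bucket(flag_name, user_id):
    """Stable bucket in [0, 10000) for this (flag, user) pair."""
    digest = hashlib.sha256(("%s:%s" % (flag_name, user_id)).encode()).digest()
    return int.from_bytes(digest[:4], "big") % 10_000


def is_enabled(flag_name, user_id, percent):
    """True if the user falls inside the first `percent` percent of buckets."""
    return rollout_bucket(flag_name, user_id) < percent * 100


users = ["user-%d" % i for i in range(10_000)]
enabled_at_1 = {u for u in users if is_enabled("new-checkout", u, 1)}
enabled_at_10 = {u for u in users if is_enabled("new-checkout", u, 10)}
```

Hashing the flag name together with the user ID keeps each user's experience stable across requests, and raising the percentage only adds users: everyone enabled at 1% stays enabled at 10%.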
Chaos engineering is the discipline of deliberately injecting failures into a production (or production-like) system to discover weaknesses before they cause unplanned outages. The scientific method: define a "steady state" hypothesis (e.g., p99 latency < 200ms, error rate < 0.1%), inject a fault, observe whether the hypothesis holds, and fix any deviations. Tools: Netflix Chaos Monkey (terminates random EC2 instances), Chaos Gorilla (terminates an entire AZ), Gremlin (fault injection platform supporting CPU hog, memory pressure, latency injection, packet loss, DNS failures), AWS Fault Injection Simulator. Fault types: instance termination, network partition, CPU saturation, disk full, dependency slowdown (add artificial 500ms to Redis calls). GameDays: planned events where engineers simulate large-scale failures and practice incident response. Key requirement: have monitoring and rollback in place before running chaos experiments. Start in staging, graduate to off-peak production with blast radius controls.
Each microservice owns its data store; there are no shared databases. Maintaining consistency across services requires deliberate patterns. Synchronous consistency: use a distributed transaction (2PC via XA) for operations that must be atomic across services. This adds latency and coupling. Eventual consistency: publish domain events to Kafka on every state change; downstream services subscribe and update their local views. Use the outbox pattern to guarantee the event is published atomically with the DB change. Sagas (orchestrated or choreographed) split a cross-service operation into a sequence of local transactions; they are not atomic, so compensating transactions correct failures after the fact. Read-your-writes: route a user's subsequent read to the same service instance or use a "causal token" (a logical timestamp or version number passed in request headers) that downstream services use to wait until they have replicated to at least that version. Service mesh (Istio, Linkerd) handles mTLS, retries, circuit breaking, and distributed tracing but does not solve data consistency; that is always an application concern.
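Saga compensation can be sketched as an orchestrator that runs steps in order and, on failure, undoes the completed ones in reverse. This is a minimal in-process illustration with hypothetical step names; a real orchestrator would persist saga state and call remote services with retries.

```python
def run_saga(steps):
    """Run (name, action, compensation) steps; undo completed ones on failure.

    Returns (succeeded, log) where log records what happened in order.
    """
    completed = []
    log = []
    for name, action, compensation in steps:
        try:
            action()
        except Exception:
            log.append("failed:" + name)
            for done_name, undo in reversed(completed):
                undo()
                log.append("compensated:" + done_name)
            return False, log
        log.append("done:" + name)
        completed.append((name, compensation))
    return True, log


state = {"stock": 5, "charged": False}


def reserve_stock():
    state["stock"] -= 1


def release_stock():
    state["stock"] += 1


def charge_card():
    raise RuntimeError("card declined")  # simulate a failing downstream call


def refund_card():
    state["charged"] = False


ok, saga_log = run_saga([
    ("reserve_stock", reserve_stock, release_stock),
    ("charge_card", charge_card, refund_card),
])
```

Between the reservation and its compensation, other services can observe the intermediate state, which is why a saga gives eventual consistency rather than atomicity.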
A Conflict-free Replicated Data Type (CRDT) is a data structure designed so that concurrent updates from multiple nodes can always be merged deterministically without conflicts. Two types: operation-based (op-based) CRDTs broadcast operations, which can be applied in any order because the operations commute; state-based CRDTs merge by taking the "join" (least upper bound) of two states, which requires a monotonically growing structure (grow-only set, max register). For collaborative text editing (like Google Docs), the RGA (Replicated Growable Array) CRDT assigns each character a unique ID and a causal predecessor; insertions and deletions are always resolvable without coordination. Yjs and Automerge are popular CRDT libraries for JavaScript. Riak (developed by Basho) ships CRDT-based data types such as counters, sets (ORSet, Observed-Remove Set), and registers (LWW-Register, Last-Write-Wins), which suit use cases like shopping carts and counters. Trade-off: CRDTs add per-element metadata overhead and may not capture all user intent (e.g., concurrent bold + delete semantics can be ambiguous).
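A grow-only counter (G-Counter) is a compact example of a state-based CRDT: each node increments only its own slot, and the join is an element-wise maximum. The G-Counter is not named in the text above; it is used here because it shows the same merge rule as the grow-only set and max register in very little code. The class is an illustrative sketch, not the API of Yjs, Automerge, or Riak.

```python
class GCounter:
    """State-based grow-only counter: one monotonically increasing slot per node."""

    def __init__(self, node_id, counts=None):
        self.node_id = node_id
        self.counts = dict(counts or {})

    def increment(self, amount=1):
        if amount < 0:
            raise ValueError("a grow-only counter cannot decrease")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        """Join of two states: element-wise max over all node slots."""
        nodes = self.counts.keys() | other.counts.keys()
        merged = {n: max(self.counts.get(n, 0), other.counts.get(n, 0)) for n in nodes}
        return GCounter(self.node_id, merged)


a = GCounter("node-a")
b = GCounter("node-b")
a.increment(3)   # three events observed on node A
b.increment(2)   # two concurrent events observed on node B
merged_on_a = a.merge(b)
merged_on_b = b.merge(a)
```

Because max is commutative, associative, and idempotent, replicas converge regardless of the order or number of times states are exchanged.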
WebSocket maintains a persistent TCP connection per user, so 10M concurrent connections cannot be handled by a single server. The often-cited ~65K port limit applies to outbound connections from one source IP to a single destination, not to inbound connections on a listening server; the practical per-server limit is set by file descriptors, memory, and CPU, and is roughly 100K–500K with tuning. Solution: a horizontally scaled WebSocket tier behind a Layer 4 load balancer using IP-hash or consistent hashing to pin a user's reconnects to the same node. Each WebSocket server node maintains an in-memory map of `connection_id → websocket` for connected users. Routing messages: when user A (on node 1) sends to user B (on node 2), node 1 publishes the message to a pub/sub broker (Redis Pub/Sub per room, or Kafka topic per channel). Node 2 subscribes to user B's channel and pushes the message over the open WebSocket. Presence detection: each node publishes a heartbeat for its connected users to a shared Redis key with TTL, so other services can check if a user is online. Offline delivery: messages for offline users are stored in a message store (Cassandra or DynamoDB, time-ordered by conversation) and delivered on reconnect. Connection state: store `user_id → node_id` in Redis for cross-node message routing. Scale the WebSocket tier with Kubernetes HPA on connection count; use event-loop-based servers built for large connection counts (Node.js, Go net/http, Netty).
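Consistent hashing is what keeps most users pinned to the same node when the tier is resized. The ring below is a minimal sketch with virtual nodes; the class name, the node names, and the choice to hash a user ID (rather than a client IP at the load balancer) are assumptions for illustration.

```python
import bisect
import hashlib


def _hash(key):
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, nodes=(), vnodes=100):
        self.vnodes = vnodes
        self._points = []   # sorted positions on the ring
        self._owner = {}    # position -> node name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        # Each node is placed at many positions to even out the load.
        for i in range(self.vnodes):
            point = _hash("%s#%d" % (node, i))
            self._owner[point] = node
            bisect.insort(self._points, point)

    def remove_node(self, node):
        self._points = [p for p in self._points if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def node_for(self, user_id):
        if not self._points:
            raise LookupError("ring is empty")
        # First ring position clockwise from the key's hash, wrapping around.
        idx = bisect.bisect_right(self._points, _hash(user_id)) % len(self._points)
        return self._owner[self._points[idx]]


ring = HashRing(["ws-1", "ws-2", "ws-3"])
users = ["user-%d" % i for i in range(1000)]
before = {u: ring.node_for(u) for u in users}
ring.remove_node("ws-3")
after = {u: ring.node_for(u) for u in users}
```

Only users that were on the removed node get remapped; everyone else reconnects to the node they were already on, which keeps the `user_id → node_id` routing table largely valid during scale-in.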
The dual-write problem: a service needs to atomically update a DB row and publish a Kafka event. The naive approach (write to the DB, then publish) fails because the DB and Kafka are separate systems: a crash between the two writes causes inconsistency. The outbox pattern: write the event as a row in an `outbox` table within the same DB transaction as the business entity update. A CDC relay then delivers the event to Kafka after the DB transaction commits: Debezium tails the PostgreSQL WAL, publishes each new outbox row as a Kafka event, and tracks its progress as a log position rather than by updating the row. Delivery is at-least-once, not exactly-once: after a restart Debezium may re-send events from its last recorded position, so consumers must be idempotent (for example, by deduplicating on the outbox `id`). Debezium supports PostgreSQL, MySQL, MongoDB, and SQL Server via their native replication protocols. An alternative relay is a polling publisher (a background job SELECTs unprocessed outbox rows every second, publishes them, and sets `published_at`), which is simpler but adds latency. The outbox table schema: `(id, aggregate_type, aggregate_id, event_type, payload JSONB, created_at, published_at)`.
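The pattern can be demonstrated end to end with an in-memory SQLite database and the polling-publisher variant of the relay. This is a sketch under stated assumptions: SQLite stands in for PostgreSQL (so the payload is JSON text rather than JSONB), the `orders` table and function names are invented, and `publish` is any callable standing in for a Kafka producer.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_type TEXT NOT NULL,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        published_at TEXT
    );
""")


def place_order(order_id):
    """Write the business row and its event in ONE local transaction."""
    with conn:  # commits on success, rolls back if either insert raises
        conn.execute("INSERT INTO orders (id, status) VALUES (?, 'PLACED')", (order_id,))
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("order", str(order_id), "OrderPlaced", json.dumps({"order_id": order_id})),
        )


def publish_pending(publish):
    """Polling relay: publish unpublished rows in order, then mark them."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox "
        "WHERE published_at IS NULL ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        # A crash after publish() but before the UPDATE re-sends this event
        # on the next poll, which is why delivery is at-least-once.
        publish(event_type, json.loads(payload))
        with conn:
            conn.execute(
                "UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE id = ?",
                (row_id,),
            )
    return len(rows)


sent = []
place_order(1)
published = publish_pending(lambda event_type, payload: sent.append((event_type, payload)))
```

The key property is that there is never an order without its outbox row or an outbox row without its order, because both inserts commit or roll back together.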
S3-style object storage stores arbitrary binary blobs addressed by (bucket, key). Scale targets: 10 exabytes of data, billions of objects, millions of req/s. Architecture: a metadata layer (stores bucket/key → physical location, size, checksum, ACL) runs on a distributed key-value store with strong consistency for namespace operations. A storage layer distributes object bytes across thousands of storage nodes using erasure coding (e.g., Reed-Solomon 14+4 — tolerate 4 node failures with only 29% storage overhead vs. 200% for 3-way replication). Upload path: client uploads to a front-end server which streams bytes to storage nodes in parallel, collects a success quorum, then commits the metadata record. Download path: resolve metadata to storage node list → stream erasure-coded shards in parallel and reconstruct. Durability: 11 nines (99.999999999%) via erasure coding + cross-AZ replication + background scrubbing (periodic checksum verification). Multipart upload splits large files into 5–100 MB parts uploaded independently, then assembled server-side — enabling parallel uploads and resumability. Lifecycle policies tier infrequently accessed objects to cheaper storage classes (Glacier) after N days.
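The storage-overhead comparison is quick to verify with arithmetic. The helper names below are invented for illustration; the 14+4 and 3-way figures come from the paragraph above.

```python
def erasure_overhead(data_shards, parity_shards):
    """Extra raw storage as a fraction of logical data for k+m erasure coding."""
    return parity_shards / data_shards


def replication_overhead(copies):
    """Extra raw storage as a fraction of logical data for N full copies."""
    return float(copies - 1)


def raw_bytes_needed(logical_bytes, data_shards, parity_shards):
    """Raw capacity consumed to store logical_bytes under k+m erasure coding."""
    return logical_bytes * (data_shards + parity_shards) / data_shards


rs_14_4 = erasure_overhead(14, 4)        # 4 / 14, about 0.286
three_way = replication_overhead(3)      # 2.0, i.e. 200%
raw_pb_for_14_pb = raw_bytes_needed(14, 14, 4)  # 18 PB raw for 14 PB logical
```

Both schemes here survive the loss of at least two nodes, but 14+4 stores 18 units of raw data per 14 logical units while 3-way replication stores 42; the price of erasure coding is that reads and repairs must contact many nodes and do reconstruction work.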
Requirements: accept score updates from millions of players, serve top-K global and per-region leaderboards in < 50ms, update in real time. Core data structure: Redis Sorted Set (ZADD in O(log N), ZREVRANK for a user's rank, ZREVRANGE for top-K). Redis Cluster distributes keys across shards, so each sorted set lives entirely on one shard. A single global sorted set is therefore bounded by one shard's memory and throughput; at very large sizes (on the order of 100M members), partition by region or game mode and merge top-K at query time. Write path: game server POSTs score update → score service validates → ZADD to the Redis Sorted Set with the GT option (adds new players and updates an existing score only if the new score is greater; adding XX would prevent new players from ever being inserted). Read path: ZREVRANGE leaderboard 0 99 WITHSCORES returns the top 100 in O(log N + K); ZREVRANK leaderboard player:123 returns a specific player's rank. For global cross-shard rankings: each shard maintains a top-K sorted set; a leaderboard aggregator merges the S sorted lists to produce the global top K in O(K log S) time (S = number of shards). Persistence: persist scores to PostgreSQL for auditability; Redis is the read cache. Snapshots: archive hourly/daily leaderboards to S3 for historical queries. Anti-cheat: validate score deltas against expected game session duration; flag outliers for review.
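The cross-shard merge and the "only raise the score" upsert can both be sketched without Redis. In the code below, plain lists and dictionaries stand in for per-shard sorted sets, and the function names are hypothetical; `heapq.merge` performs the lazy k-way merge that gives the O(K log S) bound.

```python
import heapq
from itertools import islice


def merge_top_k(shard_tops, k):
    """Global top-k from per-shard lists of (score, player), each sorted by
    score descending. Only the first k merged entries are ever materialized."""
    merged = heapq.merge(*shard_tops, key=lambda entry: -entry[0])
    return list(islice(merged, k))


def upsert_if_greater(board, player, score):
    """Add a new player, or update an existing one only if the score improves."""
    if score > board.get(player, float("-inf")):
        board[player] = score
        return True
    return False


shard_tops = [
    [(990, "ann"), (700, "bob")],   # shard 1, already sorted descending
    [(950, "cy"), (940, "di")],     # shard 2
    [(800, "eve")],                 # shard 3
]
global_top_3 = merge_top_k(shard_tops, 3)

board = {}
first_write = upsert_if_greater(board, "ann", 50)
lower_score_write = upsert_if_greater(board, "ann", 40)
```

Each shard only needs to ship its own top K entries to the aggregator, since no player outside a shard's top K can appear in the global top K.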
A distributed configuration store serves as the single source of truth for runtime configuration values, feature flags, and service discovery data: values that must be strongly consistent and highly available. Core requirements: strongly consistent reads and writes (linearizable), watch/notify (clients subscribe to key changes and receive push notifications), high availability (an N-node cluster tolerates ⌊(N−1)/2⌋ node failures: 1 of 3, 2 of 5), and low latency (< 5ms p99 for reads). Architecture: use the Raft consensus protocol across N nodes (typically 3 or 5; an odd count gives a clear majority quorum). All writes go to the leader via Raft log append; reads can be served from the leader (linearizable) or any follower (potentially stale). Client watches: the leader keeps a notification registry; on commit, it fans out change events to subscribed clients over long-lived gRPC streams (etcd uses a Watch API with revision-based filtering). Key-value schema: hierarchical keys with prefix scans (etcd prefix range queries, Consul's key-value tree). Leases/TTLs: ephemeral keys are bound to a lease that the client must renew via KeepAlive; when the client dies the key expires, enabling service health registration. Snapshot + WAL compaction prevent unbounded disk growth. etcd backs Kubernetes' entire control plane state (all objects, leaders, leases), demonstrating that 3–5 etcd nodes can serve a 5,000-node cluster.
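Lease-bound ephemeral keys are the mechanism behind health registration, and a single-process model shows the lifecycle. This sketch is not the etcd API: the class and method names are invented, there is no Raft replication, and the caller passes the current time explicitly so the behavior is deterministic.

```python
class LeaseStore:
    """Toy key-value store where every key is attached to an expiring lease."""

    def __init__(self):
        self.leases = {}    # lease_id -> expiry time
        self.keys = {}      # key -> (value, lease_id)
        self._next_id = 1

    def grant(self, ttl, now):
        lease_id = self._next_id
        self._next_id += 1
        self.leases[lease_id] = now + ttl
        return lease_id

    def keep_alive(self, lease_id, ttl, now):
        """Renew a lease; returns False if it has already expired."""
        self.expire(now)
        if lease_id not in self.leases:
            return False
        self.leases[lease_id] = now + ttl
        return True

    def put(self, key, value, lease_id):
        if lease_id not in self.leases:
            raise KeyError("unknown or expired lease")
        self.keys[key] = (value, lease_id)

    def expire(self, now):
        """Drop expired leases and every key attached to them."""
        dead = {lid for lid, expiry in self.leases.items() if expiry <= now}
        for lid in dead:
            del self.leases[lid]
        self.keys = {k: v for k, v in self.keys.items() if v[1] not in dead}

    def get(self, key, now):
        self.expire(now)
        entry = self.keys.get(key)
        return entry[0] if entry else None


registry = LeaseStore()
lease = registry.grant(ttl=10, now=0)
registry.put("/services/api/10.0.0.5", "healthy", lease)
renewed = registry.keep_alive(lease, ttl=10, now=8)   # lease now expires at 18
```

A service instance that crashes simply stops renewing; its registration key disappears when the lease runs out, with no explicit deregistration call.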
Requirements: evaluate every payment transaction (10K TPS peak) for fraud signals in < 100ms before authorizing; false positive rate < 0.1% (every blocked legitimate transaction costs revenue). Architecture: Client → Payment API → Fraud Evaluation Service (sync, in-path) → Auth Decision. Feature engineering pipeline: stream all events (transactions, logins, device fingerprints) through Kafka into a real-time feature store (Feast + Redis) that maintains pre-computed features per entity: `user:txn_count_1h`, `card:amount_sum_24h`, `device:new_user_count`, `merchant:decline_rate_7d`. Feature freshness: Redis features updated on every event via Flink streaming job (sub-second lag). Scoring: the Fraud Evaluation Service fetches ~50 features from Redis in a single pipeline call (~1ms), runs a gradient boosted tree model (XGBoost, loaded in-process) — inference in < 2ms. Rule engine applies business rules in parallel (velocity limits, block-list checks). Decision: score > threshold → decline; borderline → step-up authentication (3DS challenge). Model retraining: daily batch job on Spark processes labeled outcomes (fraud confirmed by disputes) → retrain XGBoost → shadow test new model for 24h (compare decisions without acting) → promote if precision/recall metrics improve. Feedback loop: fraud labels from chargebacks feed back into training data with a 30–90 day lag.
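Velocity features such as `user:txn_count_1h` are sliding-window aggregates, and the final decision is a pair of threshold checks on the model score. The sketch below keeps one window in process memory; in the architecture above this state would live in the feature store. The class name, the function name, and the threshold values are hypothetical.

```python
from collections import deque


class SlidingWindowCounter:
    """Count and sum of events within the last `window_seconds` seconds."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.events = deque()  # (timestamp, amount), oldest first

    def add(self, timestamp, amount):
        self.events.append((timestamp, amount))

    def _evict(self, now):
        # Drop events that fell out of the window (now - window, now].
        while self.events and self.events[0][0] <= now - self.window:
            self.events.popleft()

    def count(self, now):
        self._evict(now)
        return len(self.events)

    def total(self, now):
        self._evict(now)
        return sum(amount for _, amount in self.events)


def decide(score, decline_at=0.9, step_up_at=0.6):
    """Map a fraud score in [0, 1] to an action; thresholds are illustrative."""
    if score >= decline_at:
        return "decline"
    if score >= step_up_at:
        return "step_up"   # e.g. trigger a 3DS challenge
    return "approve"


txn_1h = SlidingWindowCounter(window_seconds=3600)
txn_1h.add(0, 20)
txn_1h.add(1800, 30)
txn_1h.add(3500, 50)
count_before_expiry = txn_1h.count(now=3599)
count_after_expiry = txn_1h.count(now=3600)   # the event at t=0 has aged out
```

Where the two thresholds sit determines the trade-off between blocked legitimate payments and missed fraud, so they are typically tuned against the false-positive budget rather than fixed in code.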

Frequently Asked Questions

How long is a typical system design interview?

45–60 minutes. Expect to define requirements, draw an architecture, and discuss trade-offs.

What framework should I use?

Requirements → capacity estimation → high-level design → detailed components → bottlenecks → trade-offs. Many candidates use RADIO or 4S as mnemonics.

How do I practice?

Study classic designs (URL shortener, Twitter, chat, rate limiter), mock with peers, and read engineering blogs from FAANG/Netflix/Uber.

What is the biggest mistake candidates make?

Jumping to design before clarifying requirements and scale. Ask about users, writes/sec, consistency needs before drawing anything.

Do I need to know every database?

No — know one SQL and one NoSQL deeply, understand when each fits, and you can extrapolate.
