Design a distributed counter system handling 1M+ increments per second using sharded sub-counters in Redis, async aggregation workers, and batched database persistence for YouTube-scale view counting.
Counting things at scale sounds trivial until you realize that a single viral video can receive millions of view increments per second. The distributed counter problem appears in nearly every large-scale consumer application: YouTube view counts, Instagram likes, Twitter retweet counts, and click-through analytics. It is a popular system design interview question because it tests a candidate's ability to reason about write-heavy workloads, hot keys, eventual consistency, and the trade-offs between accuracy and performance.
The fundamental challenge is that a single counter key (such as a viral video's view count) can become an extreme hot spot. A single Redis key receiving 10 million INCR operations per second would saturate a single Redis core, which tops out at roughly 1 million operations per second. The naive approach of incrementing a single database row is even worse, as it creates lock contention and write amplification that can bring down the entire database tier.
The sharded counter pattern solves this by splitting each logical counter into N sub-counters (shards). Write operations increment a randomly selected shard, distributing the load across multiple Redis hash slots and potentially multiple cluster nodes. A background aggregation worker periodically sums all shards to produce an approximate total for fast reads. The trade-off is that the displayed count is slightly stale (typically by a few seconds), which is perfectly acceptable for social media metrics but would not work for financial counters.
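The pattern can be sketched in a few lines. This is a minimal illustration, not the production implementation: a plain dictionary stands in for the Redis shard keys, and the `counter:<item>:<shard>` key layout and the `ShardedCounter` class are assumptions made for the example.

```python
import random

NUM_SHARDS = 16  # shard count used in this design


class ShardedCounter:
    """In-memory stand-in for the Redis sub-counter keys (illustrative only)."""

    def __init__(self, num_shards: int = NUM_SHARDS) -> None:
        self.num_shards = num_shards
        self.store: dict[str, int] = {}  # plays the role of ShardCache

    def shard_key(self, item_id: str, shard: int) -> str:
        # Hypothetical key layout; the design does not prescribe one.
        return f"counter:{item_id}:{shard}"

    def increment(self, item_id: str, amount: int = 1) -> None:
        # Each write lands on one randomly chosen shard.
        shard = random.randrange(self.num_shards)
        key = self.shard_key(item_id, shard)
        self.store[key] = self.store.get(key, 0) + amount

    def total(self, item_id: str) -> int:
        # A read of the logical counter sums every shard.
        return sum(
            self.store.get(self.shard_key(item_id, s), 0)
            for s in range(self.num_shards)
        )


counter = ShardedCounter()
for _ in range(10_000):
    counter.increment("video42")
print(counter.total("video42"))  # 10000
```

The sum is exact at the moment it is computed; the staleness in the real system comes from how often the aggregation runs, not from the sharding itself.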
At production scale, systems like YouTube handle billions of counter updates per day with peak bursts during viral events. The system must sustain 1 million increments per second during normal operation and absorb 10 million per second viral spikes without dropping events. Read latency must remain under 5 milliseconds for the approximate count that powers the user interface.
The architecture cleanly separates the write path (optimized for throughput) from the read path (optimized for latency), with an asynchronous aggregation layer bridging the two. This separation allows each path to scale independently based on its unique workload characteristics.
On the write path, counter increment requests flow through the API Gateway (JWT authentication, per-user rate limiting to prevent bot abuse) and the load balancer to the CounterService. CounterService picks a random shard index from 0 to 15 and combines it with the item ID to form the key of one of the 16 sub-counter shards, then performs an atomic INCR on that key in ShardCache (a 12-node Redis cluster). The Redis INCR completes in approximately 2ms. CounterService also publishes a fire-and-forget increment event to AggStream (Kafka with 128 partitions), which buffers up to 5 million messages to absorb viral traffic spikes. No database is touched on the hot write path.
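Whether the 16 shard keys actually land on different nodes depends on how Redis Cluster maps keys to hash slots: CRC16 of the key modulo 16384, hashing only the content of a `{...}` hash tag when one is present. The sketch below reproduces that mapping with the standard library so the spread can be inspected offline. The `counter:video42:<n>` key names are assumptions for illustration.

```python
import binascii

HASH_SLOTS = 16384  # number of hash slots in Redis Cluster


def hash_slot(key: str) -> int:
    """Slot for a key: CRC16 (XMODEM variant) of the key, modulo 16384."""
    data = key.encode()
    # Hash tags: if the key has a non-empty {...} section, only that part is hashed.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


# Hypothetical shard keys for one logical counter.
shard_keys = [f"counter:video42:{i}" for i in range(16)]
slots = {key: hash_slot(key) for key in shard_keys}
print(len(set(slots.values())), "distinct slots for 16 shard keys")

# With a shared hash tag, every shard would collapse onto a single slot,
# which defeats the purpose of sharding a hot counter.
tagged = {hash_slot(f"{{video42}}:{i}") for i in range(16)}
print(len(tagged), "slot when all shards share a hash tag")
```

Keys without a shared hash tag spread across slots, which is what the write path wants. The cost is that multi-key commands can no longer address all shards of one counter in a single call.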
The aggregation layer consists of two worker types. The AggregatorWorker runs continuously as 40 worker instances consuming from AggStream. Every 5 seconds per active key, it reads all 16 sub-counter shards via MGET from ShardCache, computes the sum, and writes the aggregated total to MasterCache (a separate 6-node Redis cluster). Because Redis Cluster rejects multi-key commands whose keys span hash slots, and the shards are deliberately spread across slots, a cluster-aware client has to split this read into per-slot requests. The result is an approximate count that is at most about 5 seconds stale. The PersistWorker batches aggregated counts and writes them to CounterDB (DynamoDB on-demand) every 30 seconds for durable storage, reducing write costs by 30x compared to per-increment database writes.
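One pass of each worker can be sketched as follows. Dictionaries stand in for ShardCache, MasterCache, and CounterDB, and the function names, the key layout, and the `dirty` set of recently active counters are assumptions for illustration rather than details from the design.

```python
NUM_SHARDS = 16


def aggregate(shard_cache: dict[str, int], master_cache: dict[str, int],
              active_ids: set[str]) -> None:
    """One AggregatorWorker pass: sum every shard of each active counter."""
    for item_id in active_ids:
        master_cache[item_id] = sum(
            shard_cache.get(f"counter:{item_id}:{s}", 0) for s in range(NUM_SHARDS)
        )


def persist(master_cache: dict[str, int], counter_db: dict[str, int],
            dirty_ids: set[str]) -> int:
    """One PersistWorker flush: a single write per counter that changed."""
    batch = {item_id: master_cache[item_id] for item_id in dirty_ids}
    counter_db.update(batch)  # stands in for a batched database write
    dirty_ids.clear()
    return len(batch)


# Shard values 1..16 for one counter, so the expected total is 136.
shard_cache = {f"counter:video42:{s}": s + 1 for s in range(NUM_SHARDS)}
master_cache: dict[str, int] = {}
counter_db: dict[str, int] = {}
dirty = {"video42"}

aggregate(shard_cache, master_cache, dirty)
written = persist(master_cache, counter_db, dirty)
print(master_cache["video42"], written)  # 136 1
```

However many increments arrived during the interval, the flush issues one database write per changed counter. That is where the write reduction comes from.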
On the read path, ReadService handles two endpoints: the approximate count (GET from MasterCache, approximately 2ms, used by 95% of reads) and the exact count (query from CounterDB, approximately 15ms, used for analytics and auditing). Keeping ShardCache and MasterCache separate prevents the high-throughput INCR load on the shard keys from interfering with read latency on the master cache.
Choice
16 sub-counter shards per logical counter in Redis
Rationale
A single Redis key receiving 10 million INCR operations per second saturates a single Redis core (limited to roughly 1M ops/sec). Splitting into 16 shards distributes writes across multiple hash slots; with uniformly random shard selection, each shard sees about 625K ops/sec at the 10M peak (10M / 16). The same pattern is commonly described for view counting at YouTube scale.
Choice
Dedicated Redis clusters for writes (shards) and reads (aggregated totals)
Rationale
ShardCache handles 10M+ write operations per second while MasterCache handles 500K read operations per second. Separating them prevents the high-throughput INCR load on shard keys from degrading read latency. Each cluster can be sized and tuned independently for its specific workload profile.
Choice
AggregatorWorker sums shards every 5 seconds; Kafka absorbs burst traffic
Rationale
During viral events, increment traffic spikes 10x. Kafka absorbs the burst with a 5 million message buffer while workers process at a sustainable rate. Without the event stream as a shock absorber, the aggregation workers would need to scale instantly with traffic, which is both expensive and slow to provision.
Choice
PersistWorker batches writes to DynamoDB every 30 seconds
Rationale
At 1 million increments per second, per-increment database writes would require 1M write IOPS, costing roughly $30K per day on DynamoDB on-demand pricing. Batching 30 seconds of aggregated counts into a single write per key reduces this to approximately 33K writes per second for 1 million active keys, cutting costs by 30x.
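The write-rate arithmetic behind the 30x figure can be checked directly. This is a back-of-the-envelope sketch using only the numbers stated above; the variable names are illustrative.

```python
increments_per_sec = 1_000_000  # sustained write rate from the text
active_keys = 1_000_000         # counters that changed within the window
flush_interval_sec = 30         # PersistWorker batch interval

unbatched_writes_per_sec = increments_per_sec
batched_writes_per_sec = active_keys / flush_interval_sec
reduction_factor = unbatched_writes_per_sec / batched_writes_per_sec

print(f"{batched_writes_per_sec:,.0f} writes/sec, {reduction_factor:.0f}x fewer")
# 33,333 writes/sec, 30x fewer
```

The reduction depends on how concentrated the traffic is: if the same 1 million increments per second hit fewer distinct keys, the batched write rate drops further and the savings grow.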
Target RPS
1M sustained increments/sec; 10M peak during viral events; 500K reads/sec
Latency (p99)
< 10ms p99 for writes; < 5ms for approximate reads; < 30ms for exact reads
Storage
52 GB ShardCache (12 nodes); 26 GB MasterCache (6 nodes); DynamoDB on-demand for persistence
Availability
99.9%; approximate counts remain available even during partial Redis failures
This template is for educational and illustration purposes only. It may not represent the optimal production design for this problem. Real-world systems involve additional considerations (compliance, specific cloud provider constraints, organizational requirements) not captured here. Use this as a starting point for discussion, not as a production blueprint.
The AggregatorWorker sums all 16 sub-counter shards every 5 seconds. Between aggregations, the master count lags behind by up to 5 seconds of accumulated increments. For a viral video receiving 100K views per second, the displayed count could be 500K behind reality. However, showing '12.3M views' versus '12.8M views' is indistinguishable to users. Exact counts are available from the database for analytics use cases that require precision over speed.
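The worst-case lag is the increment rate multiplied by the aggregation interval. The short sketch below reproduces the numbers from the example above; the function name is an assumption for illustration.

```python
def max_display_lag(views_per_sec: int, aggregation_interval_sec: int) -> int:
    """Upper bound on how far the displayed count can trail the true count."""
    return views_per_sec * aggregation_interval_sec


lag = max_display_lag(views_per_sec=100_000, aggregation_interval_sec=5)
true_count = 12_800_000
print(lag)                        # 500000
print(f"{lag / true_count:.1%}")  # 3.9%
```

A relative error of a few percent disappears once the count is rounded for display, which is why the lag is tolerable for social metrics.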
The system handles viral spikes through three mechanisms. First, the 16-shard design distributes writes across multiple Redis nodes, preventing any single key from becoming a bottleneck. Second, Kafka acts as a shock absorber with a 5 million message buffer, allowing the aggregation workers to process at a sustainable rate while the queue drains. Third, the write path touches only Redis (not the database), so the hottest component can handle the throughput natively.
ShardCache stores the 16 sub-counter shards for each counter key and handles the write-heavy INCR workload (up to 10M ops/sec). MasterCache stores a single aggregated value per counter key and handles the read-heavy GET workload (500K reads/sec). Separating them prevents the INCR load on the shard tier from degrading read latency. They also use different cluster sizes: 12 nodes for ShardCache (optimized for write throughput) versus 6 nodes for MasterCache (optimized for read latency).
For likes (which should be counted at most once per user per item), CounterService checks a Bloom filter stored in ShardCache before performing the increment. At a roughly 1% false positive rate, an optimally sized Bloom filter needs about 10 bits (roughly 1.2 bytes) per entry, so 1 billion user-item pairs require about 1.2 GB distributed across the shard cluster. A false positive reports a pair as already seen, which means about 1% of legitimate likes are silently dropped. This is acceptable for social media but would not be appropriate for voting or financial applications.
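The memory estimate can be checked against the standard Bloom filter sizing formula, bits per entry = -ln(p) / (ln 2)^2, which gives about 9.6 bits at p = 1%. The sketch below pairs that formula with a minimal in-process filter. The `BloomFilter` class, the `try_like` helper, and the SHA-256 double-hashing scheme are assumptions for illustration; the design only says the filter lives in ShardCache.

```python
import hashlib
import math


def bits_per_entry(false_positive_rate: float) -> float:
    """Optimal Bloom filter bits per element: -ln(p) / (ln 2)^2."""
    return -math.log(false_positive_rate) / (math.log(2) ** 2)


class BloomFilter:
    def __init__(self, capacity: int, false_positive_rate: float = 0.01) -> None:
        self.size = max(1, math.ceil(capacity * bits_per_entry(false_positive_rate)))
        self.num_hashes = max(1, round(self.size / capacity * math.log(2)))
        self.bits = bytearray((self.size + 7) // 8)

    def _positions(self, item: str) -> list[int]:
        # Double hashing: derive k bit positions from two halves of one digest.
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.size for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def try_like(seen: BloomFilter, user_id: str, item_id: str) -> bool:
    """Return True if the like should be counted, False if it looks like a repeat."""
    key = f"{user_id}:{item_id}"
    if seen.might_contain(key):
        return False  # repeat, or a false positive that drops a legitimate like
    seen.add(key)
    return True


seen = BloomFilter(capacity=1_000)
print(round(bits_per_entry(0.01), 2))             # 9.59
print(try_like(seen, "user1", "video42"))         # True
print(try_like(seen, "user1", "video42"))         # False
print(round(bits_per_entry(0.01) * 1e9 / 8 / 1e9, 2), "GB for 1 billion pairs")  # 1.2 GB for 1 billion pairs
```

A Bloom filter never produces false negatives, so a repeated like is always rejected. The only error mode is the occasional dropped first-time like.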
DynamoDB is well-suited for the counter persistence workload because it provides predictable write latency at any scale, automatic partitioning, and on-demand pricing that aligns with bursty batch writes. The data model is simple (a counter key mapped to a count value), which does not benefit from relational features like joins or complex queries. DynamoDB's eventual consistency model is also a natural fit, since the persisted counts are already eventually consistent by design.