
Data Partitioning & Shuffles

Data partitioning determines how records are distributed across nodes or files in a distributed system. A shuffle is the expensive redistribution of data between partitions that operations such as joins and aggregations require. Understanding partitioning strategies and shuffle mechanics is essential for optimizing distributed query performance.

Overview

In distributed data processing, a dataset is divided into **partitions** (also called splits, shards, or buckets) that are processed in parallel across multiple machines. How data is partitioned determines which operations are local (fast) and which require cross-node data transfer (slow). Partitioning is often the most impactful design decision for distributed query performance.

Three partitioning strategies dominate:
  • **Hash partitioning** applies a hash function to a key column and assigns each record to a partition based on the hash value (e.g., partition = hash(user_id) % num_partitions). Records spread evenly as long as no single key value dominates the data.
  • **Range partitioning** divides the key space into contiguous ranges (e.g., dates January-March in partition 1, April-June in partition 2). This enables efficient range scans but causes skew when records cluster in a few ranges.
  • **Round-robin partitioning** assigns records to partitions in rotation, which balances partition sizes almost perfectly but provides no key-based locality.
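
The three strategies can be sketched as plain functions. This is a minimal illustration, not how any particular engine implements them: the function names, the `QUARTER_BOUNDS` list, and the choice of `zlib.crc32` as the hash are assumptions made for the example.

```python
import bisect
import zlib

def hash_partition(key, num_partitions):
    """Hash partitioning: stable hash of the key, modulo the partition count."""
    # crc32 is used because Python's built-in hash() for strings changes between processes.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def range_partition(key, upper_bounds):
    """Range partitioning: index of the first range whose inclusive upper bound is >= key.

    Keys above the last bound fall into one extra, final partition.
    """
    return bisect.bisect_left(upper_bounds, key)

def round_robin_partition(record_index, num_partitions):
    """Round-robin partitioning: rotate through partitions, ignoring the key."""
    return record_index % num_partitions

# Hypothetical quarter boundaries for ISO-formatted dates (which sort lexicographically).
QUARTER_BOUNDS = ["2024-03-31", "2024-06-30", "2024-09-30"]

print(hash_partition("user_42", 8))                 # same partition on every run
print(range_partition("2024-02-14", QUARTER_BOUNDS))  # January-March range
print([round_robin_partition(i, 3) for i in range(5)])
```

Only the hash and range functions look at the key, which is why only they give key-based locality.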

A **shuffle** occurs when an operation requires data to be reorganized by a different key than the current partitioning. For example, if data is partitioned by user_id but a query groups by country, all records for the same country must be sent to the same partition -- requiring a full cross-network data transfer. In Spark, shuffles are the boundary between stages: operations within a stage are pipelined in memory; shuffles materialize intermediate data to disk and transfer it across the network. In a 1 TB shuffle across 200 nodes, each node holds about 5 GB and, if keys hash uniformly, must send roughly 199/200 of it to other nodes; this network I/O is often the bottleneck of the entire job.
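
The user_id-to-country example can be simulated in a single process. This is a rough sketch under stated assumptions: the `shuffle` helper, the sample records, and the use of "record changed partition index" as a stand-in for network transfer are all illustrative, not Spark internals.

```python
import zlib

def stable_hash(value):
    return zlib.crc32(value.encode("utf-8"))

def shuffle(partitions, key_fn, num_partitions):
    """Re-bucket every record by key_fn.

    Returns the new partitions and how many records ended up in a
    different partition index than the one they started in.
    """
    new_partitions = [[] for _ in range(num_partitions)]
    moved = 0
    for source_index, partition in enumerate(partitions):
        for record in partition:
            target_index = stable_hash(key_fn(record)) % num_partitions
            new_partitions[target_index].append(record)
            if target_index != source_index:
                moved += 1
    return new_partitions, moved

# Hypothetical sample data: (user_id, country) pairs.
COUNTRIES = ["US", "DE", "IN", "BR"]
records = [(f"user_{i}", COUNTRIES[i % 4]) for i in range(1000)]
NUM_PARTITIONS = 4

# Initial layout: partitioned by user_id.
by_user, _ = shuffle([records], key_fn=lambda r: r[0], num_partitions=NUM_PARTITIONS)
# GROUP BY country needs all rows of a country together, so re-key by country.
by_country, moved = shuffle(by_user, key_fn=lambda r: r[1], num_partitions=NUM_PARTITIONS)

print(f"{moved} of {len(records)} records changed partition")
```

After the second call, every country lives in exactly one partition, which is the property the aggregation needs.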

**Data skew** is the most insidious problem in distributed processing. When one partition contains vastly more data than others (e.g., a 'null' key, a celebrity user, or a popular product), that partition becomes a bottleneck while other nodes sit idle. A perfectly parallel 10-minute job can take 2 hours if one partition has 100x more data. Mitigating skew with techniques like salted keys, broadcast joins, and Spark's Adaptive Query Execution (AQE) is a critical skill for data engineers.
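
The straggler effect follows from simple arithmetic: a stage finishes only when its slowest task does. The sketch below assumes one task slot per partition and a constant processing rate; the partition sizes and the rate are hypothetical numbers chosen for illustration.

```python
from statistics import median

def stage_runtime_minutes(partition_rows, rows_per_minute_per_task):
    """With one task slot per partition, the stage ends when the largest partition is done."""
    return max(partition_rows) / rows_per_minute_per_task

def skew_ratio(partition_rows):
    """Largest partition divided by the median partition."""
    return max(partition_rows) / median(partition_rows)

RATE = 1_000_000  # hypothetical rows per minute per task

balanced = [1_000_000] * 200
skewed = [1_000_000] * 199 + [100_000_000]  # one partition holds 100x more rows

print(stage_runtime_minutes(balanced, RATE))  # 1.0
print(stage_runtime_minutes(skewed, RATE))    # 100.0
print(skew_ratio(skewed))                     # 100.0
```

In the skewed case, 199 of the 200 tasks finish after one minute and their slots sit idle for the remaining 99.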

Key Points
  • Hash partitioning (partition = hash(key) % N) provides uniform distribution for non-skewed keys and enables partition pruning for point lookups. Spark uses it for repartition() when key columns are given (repartition(n) with no columns distributes rows round-robin), and Kafka's default partitioner uses it for records that have a key. It does not support efficient range queries.
  • Range partitioning assigns contiguous key ranges to partitions, enabling efficient range scans and sorted output. It is used by Hive/Iceberg for date-based partitioning and by HBase for row key ranges. It is vulnerable to skew if the key distribution is non-uniform.
  • A shuffle involves serializing data, writing to local disk, transferring across the network, and deserializing on the receiving side. In Spark, shuffles produce 'shuffle files' on disk. The number of shuffle partitions (spark.sql.shuffle.partitions, default 200) determines parallelism in the reduce stage.
  • Data skew occurs when a few keys dominate the dataset. A GROUP BY on a column where 40% of rows have key='unknown' creates one massive partition and many tiny ones. The job's runtime is bottlenecked by the slowest partition. Skew is one of the most common causes of slow Spark jobs.
  • Broadcast joins eliminate shuffles for small-table joins: the small table (under ~10MB by default, configurable via spark.sql.autoBroadcastJoinThreshold) is broadcast to every executor's memory. The join becomes a local hash lookup on each partition of the large table. Instead of moving every row of the large table across the network, Spark ships only the small table, once per executor.
  • Spark's Adaptive Query Execution (AQE) automatically handles skewed joins by splitting skewed partitions at runtime, coalescing small partitions, and switching join strategies based on actual data sizes rather than optimizer estimates.
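
The broadcast join described above amounts to building a hash table from the small side and probing it from each partition of the large side. A minimal single-process sketch follows; the `broadcast_join` function and the sample rows are hypothetical, and a real engine would ship the lookup table to each executor rather than share one Python dict.

```python
def broadcast_join(large_partitions, small_table):
    """Inner-join each partition of the large side against an in-memory copy of the small side."""
    # The "broadcast" copy: every task gets the same key -> value lookup table.
    lookup = dict(small_table)
    joined = []
    for partition in large_partitions:
        # Each partition is processed where it already lives; no large-side row changes partition.
        joined.append([
            (click_id, advertiser_id, lookup[advertiser_id])
            for click_id, advertiser_id in partition
            if advertiser_id in lookup
        ])
    return joined

advertisers = [(1, "Acme"), (2, "Globex")]      # small side
click_partitions = [                            # large side, already partitioned
    [("c1", 1), ("c2", 2)],
    [("c3", 1), ("c4", 3)],                     # advertiser 3 has no match
]

result = broadcast_join(click_partitions, advertisers)
print(result)
```

The output keeps the partition layout of the large table, which is why no shuffle of that table is needed.
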
Simple Example

Skewed Join in an Ad Analytics Pipeline

An ad analytics pipeline joins a clicks table (10 billion rows) with an advertisers table (100K rows) on advertiser_id. Without optimization, Spark hash-partitions both tables by advertiser_id and shuffles 10 billion rows across the network. Worse, one advertiser (a global brand) accounts for 30% of all clicks, causing massive skew -- one partition processes at least 3 billion rows while the others process roughly 35 million each (the remaining 7 billion rows spread over the default 200 shuffle partitions). Two optimizations fix this: (1) Broadcast join -- the advertisers table is only 100K rows (~5MB), so Spark broadcasts it to every executor and the join becomes a local lookup with zero shuffle. (2) If the small table were too large to broadcast, salted keys would split the hot advertiser's partition into 10 sub-partitions, spreading the 3 billion rows across 10 tasks instead of 1.
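
The effect of salting the hot advertiser can be shown on a scaled-down version of this example (10,000 clicks instead of 10 billion). This is an illustrative sketch: the `adv_hot` id, the `#` separator, the partition count, and the deterministic salt derived from the row number are assumptions; production code often uses a random salt.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 16
NUM_SALTS = 10
HOT_ADVERTISER = "adv_hot"  # hypothetical id for the global brand

def partition_of(key):
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

# Scaled-down clicks: 3,000 for the hot advertiser (30%), 70 each for 100 others.
clicks = [HOT_ADVERTISER] * 3000
for i in range(100):
    clicks += [f"adv_{i}"] * 70

def salted_key(advertiser_id, row_number):
    # Only the hot key is salted; the salt cycles through 0..NUM_SALTS-1.
    if advertiser_id == HOT_ADVERTISER:
        return f"{advertiser_id}#{row_number % NUM_SALTS}"
    return advertiser_id

salted_keys = [salted_key(adv, n) for n, adv in enumerate(clicks)]

# The largest group of rows sharing one key is a lower bound on the largest partition.
largest_key_group_before = max(Counter(clicks).values())
largest_key_group_after = max(Counter(salted_keys).values())

rows_per_partition_before = Counter(partition_of(k) for k in clicks)
rows_per_partition_after = Counter(partition_of(k) for k in salted_keys)

print(largest_key_group_before, largest_key_group_after)
print(max(rows_per_partition_before.values()), max(rows_per_partition_after.values()))
```

For a join, the other table must carry one copy of the hot advertiser's row per salt value so that every salted key still finds its match.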

Real-World Examples

Meta (Facebook)

Meta's data warehouse processes exabytes daily using Spark and Presto. Data is partitioned by date (range partitioning) with sub-partitioning by country for their largest tables. Their largest shuffle operations (joining user activity with user profiles across 3 billion users) transfer petabytes across the network. They mitigate skew from highly active users (celebrities, bots) using salted keys and cardinality-aware partitioning that pre-splits known hot keys.

Databricks

Databricks developed Spark's Adaptive Query Execution (AQE) to handle the shuffle and skew problems automatically. AQE observes actual partition sizes at shuffle boundaries and makes runtime decisions: coalescing 1,000 tiny partitions into 50 appropriately-sized ones, splitting a skewed 10GB partition into 10 x 1GB sub-partitions, and switching from sort-merge join to broadcast join when a table turns out to be smaller than estimated. AQE reduced the need for manual tuning by 60% in their customer benchmarks.
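
The coalescing and splitting decisions can be approximated with a few lines of arithmetic. The sketch below is a simplified model, not Spark's actual AQE algorithm: both function names are hypothetical, and `target_mb` stands in for the role that `spark.sql.adaptive.advisoryPartitionSizeInBytes` plays in Spark.

```python
import math

def coalesce_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent partitions until adding the next one would exceed target_mb.

    Returns a list of groups, each group being the original partition indices it contains.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

def split_count(size_mb, target_mb):
    """Number of roughly equal sub-partitions needed to bring one partition down to target_mb."""
    return max(1, math.ceil(size_mb / target_mb))

# 1,000 tiny partitions of 5 MB each, merged toward a 100 MB target.
print(len(coalesce_partitions([5] * 1000, target_mb=100)))  # 50
# One skewed 10 GB partition, split toward a 1 GB target.
print(split_count(10 * 1024, target_mb=1024))               # 10
```

Both decisions rely on partition sizes measured after the map side of the shuffle has run, which is why they cannot be made at planning time.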

LinkedIn

LinkedIn's Spark-based ETL processes trillions of member activity events daily. They discovered that partitioning their member-activity table by member_id created severe skew: power users (recruiters, influencers) generated 1000x more events than average users. They solved this with dynamic repartitioning: a pre-processing step computes per-key cardinalities and assigns hot keys to multiple partitions using a salt suffix. This reduced their largest job's runtime from 8 hours to 45 minutes.

Trade-Offs
| Aspect | Description |
| --- | --- |
| Partition Count vs Partition Size | Too few partitions (e.g., 10 partitions for 1 TB) create large partitions that exhaust executor memory and prevent parallelism. Too many partitions (e.g., 100,000 partitions for 1 TB) create tiny partitions with excessive scheduling overhead, small file problems, and underutilized executors. The sweet spot is partitions of 128MB-1GB each. Spark's AQE can automatically coalesce small post-shuffle partitions. |
| Shuffle Volume vs Computation | Reducing shuffle data volume is almost always worth additional computation. Map-side partial aggregation (combiners) reduces the data sent to the reduce stage. Pre-filtering before a join reduces shuffle volume. Broadcast joins eliminate shuffles entirely for small tables. Every byte not shuffled saves network I/O, disk I/O, and serialization/deserialization CPU. |
| Pre-Partitioning vs Runtime Flexibility | Pre-partitioning data by a known query key (e.g., storing data bucketed by user_id) eliminates shuffle for queries on that key. But if a different query groups by product_id, a full shuffle is still needed. Pre-partitioning optimizes for known access patterns at the cost of flexibility. Iceberg's hidden partitioning and partition evolution help by allowing the partition strategy to change over time without rewriting data. |
| Skew Mitigation Complexity vs Job Performance | Techniques like salted keys, two-phase aggregation, and custom partitioners add code complexity and maintenance burden. AQE handles many skew scenarios automatically but cannot solve all cases (e.g., it does not help with skewed source data). For critical production jobs, investing in skew mitigation pays dividends; for ad-hoc exploration, the overhead may not be worth it. |
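
The shuffle-volume trade-off is easiest to see with map-side partial aggregation. The sketch below counts how many records would cross the shuffle boundary for a count-by-key, with and without a combiner; the function names and the sample partitions are hypothetical.

```python
from collections import Counter

def shuffled_records_without_combiner(map_partitions):
    """Every input record is sent to the reduce side as a (key, 1) pair."""
    return sum(len(partition) for partition in map_partitions)

def shuffled_records_with_combiner(map_partitions):
    """Each map task pre-aggregates locally and sends one (key, partial_count) pair per distinct key."""
    return sum(len(Counter(partition)) for partition in map_partitions)

def final_counts(map_partitions):
    """Reduce side: merge the partial counts from all map tasks."""
    total = Counter()
    for partition in map_partitions:
        total.update(Counter(partition))
    return total

map_partitions = [
    ["US"] * 500 + ["DE"] * 300,
    ["US"] * 200 + ["IN"] * 100,
]

print(shuffled_records_without_combiner(map_partitions))  # 1100
print(shuffled_records_with_combiner(map_partitions))     # 4
print(final_counts(map_partitions))
```

The extra work is one local aggregation per map task; the saving grows with the number of rows per distinct key, so the technique helps most on low-cardinality grouping keys.
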
Case Study

Spotify's Shuffle Optimization for Royalty Calculations

Scenario

Spotify's royalty calculation pipeline joins a massive streams table (billions of rows per day of individual song plays) with a rights-holders table (millions of rows of complex licensing agreements) on track_id. The shuffle for this join transferred 5+ TB of data across the network. Additionally, a handful of viral tracks accounted for 10% of all streams, creating severe data skew in the track_id-partitioned shuffle.

Solution

Spotify implemented a multi-pronged optimization: (1) Bucketed tables -- both the streams and rights-holders tables were pre-bucketed by track_id into 2,048 buckets, eliminating the shuffle entirely for the join (bucket-to-bucket local join). (2) For the daily incremental pipeline (only today's new streams), they could not pre-bucket, so they used a salted join: hot track_ids were detected dynamically, salted into 16 sub-keys, and the rights-holders table was replicated 16x for those keys. (3) Map-side partial aggregation reduced the stream count per track before the join.
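
Step (2) can be sketched as follows. This is not Spotify's code: the function names, the threshold, the sample data, and the row-number-based salt are assumptions for illustration, and the dictionary lookup stands in for a distributed join on the composite key (track_id, salt).

```python
from collections import Counter

NUM_SALTS = 16

def detect_hot_keys(track_ids, threshold):
    """Keys whose row count exceeds the threshold are treated as hot."""
    counts = Counter(track_ids)
    return {track_id for track_id, n in counts.items() if n > threshold}

def salted_join(streams, rights, hot_keys, num_salts=NUM_SALTS):
    """Join streams (stream_id, track_id) with rights {track_id: holder} on a salted key."""
    # Small side: replicate hot-key rows once per salt; cold keys keep a single row with salt 0.
    salted_rights = {}
    for track_id, holder in rights.items():
        salts = range(num_salts) if track_id in hot_keys else range(1)
        for salt in salts:
            salted_rights[(track_id, salt)] = holder
    # Large side: hot keys get a salt from the row position; cold keys get salt 0.
    joined = []
    for row_number, (stream_id, track_id) in enumerate(streams):
        salt = row_number % num_salts if track_id in hot_keys else 0
        holder = salted_rights.get((track_id, salt))
        if holder is not None:
            joined.append((stream_id, track_id, holder))
    return joined, salted_rights

# Hypothetical sample: one viral track with 160 plays, one niche track with 3.
streams = [(f"s{i}", "viral_track") for i in range(160)]
streams += [(f"n{i}", "niche_track") for i in range(3)]
rights = {"viral_track": "Label A", "niche_track": "Label B"}

hot_keys = detect_hot_keys([track_id for _, track_id in streams], threshold=100)
joined, salted_rights = salted_join(streams, rights, hot_keys)
print(hot_keys, len(salted_rights), len(joined))
```

Replicating only the hot keys keeps the small side from growing 16x overall: here it grows from 2 rows to 17, while the join result is the same as an unsalted join.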

Outcome

The shuffle volume dropped from 5 TB to under 200 GB (96% reduction). Job wall-clock time dropped from 4 hours to 35 minutes. The skewed partition that previously took 90 minutes (while all other partitions finished in 10 minutes) was eliminated by salting. Cluster cost for the royalty pipeline dropped by 70% because fewer executor-hours were needed.

Common Mistakes
  • Using the default spark.sql.shuffle.partitions=200 for all jobs regardless of data size. For a 10 GB dataset, 200 partitions (50 MB each) is reasonable. For a 10 TB dataset, 200 partitions (50 GB each) will cause out-of-memory errors. Scale partition count with data size (target 128MB-1GB per partition), or enable AQE to handle this automatically.
  • Joining tables without checking whether one side is small enough for a broadcast join. A sort-merge join shuffles both tables; a broadcast join shuffles neither. If one table fits comfortably in driver and executor memory (typically tens to a few hundred MB), broadcasting it can turn a 30-minute shuffle-heavy job into a 2-minute broadcast job. Check table sizes and set spark.sql.autoBroadcastJoinThreshold appropriately.
  • Ignoring data skew and assuming partitions are balanced. Always check partition sizes after a shuffle (Spark UI's 'Stages' tab shows per-task input sizes). If the max partition is 10x the median, you have skew. Address it with salting, partial aggregation, or AQE's skew join optimization before the job blocks your pipeline SLA.
  • Repartitioning data unnecessarily. Every repartition() call triggers a full shuffle. If you repartition by user_id and then immediately group by user_id, Spark already knows the data is partitioned correctly and skips the second shuffle. But repartitioning by user_id and then grouping by product_id triggers two shuffles. Minimize repartitions and let Spark's optimizer manage partitioning when possible.
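
The partition-count guidance from the first mistake reduces to one division. A minimal sketch, assuming binary units and a hypothetical helper name; the result is the kind of value one might set for spark.sql.shuffle.partitions.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4

def shuffle_partition_count(data_size_bytes, target_partition_bytes=128 * MB):
    """Number of partitions needed so that each holds at most target_partition_bytes."""
    return max(1, math.ceil(data_size_bytes / target_partition_bytes))

print(shuffle_partition_count(10 * GB))                             # 80
print(shuffle_partition_count(10 * TB))                             # 81920
print(shuffle_partition_count(10 * TB, target_partition_bytes=GB))  # 10240
```

For the 10 TB case, anything between roughly 10,000 and 80,000 partitions stays inside the 128MB-1GB target range, far from the default of 200.
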
Test Your Understanding

1. What is data skew, and why is it problematic in distributed processing?

2. How does a broadcast join eliminate shuffles in Spark?

Deeper Reading