Your Spark job is running for a long time. What should you do?
Generally, long-running Spark jobs can be due to various factors. We like to call them the 5S - Spill, Skew, Shuffle, Storage, and Serialization. So, how do we identify the main culprit?
🔎 Look for Skew: Are some of the tasks taking longer than others? Do you have a join operation?
🔎 Look for Spill: Any out-of-memory errors? Do the executors have enough memory to finish their tasks? Got any disk spills?
🔎 Look for Shuffle: Large amounts of data being shuffled across executors? Do you have a join operation?
🔎 Look for Storage Issues: Do you have small files or a highly nested directory structure?
🔎 Look for inefficient Spark code, or serialization issues.
Now, the solution may vary depending on the root cause. Most of the time, the root-cause indicators will show up in the SparkUI, and it’s important that you understand how to read it.
In general, always try to cut down the time Spark takes to load data into memory, parallelize tasks across executors, and scale the memory according to data size.
TL;DR - Skew is caused by imbalanced partitions. To fix skew, repartition the data, or set a skew hint when reading data from disk into memory. If skew shows up after a shuffle stage, enable the skew join option in Adaptive Query Execution and set an appropriate shuffle partition size. In most cases, turning on Adaptive Query Execution helps mitigate skew.
Let’s talk about one of the most common issues you might encounter in Spark - “skew”. Skew refers to an imbalance in partition sizes, which can also lead to spill. When you read data into Spark, it’s typically divided into partitions of 128 MB, distributed evenly according to the maxPartitionBytes setting. However, during transformations Spark may need to shuffle your data, and some partitions can end up with significantly more data than others, creating skew.
When a partition is bigger than the others, the executor will take longer to process that partition and need more memory, which might eventually result in a spill to disk or Out Of Memory (OOM) errors.
If your disk spills or OOM errors are caused by skew, don’t solve for the RAM problem; solve for the uneven distribution of records across partitions.
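Before picking a fix, it can help to confirm the skew. Here is a minimal sketch, assuming a DataFrame df and a hypothetical join column customer_id, that counts records per key so heavily over-represented values stand out:

## Check the key distribution of a suspected skewed join column (customer_id is a placeholder)
from pyspark.sql import functions as F

key_counts = (
    df.groupBy("customer_id")
      .count()
      .orderBy(F.desc("count"))
)
key_counts.show(20)  # a few keys with far larger counts than the rest indicate skew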
Option 1: If you run Spark on Databricks, use a skew hint (refer to Skew Join optimization). For example, if you know the column used in the join is skewed, set the skew hint on that column. With this information, Spark can hopefully construct a better query plan for the join.
## Set a skew hint when loading the table
df = (
    spark.read.format("delta").load(trxPath)
         .hint("skew", "<join_column>")
)
Option 2: If you use Spark 3.x, use Adaptive Query Execution (AQE). AQE adapts query plans at runtime based on accurate runtime statistics, leading to more optimized execution. After a shuffle, AQE can automatically split skewed partitions into smaller ones, so executors are not burdened by oversized partitions. This leads to faster query execution and more efficient use of cluster resources. Using AQE is highly recommended, and it is generally more effective than the other options.
## Turn AQE on
spark.conf.set('spark.sql.adaptive.enabled', True)
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', True)
Option 3: Another option is to salt the skewed column with a random number to create more evenly sized partitions, at the cost of extra processing. This is a more complex operation that deserves its own post, but a rough sketch follows below.
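As a minimal sketch of salting, assume two hypothetical DataFrames facts and dims joined on a skewed key customer_id; the number of salt buckets (NUM_SALTS) is an assumption you would tune:

## Salting sketch: spread a skewed join key across NUM_SALTS buckets (names and values are illustrative)
from pyspark.sql import functions as F

NUM_SALTS = 16  # assumed number of salt buckets

# Add a random salt to the large, skewed side
facts_salted = facts.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Explode the small side so every salt value has a matching row
dims_salted = dims.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
)

# Join on the original key plus the salt, then drop the helper column
joined = facts_salted.join(dims_salted, on=["customer_id", "salt"]).drop("salt")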
TL;DR - Spill is caused by executors lacking the memory to process their partitions. To fix spill, think about how you can add more memory to the executors or manage partition sizes.
If your Spark executors don’t have enough local memory to process their allocated partitions, Spark has to spill data to disk. Spill is Spark’s mechanism for moving data from an executor’s RAM to disk and back into RAM for further processing, with the goal of avoiding an out-of-memory (OOM) error. However, it leads to expensive disk reads and writes and can significantly slow down the entire job.
This process occurs when a partition becomes too large to fit into RAM, and it may be a result of skew in the data. Some potential causes of spill include setting spark.sql.files.maxPartitionBytes too high, using explode() on an array, performing a join or crossJoin of two tables, or aggregating results by a skewed column.
You can find spill indicators in the SparkUI under each stage’s detail tab or in Aggregated Metrics by Executor. When data is spilled, both the size of the data in memory and on disk is reported. Typically, the size on disk is smaller due to the compression applied when data is serialized before being written to disk.
To mitigate spill issues, there are several actions you can take. Use spark.conf.set() to manage the size and number of partitions:
spark.conf.set('spark.sql.shuffle.partitions', {num_partitions}) reduces the amount of data in each partition shuffled across executors.
spark.conf.set('spark.sql.files.maxPartitionBytes', {MB}*1024*1024) reduces the size of each partition when Spark reads from disk into memory.
It’s worth noting that ignoring spill is rarely a good idea, as even a small percentage of spilling tasks can delay an entire job. Take note of spills and manage them proactively to improve the performance of your Spark jobs.
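As a rough illustration of how these settings fit together, assume a shuffle stage that processes about 100 GB and a target partition size of roughly 200 MB (both figures are assumptions you would replace with your own):

## Illustrative partition sizing (the 100 GB / 200 MB figures are assumptions)
target_partition_mb = 200
shuffle_data_mb = 100 * 1024                      # ~100 GB of shuffled data, assumed
num_partitions = shuffle_data_mb // target_partition_mb

spark.conf.set('spark.sql.shuffle.partitions', num_partitions)
spark.conf.set('spark.sql.files.maxPartitionBytes', target_partition_mb * 1024 * 1024)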
TL;DR - Shuffle refers to the movement of data across executors, and it’s inevitable with wide transformations. To tune shuffle operations, think about how you can reduce the amount of data that gets shuffled across the cluster network.
Shuffle is the act of moving data between executors. If you have multiple data partitions on different executors in a Spark cluster, shuffle is necessary and inevitable. Most of the time, shuffle operations are actually quite fast. However, there are situations where shuffle becomes the culprit slowing down your Spark job: moving data across the network is slow, and the more data you have to move, the slower it gets. Moreover, incorrect shuffles can cause skew and, potentially, spill.
If you use wide transformations (distinct, join, groupBy/count, orderBy, crossJoin) in your Spark job and you have multiple executors in the Spark cluster, shuffle will most likely happen.
In the SparkUI, you can find the Shuffle Read Size and Shuffle Write Size numbers in the details of each stage.
To reduce the impact of shuffle on your Spark job, try to reduce the amount of data you have to shuffle across the network.
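One straightforward way to do this, sketched below with hypothetical table and column names, is to select only the columns you need and filter rows before a join, so less data crosses the wire:

## Prune columns and filter rows before a join (table and column names are hypothetical)
orders = spark.table("orders").select("order_id", "customer_id", "amount")
customers = (
    spark.table("customers")
         .select("customer_id", "region")
         .filter("region = 'EMEA'")
)
joined = orders.join(customers, on="customer_id")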
Bucketing can be a useful technique to improve join efficiency by pre-shuffling and pre-sorting data at write time. The process involves dividing the data into N buckets on the join key, with both tables requiring the same number of buckets. This technique can be particularly effective for large datasets of several TBs that are queried and joined repeatedly.
Bucketing should be performed by a skilled data engineer as part of the data preparation process. It’s worth noting that bucketing is only worthwhile if the data is frequently queried and joined, and filtering does not improve the join operation.
By using bucketing in the Join operation, the process of shuffling and exchanging data can be eliminated, resulting in a more efficient join.
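Here is a minimal sketch of writing bucketed tables, assuming hypothetical table names, a shared join key customer_id, and a bucket count you would tune; both tables must use the same number of buckets:

## Write both tables bucketed on the join key (names and bucket count are assumptions)
num_buckets = 64

(spark.table("transactions")
      .write.format("parquet")
      .bucketBy(num_buckets, "customer_id")
      .sortBy("customer_id")
      .saveAsTable("transactions_bucketed"))

(spark.table("customers")
      .write.format("parquet")
      .bucketBy(num_buckets, "customer_id")
      .sortBy("customer_id")
      .saveAsTable("customers_bucketed"))

## Joining the bucketed tables on customer_id can now avoid a full shuffle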
TL;DR - Storage issues are problems with how data is stored on disk, which can add high overhead when ingesting data through open/read/close file operations. To fix storage issues, think about how you can reduce the number of file reads, writes, and directory scans on disk.
There are a few storage-related potential issues that can slow down operations in Spark data processing. One is the overhead of opening, reading, and closing many small files. To address this, it’s recommended to aim for files with a size of 1GB or larger, which can significantly reduce the time spent on these operations.
Another is the directory-scanning overhead that can arise with highly partitioned datasets, where each partition has its own directory and the driver must scan all of them on disk. For example, files in your storage might be partitioned down to year-month-day-minute granularity.
A third issue involves schema operations, such as inferring the schema of JSON and CSV files. This can be very costly, requiring a full read of the files to determine data types, even if you only need a subset of the data. By contrast, reading Parquet files typically requires only a one-time read of the schema, which is much faster. Therefore, it’s recommended to use Parquet as the file format for Spark, since Parquet stores schema information in the file.
However, schema evolution support, even with Parquet files, can be expensive, as it requires reading all the files and merging their schemas. For this reason, starting from Spark 1.5, schema merging is turned off by default and needs to be turned on via the spark.sql.parquet.mergeSchema option.
To address the issue of tiny files in a storage location, if you are using Delta Lake, consider using the autoOptimize features optimizeWrite and autoCompact. These automatically coalesce small files into larger ones, though they differ subtly: optimizeWrite rearranges data during the write itself so that fewer, larger files are produced, while autoCompact runs a lightweight compaction after the write completes to merge the small files that remain.
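A sketch of enabling these features on Databricks Delta, assuming a table named sales; the property and config names may vary across Delta/Databricks runtime versions, so verify them against your platform’s documentation:

## Enable optimizeWrite and autoCompact on an existing table (Databricks Delta; names assumed)
spark.sql("""
  ALTER TABLE sales SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact'   = 'true'
  )
""")

## Or enable them for the whole session
spark.conf.set('spark.databricks.delta.optimizeWrite.enabled', True)
spark.conf.set('spark.databricks.delta.autoCompact.enabled', True)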
To address the issue of slow performance when scanning directories, consider registering your data as tables in order to leverage the metastore to track the files in the storage location. This may have some initial overhead, but will benefit you in the long run by avoiding repeated directory scans.
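For example, a sketch with a hypothetical path and table name, registering an existing Delta location as a table so the metastore tracks its files:

## Register an existing storage location as a table (path and name are hypothetical)
spark.sql("""
  CREATE TABLE IF NOT EXISTS events
  USING DELTA
  LOCATION '/mnt/datalake/events'
""")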
To address issues with schema operations, one option is to specify the schema explicitly when reading non-Parquet file types, though this can be time-consuming to maintain. Alternatively, you can register tables so that the metastore tracks the table schema. The best option is to use Delta Lake, which offers zero schema reads thanks to the metastore, and at most one read for schema evolution. Delta Lake also provides other benefits such as ACID transactions, time travel, DML operations, schema evolution, etc.
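Here is a minimal sketch of specifying the schema up front when reading a CSV file (the path and columns are hypothetical), which avoids the costly inference pass:

## Provide an explicit schema instead of inferring it (path and columns are hypothetical)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("created_at", TimestampType(), True),
])

df = spark.read.csv("/mnt/raw/orders.csv", header=True, schema=schema)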
TL;DR - Bad code can also slow your Spark jobs down. Always try to use Spark SQL built-in functions and avoid UDFs when developing your Spark code. If UDFs are needed, use vectorized UDFs in Python and typed transformations in Scala.
Refer to my post about SparkSQL Programming to understand the difference between built-in functions, UDFs and vectorized UDFs
Slower Spark jobs can sometimes occur as a result of suboptimal code, for example code segments that have not been reworked to use more efficient Spark operations.
As a rule of thumb, always use spark.sql.functions whenever possible, regardless of which language you’re using. You can expect similar performance in both Python and Scala with these built-in functions.
In Scala, if you have to use user-defined functions that are not covered by the standard Spark functions, it’s more efficient to use typed transformations instead of standard Scala UDFs. In general, Scala handles UDFs and typed transformations more efficiently than Python.
If you’re working with Python, avoid Spark UDFs and vectorized UDFs if possible. But if you have to use a UDF, always prefer a vectorized (pandas) UDF.
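As a sketch of the difference, assuming a DataFrame df with a numeric column amount (both hypothetical), here is a built-in-function version alongside a vectorized (pandas) UDF; reach for the pandas UDF only when the built-ins can’t express the logic:

## Prefer built-in functions; fall back to a vectorized (pandas) UDF when needed
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# Built-in function: fastest option, fully optimized by Catalyst
df_builtin = df.withColumn("amount_with_tax", F.col("amount") * 1.1)

# Vectorized UDF: operates on pandas Series in batches, far faster than a row-by-row Python UDF
@pandas_udf("double")
def add_tax(amount: pd.Series) -> pd.Series:
    return amount * 1.1

df_vectorized = df.withColumn("amount_with_tax", add_tax("amount"))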