Most Big Data developers and Data Engineers start learning Spark by writing SparkSQL code to perform ETL on DataFrames (I know I did). I also wrote a post about SparkSQL Programming. However, we quickly learn that going from processing a few GBs of data to dealing with TBs and PBs of data, a challenge for big enterprises, requires much more knowledge. Learning to write correct Spark code is only a small part of the battle: you also need to understand the Spark Architecture and Spark internals to correctly tune Spark to handle truly big data, and that is the focus of this post.
First, let this sink in: Spark is an in-memory, parallel processing engine that is very scalable. What sets it apart from other processing engines is that the more data you have, the more its strengths show. Spark is faster than the MapReduce paradigm because it processes data in memory, which reduces the disk IO that normally slows down MapReduce jobs. Spark is also fast because it can process data in parallel.
Parallelism is the key enabler of Spark’s efficiency. The Spark Architecture is designed so that you can add new computers to process growing amounts of big data in parallel.
A Spark cluster has a driver and multiple workers (think computers).
SparkSession is the single point of entry to all DataFrame API functionality. SparkSession has been available since Spark 2.0. Before that, SparkContext (together with SQLContext and HiveContext) was the entry point, with a limitation of only one active SparkContext per JVM. SparkSession unifies these older entry points, and multiple SparkSessions can share a single SparkContext.
A SparkSession is automatically created in a Databricks notebook as the variable spark.
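# In the code below, the `spark` variable is a SparkSession
# (created automatically in Databricks; elsewhere, build or reuse one like this)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# spark.table reads a table into a DataFrame; parentheses allow the method chain to span lines
df = (spark.table('<table>')
      .select('a', 'b')
      .where('a > 1')
      .orderBy('b'))
# spark.read reads files into a DataFrame
df = spark.read.parquet('path/to/parquet')
# spark.sql executes a SQL query on a table and returns the result set as a DataFrame
df = spark.sql('select * from <table>')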
The Spark ecosystem has 4 APIs: SparkSQL, Spark Structured Streaming, SparkML, and GraphX (I haven’t used this one before, so I’m not sure whether it’s deprecated). Most Spark developers start with the SparkSQL APIs, doing ingestion and transformations on Spark DataFrames. However, Spark Structured Streaming and SparkML are pretty popular too, and I may discuss them in later posts.
Earlier I mentioned that the Driver is responsible for turning operations into jobs, or DAGs. DAGs are Directed Acyclic Graphs (a fancy term for graphs whose edges have a direction and that contain no cycles). In a Spark execution plan, each job is a DAG of stages, with a new stage starting at every shuffle boundary, and each stage consists of multiple tasks, one per partition (clear?).
Spark parallelizes at 2 levels:
Another characteristic of Spark is lazy execution. When you specify transformations on a Spark DataFrame, Spark only records the lineage and starts the computation when an action is triggered (refer to my previous post about SparkSQL programming for more information on transformations and actions).
Under the hood, SparkSQL uses the Spark Catalyst Optimizer to optimize query performance, similar to how a relational database or a data warehouse plans its queries.
The Catalyst Optimizer is a component of Spark SQL that performs optimization on a query through 4 stages:
The Catalyst Optimizer is a rule-based engine that takes the Logical Plan and rewrites it into an optimized Physical Plan. The Physical Plan is developed BEFORE a query is executed.
To view the Catalyst Optimizer in action, use df.explain(True) to print the Logical and Physical Execution plans of a query.
In Spark 3.0, Adaptive Query Execution (AQE) was introduced. One difference between AQE and the Catalyst Optimizer is that AQE modifies the Physical Plan based on runtime statistics, so AQE can tune your queries further on the fly. You can therefore think of AQE as complementary to the Catalyst Optimizer.
For example, at runtime, based on information that was not available during planning, AQE can switch your join strategy from a Sort Merge Join to a Broadcast Hash Join to reduce data shuffling. AQE can also coalesce your partitions to an optimal size after a shuffle stage, or optimize skew joins.
In Spark 3.0 and 3.1, AQE is not turned on by default. You can enable it with spark.conf.set("spark.sql.adaptive.enabled", "true"), and it’s recommended to turn it on. Since Spark 3.2, AQE is enabled by default, and it is also enabled by default on later versions of Databricks Runtime.
We established that Spark processes data in parallel by splitting it into partitions and moving (shuffling) them to executors, so that each executor can run a task on a small subset of the data in memory.
Shuffling, partitioning, and memory can largely dictate Spark performance. If you understand these concepts in depth, debugging Spark becomes much easier, as I explained further in another post about Debugging Long Running Spark Job.
To process your data, Spark first has to ingest files from disk into memory, and by default it reads data into partitions of up to 128MB. If there’s any wide transformation on the DataFrame, Spark needs to repartition the data and move partitions to cores for processing. The implication is that each partition has to fit into the memory available to the core processing it, or you will get spill or OOM errors. If partitions are not evenly distributed, you can get skew (some executors have much more work than others). Correctly tuning partitions on ingestion and at the shuffle stage can help improve your Spark jobs.
There are 2 types of partitioning:
When Spark reads data from disk into memory (a DataFrame), the initial number of partitions in the DataFrame (in MEMORY) is determined by the number of cores (the default level of parallelism), the dataset size, the spark.sql.files.maxPartitionBytes config (default 128MB), and spark.sql.files.openCostInBytes (default 4MB, the estimated overhead of opening a file). Remember that this describes partitions in memory, which need not match how the files are laid out on disk.
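To make the interplay of these settings concrete, here is a plain-Python sketch modeled on the heuristic Spark 3.x file sources use when planning a read. It is a simplification that assumes splittable files (such as Parquet) and treats the default parallelism as the minimum partition count. The helper names `max_split_bytes` and `estimate_read_partitions` are hypothetical, and real partition counts can differ.

```python
MB = 1024 * 1024


def max_split_bytes(file_sizes, default_parallelism,
                    max_partition_bytes=128 * MB, open_cost_in_bytes=4 * MB):
    """Target bytes per read partition (simplified model of Spark's heuristic)."""
    # Every file is charged an extra "open cost" on top of its size
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # Never exceed maxPartitionBytes, never go below the open cost
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))


def estimate_read_partitions(file_sizes, default_parallelism,
                             max_partition_bytes=128 * MB, open_cost_in_bytes=4 * MB):
    """Rough count of in-memory partitions after reading splittable files."""
    split = max_split_bytes(file_sizes, default_parallelism,
                            max_partition_bytes, open_cost_in_bytes)
    # Cut each file into chunks of at most `split` bytes
    chunks = []
    for size in file_sizes:
        for offset in range(0, size, split):
            chunks.append(min(split, size - offset))
    # Pack the largest chunks first; close a partition when the next chunk would overflow it
    chunks.sort(reverse=True)
    partitions, current = 0, 0
    for chunk in chunks:
        if current > 0 and current + chunk > split:
            partitions += 1
            current = 0
        current += chunk + open_cost_in_bytes
    return partitions + (1 if current > 0 else 0)


# One 1GB file on 8 cores: eight partitions of 128MB
print(estimate_read_partitions([1024 * MB], default_parallelism=8))  # 8
# Ten 1MB files on 8 cores: small files are packed two per partition
print(estimate_read_partitions([1 * MB] * 10, default_parallelism=8))  # 5
```

The second case shows why many tiny files still produce several partitions. The per-file open cost inflates each file's effective size, so fewer files fit into each partition.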
Check the number of partitions in a DataFrame after it is ingested from disk into memory with df.rdd.getNumPartitions(). You can roughly estimate the size of your DataFrame in memory by multiplying the number of partitions by the partition size.
You can change the size of partitions on ingestion with spark.sql.files.maxPartitionBytes. One situation where setting this config is beneficial is writing data out as 1GB part files.
Use df.rdd.getNumPartitions() to check the number of partitions in memory. To change the number of partitions, use coalesce() and repartition(). Coalesce can only reduce the number of partitions (which increases partition size), but as a narrow transformation with no shuffling, coalesce is more efficient than repartition. Repartition returns a new DataFrame with exactly N partitions of roughly even size. It can increase or decrease your partition count, but it requires an expensive data shuffle.
Shuffle is one of the most expensive operations in Spark. In every wide transformation (for example, a groupBy), the shuffle creates multiple stages:
The issues with shuffle partitions are:
Tuning spark.sql.shuffle.partitions (default 200) based on the rule in the partition section can help. This setting determines how many partitions Spark creates for the stage after a shuffle, and it MUST be managed by the user for every job. Besides that, there are a few techniques to mitigate excessive shuffles in my previous post Debugging Long Running Spark Job.
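The partitioning rule referred to above is not spelled out in this section. This plain-Python sketch therefore assumes a common heuristic: aim for roughly 128MB of shuffle data per partition, then round up to a multiple of the cluster’s total cores so each wave of tasks keeps every core busy. `suggest_shuffle_partitions` is a hypothetical helper, and the 100GB / 96-core inputs are made-up numbers.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB


def suggest_shuffle_partitions(shuffle_bytes, total_cores, target_partition_bytes=128 * MB):
    """Hypothetical helper: pick a spark.sql.shuffle.partitions value for one job."""
    # Enough partitions that each holds roughly `target_partition_bytes` of shuffle data
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Round up to a whole number of task waves so no core sits idle in the last wave
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


n = suggest_shuffle_partitions(100 * GB, total_cores=96)
print(n)  # 864
# Apply it before the wide transformation, e.g. in PySpark:
# spark.conf.set("spark.sql.shuffle.partitions", n)
```

You can read the shuffle size for an existing job from the "Shuffle Write" column of the Spark UI’s stage page.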
By default, data in a DataFrame is only present in the Spark cluster while it is being processed during a query; it won’t be persisted on the cluster afterwards. However, you can explicitly ask Spark to persist a DataFrame on the cluster by invoking df.cache(). Cache can store as many partitions of the DataFrame as the cluster memory allows.
Note that cache is a special case of persist: df.cache() is the same as df.persist(StorageLevel.MEMORY_AND_DISK). This stores partitions in memory and spills the excess to disk.
Cache should be used with care because caching consumes cluster resources that could otherwise be used for other executions, and it can prevent Spark from performing query optimizations. You should only use cache in the situations below:
When you use cache() or persist(), the DataFrame is not fully cached until you invoke an action that goes through every record (e.g., count()). If you use an action like take(1), only one partition will be cached because Catalyst realizes that you do not need to compute all the partitions just to retrieve one record.
Don’t forget to clean up with df.unpersist() to evict the DataFrame from the cache when you no longer need it.