In Spark applications we often need to read data from various sources and apply business rules over it to extract meaning from the data, so that it delivers business value.
Misconfigured jobs, or code with too many Spark actions, can waste cluster resources executing those actions and can cause SLA breaches for applications that rely on real-time data processing.
Apply narrow transformations
When we read data from a source, it is very important to reduce the size of the dataset early by applying filters, removing unused columns, and so on.
Transformations like filter are applied per partition on the executors (some partitions may even end up empty after filtering). Shrinking the dataset this way pays off later when data is transferred over the network, for example when it is joined on a specific key.
Say you have a Spark cluster with 50 nodes and you read a 500MB file from a datasource such as HDFS. With an HDFS block size of 128MB, Spark creates 4 partitions (500/128), which prevents us from fully utilising the cluster's resources. Instead, repartition the data to increase parallelism. Parallelism in Spark is achieved at the thread level, unlike in MapReduce where it is at the process level.
The best way to decide on the number of partitions in an RDD is to make the number of partitions equal to the number of cores in the cluster so that all the partitions will process in parallel and the resources will be utilized in an efficient way.
Also note that — If an RDD has too many partitions, then task scheduling may take more time than the actual execution time.
scala> val orc_df = spark.read.orc("part-e33b03ac0355-c000.snappy.orc")

scala> orc_df.rdd.getNumPartitions
res3: Int = 1

scala> val new_orc_df = orc_df.repartition(16)

scala> new_orc_df.rdd.getNumPartitions
res4: Int = 16
Although the Catalyst Optimizer runs a set of rules to combine filters and projections, cutting down the dataset before shuffles remains the developer's responsibility.
Executors — Choose the right number of workhorses required to get the job done, with the --num-executors property. If this property is not set, Spark by default uses dynamic allocation and may consume most of your cluster's resources.
Executor Cores — Set an appropriate number of CPU cores when submitting the job. If you request 20 executors (--num-executors) with 5 cores each (--executor-cores), then at most 100 tasks can run in parallel.
If executors are beefed up too much, write throughput while persisting data may suffer.
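As a sketch, a spark-submit invocation with these resource settings made explicit might look like the following (the class name, jar, and memory sizes are placeholders, not values from this article):

```shell
# Hypothetical job: 20 executors x 5 cores = at most 100 tasks in parallel.
# Class name, jar, and memory sizes are placeholders.
spark-submit \
  --class com.example.MyJob \
  --num-executors 20 \
  --executor-cores 5 \
  --executor-memory 8g \
  --driver-memory 4g \
  my-job.jar
```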
Allocate no more than 4GB of driver memory, since accumulators send data back to the driver already aggregated for actions like count.
Enable speculation — With speculative execution, if one or more tasks in a stage are running slowly, Spark relaunches those tasks on another worker node and takes whichever copy finishes first.
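Speculation is off by default and can be enabled through configuration, for example (the quantile and multiplier shown are Spark's documented defaults, included here only to name the tuning knobs):

```shell
--conf spark.speculation=true \
--conf spark.speculation.quantile=0.75 \
--conf spark.speculation.multiplier=1.5
```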
Configure the right shuffle partitions — spark.sql.shuffle.partitions controls the number of shuffle partitions, and setting it to an appropriate value is important.
For example, consider a simple word count. We can see that when the groupBy transformation triggers a shuffle, the number of partitions becomes 200, because spark.sql.shuffle.partitions defaults to 200.
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordRDD = words.map(word => (word, 1))
wordRDD: Array[(String, Int)] = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1))

scala> val wordDF = wordRDD.toList.toDF("num", "count").groupBy("num").sum()

scala> wordDF.rdd.getNumPartitions
res1: Int = 200
For smaller datasets this is pure overhead: the scheduling delays cause slower processing and under-utilization of the executors.
Use coalesce to reduce the number of partitions: partitions are combined on their local nodes, which avoids a full network shuffle.
scala> val _coal = wordDF.coalesce(2)
_coal: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [num: string, sum(count): bigint]

scala> _coal.rdd.getNumPartitions
res2: Int = 2
For larger datasets, too few shuffle partitions can force each executor to process too much data, causing memory errors. Large amounts of data per executor also lead to disk spills, and the resulting I/O to and from disk, plus the extra time spent in GC, slows the application down.
A reasonable upper limit is about 1.5x your initial partition count after repartitioning.
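The setting can be adjusted per session in code; a minimal sketch (64 is a placeholder value, to be derived from your own partition counts, not a recommendation):

```scala
// Placeholder: pick roughly 1-1.5x the post-repartition partition count.
spark.conf.set("spark.sql.shuffle.partitions", "64")
```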
Resource allocation — If dynamic allocation is enabled (the default), make sure your jobs are configured correctly, otherwise a single job can end up consuming the cluster's resources and leave the rest of the jobs queued.
Even if the number of executors is set to 99 (with one core each) and shuffle partitions are set to 50, at most 50 tasks can execute in parallel, leaving the rest of the executors idle, i.e. parallelism = min(total_cores, shuffle_partitions).
Before applying any transformations on data, it is recommended to profile datasets which can greatly reduce debugging time.
Map side joins — For smaller datasets (less than ~1GB), broadcast them to all executors to reduce lookup times on each node and avoid shuffling data across the cluster.
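A sketch of the broadcast hint, assuming a large factDF and a small lookupDF (both hypothetical names) that share a join key:

```scala
import org.apache.spark.sql.functions.broadcast

// factDF (large) and lookupDF (small) are hypothetical DataFrames.
// broadcast() ships the small side to every executor, so the large side
// is joined locally without shuffling across the network.
val joined = factDF.join(broadcast(lookupDF), Seq("key"))
```

Note that Spark also auto-broadcasts tables smaller than spark.sql.autoBroadcastJoinThreshold (10MB by default); the explicit hint is useful when the small table exceeds that threshold but still fits in executor memory.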
Cache — Cache RDDs that will be reused instead of reading them from the source every time. Smaller datasets can be cached in memory (MEMORY_ONLY), larger ones on disk (DISK_ONLY), and Spark can also keep some partitions in memory and the rest on disk (MEMORY_AND_DISK).
As a rule of thumb, caching RDDs in deserialized form consumes more memory (~2x) than storing them serialized, because of the overhead of the Java object representation.
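A minimal sketch of the caching lifecycle (the input path is hypothetical; pick the storage level to match the dataset size):

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical input path.
val events = spark.read.orc("events.orc")
events.persist(StorageLevel.MEMORY_AND_DISK)  // spill partitions that don't fit in memory
events.count()                                // first action materializes the cache
// ... reuse `events` in later transformations ...
events.unpersist()                            // release executor memory when done
```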
Avoid groupByKey — As much as possible, avoid grouping keys across partitions, because every value for a key is shuffled across the executors, unlike reduceByKey, which aggregates within each partition first and only then shuffles the partial results.
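Reusing the word-count data from earlier, the two approaches look like this; both produce the same counts, but reduceByKey does a map-side combine so only one partial sum per key per partition crosses the network:

```scala
val pairs = sc.parallelize(Seq("one", "two", "two", "three")).map(w => (w, 1))

val viaReduce = pairs.reduceByKey(_ + _)              // preferred: map-side combine first
val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // shuffles every (word, 1) pair
```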
Avoid UDFs — Avoid using UDFs where possible, since evaluating one requires deserializing each value and then re-serializing the result, creating a higher memory footprint. Use built-in SQL functions instead as much as possible.
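For contrast, a sketch of the two styles (df and its "name" column are hypothetical); the built-in function operates on Spark's internal row format, while the UDF must first deserialize each value into a JVM String:

```scala
import org.apache.spark.sql.functions.{udf, upper}

// Preferred: built-in function, stays inside Spark's optimized execution.
val viaBuiltin = df.withColumn("name_uc", upper(df("name")))

// Avoid where a built-in exists: each value is deserialized, evaluated, repacked.
val upperUdf = udf((s: String) => s.toUpperCase)
val viaUdf   = df.withColumn("name_uc", upperUdf(df("name")))
```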