How Spark Runs on a Cluster
The architecture of a Spark application has the following main components:
Spark Driver: A process on a physical machine that is responsible for maintaining the state of the application running on the cluster.
Spark Executors: Processes that perform the tasks assigned by the Spark driver and report back their results and status (success or failure).
Cluster Manager: Responsible for maintaining the cluster of machines that will run the Spark application. The cluster manager has its own "driver" (sometimes called master) and "worker" abstractions, which are tied to physical machines rather than to processes.
There are 3 types of cluster managers available:
1. Built-in standalone cluster manager
2. YARN
3. Mesos
Execution Modes: The execution mode determines where the driver and executor processes are physically located when the application runs.
1. Cluster Mode: The user submits a script (or JAR) to a cluster manager. The cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes.
2. Client Mode: Same as cluster mode, except that the Spark driver stays on the client machine (edge node) that submitted the application. The client machine is responsible for maintaining the driver process, while the cluster manager maintains the executor processes.
3. Local Mode: The entire Spark application runs on a single machine, and parallelism is achieved through threads on that machine.
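The three modes can be summarised as a small lookup. This is a plain-Python sketch (no Spark needed); the function name driver_location and its return strings are made up for illustration, while the master and deploy-mode values mirror what spark-submit accepts for --master and --deploy-mode.

```python
def driver_location(master: str, deploy_mode: str = "client") -> str:
    """Return where the driver process runs for a --master / --deploy-mode pair."""
    if master.startswith("local"):
        # local, local[4], local[*]: everything runs on one machine, using threads
        return "single machine"
    if deploy_mode == "cluster":
        # the cluster manager starts the driver on a worker node
        return "worker node"
    if deploy_mode == "client":
        # the driver stays on the machine that ran spark-submit
        return "client machine"
    raise ValueError(f"unknown deploy mode: {deploy_mode!r}")


print(driver_location("local[*]"))         # single machine
print(driver_location("yarn", "cluster"))  # worker node
print(driver_location("yarn", "client"))   # client machine
```

In every non-local case the executors run on the cluster; only the placement of the driver changes.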
Life Cycle of a Spark Application (Outside Spark)
Let's say there are 4 nodes (1 master, 3 workers).
Part 1: Client Request - We submit the application and ask the cluster manager for resources for the Spark driver process only. If the request is accepted, the cluster manager places the driver onto a node in the cluster. The client process that submitted the original job then exits, and the application is off and running on the cluster.
Part 2: Launch - The driver process has now been placed on the cluster and begins running user code. This code must include a SparkSession, which initialises a Spark cluster (driver + executors). The SparkSession communicates with the cluster manager, asking it to launch Spark executor processes across the cluster.
Part 3: Execution - The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks (success or failure).
Part 4: Completion - The driver process exits with either success or failure. The cluster manager then shuts down the executors in the Spark cluster for that driver.
Life Cycle of a Spark Application (Inside Spark)
Part 1 :
SparkSession - The first step of any Spark application is creating a SparkSession. Spark jobs within an application are executed serially (unless we use threading to launch multiple actions in parallel).
# Creating a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Test")\
    .config("spark.conf.test", "test-value")\
    .getOrCreate()
SparkSession was introduced in Spark 2.X. From it we can access all of the low-level and legacy contexts (SparkContext and SQLContext).
A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how we communicate with some of Spark's low-level APIs, such as RDDs.
In previous versions of Spark (1.X), SQLContext and HiveContext provided the ability to work with DataFrames and Spark SQL and were commonly stored in the variable sqlContext. In 2.X the two APIs were combined into SparkSession. Both still exist for backward compatibility and can be reached through the SparkSession.
Part 2 :
Logical Instructions - Each action triggers one complete Spark job.
eg:
df1 = spark.range(2,1000,2)
df2 = spark.range(2,1000,4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12,["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect()
o/p - [Row(sum(id)=25000)]
step4.explain()
== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[sum(id#6L)])
+- Exchange SinglePartition
   +- *(3) HashAggregate(keys=[], functions=[partial_sum(id#6L)])
      +- *(3) Project [id#6L]
         +- *(3) BroadcastHashJoin [id#6L], [id#2L], Inner, BuildRight
            :- *(3) Project [(id#0L * 5) AS id#6L]
            :  +- Exchange RoundRobinPartitioning(5)
            :     +- *(1) Range (2, 1000, step=2, splits=1)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
               +- Exchange RoundRobinPartitioning(6)
                  +- *(2) Range (2, 1000, step=4, splits=1)
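The value returned by step4.collect() can be cross-checked without Spark. The sketch below replays the same logical steps with plain Python ranges; the variable names are illustrative only.

```python
# Same id ranges as spark.range(2, 1000, 2) and spark.range(2, 1000, 4)
df1_ids = range(2, 1000, 2)
df2_ids = set(range(2, 1000, 4))

# step2: multiply every id of df1 by 5
step2_ids = [i * 5 for i in df1_ids]

# step3: the inner join on id keeps only values present on both sides
joined = [i for i in step2_ids if i in df2_ids]

# step4: sum(id)
total = sum(joined)
print(len(joined), total)  # 50 25000
```

The 50 matching ids are 10, 30, ..., 990: multiples of 10 that also leave remainder 2 when divided by 4.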
Part 3 :
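Spark Job - In general there should be one Spark job per action, and an action always returns a result.
Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
eg: The code above breaks down as follows. The task counts depend on the environment; this breakdown assumes 8 available cores and a shuffle (non-broadcast) join, so it differs from the single-threaded local plan printed above.
Stage 1 with 8 tasks (range creates one partition per available core by default, 8 here)
Stage 2 with 8 tasks
Stage 3 with 6 tasks (step12, because of repartition(6))
Stage 4 with 5 tasks (step1, because of repartition(5))
Stage 5 with 200 tasks (step3, the join; spark.sql.shuffle.partitions defaults to 200)
Stage 6 with 1 task (the final aggregation into a single partition)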
Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e. as many transformations as possible) into the same stage, but the engine starts a new stage after each operation called a shuffle.
A shuffle is a physical repartitioning of the data. This type of repartitioning requires coordination across executors to move data around. Spark starts a new stage after each shuffle and keeps track of the order in which the stages must run to compute the final result.
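The rule "a new stage starts after every shuffle" can be sketched in plain Python. This is a simplified model for a linear chain of operations, not Spark's actual scheduler; SHUFFLE_OPS and split_into_stages are hypothetical names, and the set of shuffling operations is deliberately incomplete.

```python
from typing import List

# Assumption for this sketch: these operations always force a shuffle
SHUFFLE_OPS = {"repartition", "groupBy", "reduceByKey"}


def split_into_stages(ops: List[str]) -> List[List[str]]:
    """Group a linear chain of operations into stages, cutting after every shuffle."""
    stages: List[List[str]] = [[]]
    for op in ops:
        stages[-1].append(op)
        if op in SHUFFLE_OPS:
            stages.append([])  # data is repartitioned here, so a new stage begins
    return [stage for stage in stages if stage]  # drop a trailing empty stage


chain = ["range", "repartition", "selectExpr", "groupBy", "sum"]
stages = split_into_stages(chain)
print(stages)  # [['range', 'repartition'], ['selectExpr', 'groupBy'], ['sum']]
```

Two shuffles give three stages. In a real job the shuffle write belongs to the upstream stage and the shuffle read to the downstream one, which this model glosses over.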
We can lower the default of 200 shuffle partitions with:
spark.conf.set("spark.sql.shuffle.partitions",50)
A good rule of thumb is that the number of partitions should be larger than the number of executors on the cluster.
Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor. If there is 1 big partition we will have 1 task, but if there are 1000 little partitions we will have 1000 tasks that can be computed in parallel. A task is just a unit of computation applied to a unit of data (the partition). Partitioning data into a greater number of partitions means that more can be executed in parallel, although too much parallelism (e.g. a million tiny tasks) is not good either, because scheduling overhead starts to dominate.
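The link between partitions, tasks, and available cores can be made concrete with a little arithmetic. This is a sketch that assumes each task occupies exactly one core (Spark's default of spark.task.cpus=1); the function name scheduling_waves is made up for illustration.

```python
import math


def scheduling_waves(num_partitions: int, executors: int, cores_per_executor: int) -> int:
    """Rounds of task execution needed when every core runs one task at a time."""
    slots = executors * cores_per_executor  # tasks that can run simultaneously
    return math.ceil(num_partitions / slots)


# 4 executors with 4 cores each = 16 task slots
print(scheduling_waves(1, 4, 4))     # 1  (one task, 15 slots sit idle)
print(scheduling_waves(200, 4, 4))   # 13
print(scheduling_waves(1000, 4, 4))  # 63
```

A single partition leaves most of the cluster idle, while a very large number of partitions adds many extra waves of short tasks.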
Part 4 :
Pipelining - Unlike MapReduce, Spark performs as many steps as it can at one point in time before writing data to memory or disk. One of the key optimizations that Spark performs is pipelining, which occurs at and below the RDD level.
With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
eg: If an RDD program does a map, then a filter, and then another map, all three result in a single stage of tasks that read each input record, pass it through the first map, then the filter, then the second map. This pipelined version of the computation is much faster than writing the intermediate results to memory or disk after each step.
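The map, filter, map example can be imitated with a Python generator. This is an analogy in plain Python rather than Spark code; the function names and the concrete operations (doubling, keeping multiples of 3, adding 1) are arbitrary choices for illustration.

```python
from typing import Iterable, Iterator, List


def pipelined(records: Iterable[int]) -> Iterator[int]:
    """map -> filter -> map applied record by record, with no intermediate collection."""
    for record in records:
        doubled = record * 2       # first map
        if doubled % 3 == 0:       # filter
            yield doubled + 1      # second map


def materialized(records: Iterable[int]) -> List[int]:
    """Same logic, but each step writes out a full intermediate list first."""
    step1 = [r * 2 for r in records]
    step2 = [r for r in step1 if r % 3 == 0]
    return [r + 1 for r in step2]


data = range(10)
print(list(pipelined(data)))  # [1, 7, 13, 19]
print(materialized(data))     # [1, 7, 13, 19]
```

Both versions produce the same result, but the pipelined one touches each record once and never stores step1 or step2.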
Part 5 :
Shuffle Persistence - When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key (where the input data for each key first needs to be brought together from many nodes), the engine cannot perform pipelining. Instead it performs a cross-network shuffle.
Spark always executes a shuffle by first having the "source" tasks (those sending data) write shuffle files to their local disks during their execution stage. Then the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and perform that computation. Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g. if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks. A side effect is that a later job over data that has already been shuffled does not rerun the source side of the shuffle, because the shuffle files are already on disk. This automatic optimisation can save time in a workload that runs multiple jobs over the same data, but for better and more predictable performance we can cache the RDD or DataFrame explicitly.
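The two sides of a shuffle can be modelled in a few lines of plain Python. This is a toy sketch, not Spark's implementation: in-memory dictionaries stand in for the shuffle files on local disk, and all names (partition_for, write_shuffle_file, reduce_task) are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Pair = Tuple[str, int]


def partition_for(key: str, num_reducers: int) -> int:
    # crc32 is stable across runs, unlike the built-in hash() for strings
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def write_shuffle_file(records: List[Pair], num_reducers: int) -> Dict[int, List[Pair]]:
    """Source task: bucket records by target reducer (stands in for a local shuffle file)."""
    buckets: Dict[int, List[Pair]] = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets


def reduce_task(reducer_id: int, shuffle_files: List[Dict[int, List[Pair]]]) -> Dict[str, int]:
    """Reduce task: fetch its bucket from every shuffle file and sum the values per key."""
    totals: Dict[str, int] = defaultdict(int)
    for shuffle_file in shuffle_files:
        for key, value in shuffle_file.get(reducer_id, []):
            totals[key] += value
    return dict(totals)


source_partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
files = [write_shuffle_file(p, 2) for p in source_partitions]  # source stage
result: Dict[str, int] = {}
for reducer_id in range(2):                                     # reduce stage
    result.update(reduce_task(reducer_id, files))
print(sorted(result.items()))  # [('a', 4), ('b', 2), ('c', 4)]
```

Because files is kept after the source stage, a failed reduce_task call can simply be repeated without recomputing the source partitions, which is the point of persisting shuffle output.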
Good Practice:
Spark applications should be resilient to at least some degree of change in the input data, or otherwise ensure that failures are handled in a graceful and resilient way.
Changes to the output schema must be handled gracefully.
Write proper unit tests for the business logic.
In production, prefer the DataFrame API over Spark SQL strings, since DataFrame code is easier to adapt when the source data or tables change.
Partition data wherever possible (i.e. before joins etc.).
For unit testing, frameworks such as JUnit or ScalaTest can be used.
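For PySpark code, Python's standard unittest module can play the same role as JUnit or ScalaTest. The sketch below keeps a piece of business logic in a plain function so it can be tested without a cluster; total_per_customer and its field names are hypothetical and not taken from any real job.

```python
import unittest
from typing import Dict, List, Optional


def total_per_customer(rows: List[Dict[str, Optional[object]]]) -> Dict[str, float]:
    """Hypothetical business rule: sum 'amount' per 'customer', skipping rows without an amount."""
    totals: Dict[str, float] = {}
    for row in rows:
        amount = row.get("amount")
        if amount is None:
            continue  # tolerate missing input instead of failing the whole job
        customer = str(row["customer"])
        totals[customer] = totals.get(customer, 0.0) + float(amount)
    return totals


class TotalPerCustomerTest(unittest.TestCase):
    def test_sums_amounts(self):
        rows = [{"customer": "x", "amount": 2.0}, {"customer": "x", "amount": 3.0}]
        self.assertEqual(total_per_customer(rows), {"x": 5.0})

    def test_skips_missing_amount(self):
        rows = [{"customer": "x", "amount": None}, {"customer": "y", "amount": 1.0}]
        self.assertEqual(total_per_customer(rows), {"y": 1.0})


# Run the tests explicitly so the script does not exit the interpreter
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TotalPerCustomerTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

The second test covers the "resilient to changes in input data" point: a row with a missing amount is skipped rather than raising.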
spark-submit properties:
spark-submit --jars [JAR_FILES (comma-separated list)] \
--py-files [PY_FILES (comma-separated list of .zip, .egg or .py files)] \
--files [FILES (comma-separated list, placed in the working directory of each executor)] \
--conf PROP=VALUE \
--driver-memory [2G (default is 1G)] \
--executor-memory [2G (default is 1G)] \
--verbose \
--driver-cores [10 (default is 1, cluster mode only)] \
--executor-cores [10 (default is 1 on YARN)] \
--queue [YARN queue name] \
--keytab [full path to the file that contains the keytab] \
[application JAR or .py file] [application arguments]
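When submissions are scripted, it can help to assemble the spark-submit call as an argument list. The helper below is a sketch: build_spark_submit and the file name my_job.py are placeholders, the chosen defaults are arbitrary, and only standard spark-submit flags are used.

```python
import shlex
from typing import Dict, List, Optional


def build_spark_submit(app: str, master: str = "yarn", deploy_mode: str = "cluster",
                       driver_memory: str = "2g", executor_memory: str = "2g",
                       executor_cores: int = 2,
                       conf: Optional[Dict[str, str]] = None) -> List[str]:
    """Assemble a spark-submit argument list; the application file comes last."""
    cmd = ["spark-submit",
           "--master", master,
           "--deploy-mode", deploy_mode,
           "--driver-memory", driver_memory,
           "--executor-memory", executor_memory,
           "--executor-cores", str(executor_cores)]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]  # one --conf flag per property
    cmd.append(app)
    return cmd


# my_job.py is a placeholder application file
cmd = build_spark_submit("my_job.py", conf={"spark.sql.shuffle.partitions": "50"})
print(shlex.join(cmd))
```

The resulting list can be passed to subprocess.run(cmd, check=True) on a machine where spark-submit is on the PATH.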
By default, Spark's scheduler runs the jobs of an application in FIFO fashion. It is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a round-robin fashion so that all jobs get a roughly equal share of cluster resources. Fair sharing is enabled with:
spark.scheduler.mode=FAIR
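The difference between the two modes can be illustrated with a toy model in plain Python. This is a simplification of Spark's scheduler (no pools or weights); the function names and job names are invented for the example.

```python
from collections import deque
from typing import Dict, List


def fifo_order(jobs: Dict[str, int]) -> List[str]:
    """FIFO: every task of the first job is handed out before the next job starts."""
    return [name for name, n_tasks in jobs.items() for _ in range(n_tasks)]


def fair_order(jobs: Dict[str, int]) -> List[str]:
    """FAIR (simplified): hand out one task per job in turn until all jobs are drained."""
    queue = deque(jobs.items())
    order: List[str] = []
    while queue:
        name, remaining = queue.popleft()
        if remaining > 0:
            order.append(name)
            queue.append((name, remaining - 1))
    return order


jobs = {"long_job": 4, "short_job": 2}  # job name -> number of pending tasks
print(fifo_order(jobs))
# ['long_job', 'long_job', 'long_job', 'long_job', 'short_job', 'short_job']
print(fair_order(jobs))
# ['long_job', 'short_job', 'long_job', 'short_job', 'long_job', 'long_job']
```

Under FIFO the short job waits for the long one to finish; under fair sharing it starts receiving task slots immediately.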
Deploying Spark
There are 2 high-level options for where to deploy Spark clusters: in an on-premises cluster or in a public cloud (AWS, Azure, GCP etc.).
With an on-premises deployment the cluster is fixed in size, and a third-party distribution is generally used for maintenance and support, e.g. CDH (Cloudera). A cluster manager is required as well.
With public clouds, resources can be launched and shut down elastically. There is also the option to select different machine types for different CPU/GPU needs.
Cluster Managers:
1: Standalone Mode - Runs multiple Spark applications on the same cluster. The cluster can run only Spark.
2: YARN - Hadoop YARN is a framework for job scheduling and cluster management.
3: Mesos - Apache clustering system.
Performance Tuning .