Spark Architecture

Spark can be launched in different modes, and each mode has a different architecture.

  1. Local: Single JVM
  2. Standalone: Spark's own built-in cluster manager (Static Allocation)
  3. YARN: from Hadoop (Dynamic Allocation)
  4. Mesos: from Apache Mesos (Dynamic Allocation)

Of these, 2, 3 and 4 are distributed architectures. The Standalone and Mesos architectures are similar to that of YARN.
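The mode is chosen through the master URL passed to the application. A minimal sketch (host names and ports are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// The master URL picks the launch mode:
//   local[*]                 -> Local: single JVM, one worker thread per core
//   spark://master-host:7077 -> Standalone
//   yarn                     -> YARN (yarn-client / yarn-cluster on older releases)
//   mesos://mesos-host:5050  -> Mesos
val conf = new SparkConf()
  .setAppName("ModeDemo")
  .setMaster("local[*]") // swap in one of the URLs above
val sc = new SparkContext(conf)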

https://tekmarathon.com/2017/02/13/hadoop-2-x-architecture/

In YARN, there are two different modes

  • Spark YARN Client Mode Architecture: This is for Spark in the Scala/Python shell (aka Interactive Mode). Here the Spark Driver runs on the edge node; if the driver program is killed or the edge node crashes, the application is killed along with it.
  • Spark YARN Cluster Mode Architecture: This is used when a user submits a Spark application with spark-submit. Here the Spark Driver is launched inside the Application Master.

Unlike the Hadoop driver program, the Spark Driver is also responsible for:

  • DAG Scheduler and Task Scheduler: Once Executors are launched inside Containers, they communicate directly with these schedulers. They play a far more important role in Spark applications than the YARN scheduler does.
  • Spark UI: The UI with the application DAG, Jobs and Stages is served by the Spark Driver.

Spark Terminology – Nodes, Containers, Executors, Cores/Slots, Tasks, Partitions, Jobs, Stages

  • A Spark cluster can be formed with ‘n’ Nodes.
  • Each Node can have 1+ Containers. The number of containers is decided based on the min and max container memory limits in yarn-site.xml.
  • Each Container hosts exactly 1 Executor JVM.
  • Each Executor can have 1+ Slots (aka Cores). A Spark application needs a minimum of 2 slots; the recommended range is 8-32 per executor, and we can go up to a maximum of 2-3x the actual physical cores on a node.
  • Tasks run inside the Slots. A Task is a unit of work assigned to an Executor core/slot by the Task Scheduler.
  • A Partition is a block of data (like a block of an HDFS file). A Spark RDD is split into 1+ Partitions. Each Partition requires one thread of computation (aka Task), hence an RDD with ‘n’ partitions requires ‘n’ Tasks to perform any Transformation.
  • Jobs: A Spark Application is split into ‘n’ Jobs based on the number of Actions inside it. Basically, for every Action a Job is launched.
  • Stages: A Job is divided into ‘m’ Stages. A Stage groups operations that can be pipelined together, for example map() and filter() can be put into one Stage. Each Stage is finally split into ‘n’ Tasks, as the sketch below illustrates.
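A minimal sketch (assuming a live SparkContext sc) of how Actions, shuffle boundaries and Partitions map onto Jobs, Stages and Tasks:

// RDD with 4 partitions: any transformation over it needs 4 tasks
val rdd = sc.parallelize(1 to 100, 4)

// map() and filter() are narrow transformations: pipelined into ONE stage
val evens = rdd.map(_ * 2).filter(_ % 3 == 0)

// reduceByKey() forces a shuffle, so a SECOND stage begins here
val counts = evens.map(x => (x % 10, 1)).reduceByKey(_ + _)

counts.collect() // Action -> Job 0 (2 stages, 4 tasks each)
counts.count()   // Action -> Job 1 (stages may be skipped if shuffle output is reused)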

Dynamic Allocation in Spark

Dynamic Allocation is a Spark feature that dynamically adds or removes executors launched by the application to match the workload.

Unlike static allocation of resources (prior to 1.6.0), where Spark reserved a fixed amount of CPU and memory resources, with Dynamic Allocation the allocation is purely based on the workload.

Note: This is the main difference between the Spark Standalone architecture (static allocation) and the Spark YARN/Mesos architectures (dynamic allocation).

 
// Flag to enable/disable the Dynamic Allocation feature
spark.dynamicAllocation.enabled: true/false

// Lower bound on executors; the application starts with
// spark.dynamicAllocation.initialExecutors, which defaults to this value
spark.dynamicAllocation.minExecutors: m

// Upper bound: the application can scale up to at most this many executors
spark.dynamicAllocation.maxExecutors: n

// When tasks have been backlogged for this long, the FIRST round
// of new executors is requested
spark.dynamicAllocation.schedulerBacklogTimeout: x secs

// From then on, another round is requested every time this interval
// passes with tasks still backlogged, until maxExecutors is hit
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: y secs

// An executor is released when no Task has been scheduled
// on it for this duration
spark.dynamicAllocation.executorIdleTimeout: z secs
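A sketch of wiring these together (values are illustrative; on YARN the external shuffle service must also be enabled so shuffle files survive executor removal):

import org.apache.spark.SparkConf

val daConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // keeps shuffle files when executors are removed
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")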

There were a few issues with Dynamic Allocation in Streaming applications because:

  • Executors may never be idle, since micro-batches run every N secs
  • The Receiver occupies a Slot/Core inside an Executor and never finishes, hence the idleTimeout is never hit

https://issues.apache.org/jira/browse/SPARK-12133

 
// For streaming applications, disable the switch above and enable the one below
spark.streaming.dynamicAllocation.enabled
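A short sketch of that switch-over (the streaming-specific variant tracked in SPARK-12133):

import org.apache.spark.SparkConf

val streamingConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "false")          // core DA must stay off
  .set("spark.streaming.dynamicAllocation.enabled", "true") // streaming-aware DA on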

Tuning Spark Applications

Tuning the performance of Spark applications can be done at various levels:

  • OS Level
  • JVM Level
  • YARN Level
  • Spark Level

OS Level
In yarn-site.xml, we can allocate physical and virtual memory for the containers initialized on a node.
https://tekmarathon.com/2017/02/13/important-yarn-configuration-properties/

JVM Level
We can look at the performance of the JVM Garbage Collection and then fine tune GC parameters
./bin/spark-submit --name "My app" --master yarn --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:OldSize=100M -XX:MaxNewSize=100M -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html

YARN Level
While submitting the job, we can control:

  • Number of executors (an Executor runs inside a Container, 1 Executor per Container)
  • Memory for each executor
  • Number of cores for each executor (this value can be raised to a maximum of 2x the actual cores, but be aware that it also raises the memory requirements)
  • Memory Overhead
./bin/spark-submit --name "My app" --master yarn --num-executors 8 --executor-memory 4G --executor-cores 16 --conf "spark.yarn.executor.memoryOverhead=1024M" myApp.jar
https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

Spark Level
Prior to Spark 1.6.0, executor memory (spark.executor.memory) was split into two separate pools:

  • Storage Memory: where it caches RDDs
  • Execution Memory: where it holds execution objects (shuffles, joins, sorts, aggregations)

From 1.6.0 onwards, the two are combined into a unified pool and there is no hard split between them; the ratio of memory allocated to each pool is decided dynamically at run time.
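A sketch of the knobs that shape the unified pool (per the Spark docs, spark.memory.fraction defaulted to 0.75 in 1.6 and 0.6 from 2.0; the pool is carved out of the heap after a ~300MB reserve):

import org.apache.spark.SparkConf

val memConf = new SparkConf()
  .set("spark.executor.memory", "4g")
  // share of (heap - ~300MB reserved) given to the unified execution+storage pool
  .set("spark.memory.fraction", "0.6")
  // portion of the unified pool shielded from eviction for storage
  .set("spark.memory.storageFraction", "0.5")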

https://0x0fff.com/spark-memory-management/

Based on all the above factors, we should target tuning the memory settings based on

  • Objectives (EFFICIENCY vs RELIABILITY) and
  • Workloads (whether it is a BATCH or STREAMING application)



Some TIPS:

    • The cost of garbage collection is directly proportional to the number of objects, hence try to reduce the number of objects (for example, use an Array of primitives instead of a List of boxed values; see the sketch below)
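An illustrative sketch of the idea: a primitive array stores its values inline as a single object, while a generic List boxes each element into a separate object that the GC has to track:

// one heap object: 1M ints stored inline, nothing to trace per element
val packed: Array[Int] = Array.tabulate(1000000)(identity)

// millions of heap objects: every Int is boxed and each element adds a cons cell
val boxed: List[Int] = List.tabulate(1000000)(identity)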
    • For Batch applications use the default GC (ParallelGC) and for Streaming applications use ConcMarkSweepGC:
// BATCH Apps: default GC
-XX:+UseParallelGC -XX:ParallelGCThreads=<#>

// Streaming Apps
-XX:+UseConcMarkSweepGC -XX:ParallelCMSThreads=<#>
OR
// G1 GC, available from Java 7, is considered a good replacement for CMS
-XX:+UseG1GC
    • KRYO Serialization: This is up to 10x faster than Java Serialization. In general, a 1G disk file takes 2-3G to store in memory, which is the cost of Java Serialization.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// We need to register our custom classes with the Kryo Serializer
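A sketch of that registration (MyRecord is a hypothetical application class):

import org.apache.spark.SparkConf

case class MyRecord(id: Long, name: String) // hypothetical class to register

val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registering avoids writing fully-qualified class names into the serialized stream
  .registerKryoClasses(Array(classOf[MyRecord]))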
    • TACHYON: Use Tachyon (now Alluxio) for off-heap storage. The advantage is that even if the Executor JVM crashes, the cached data stays in OFF_HEAP storage.
 

