博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark源码分析 – SchedulerBackend
阅读量:4511 次
发布时间:2019-06-08

本文共 9828 字,大约阅读时间需要 32 分钟。

SchedulerBackend, 两个任务, 申请资源和task执行和管理

对于SparkDeploySchedulerBackend, 基于actor模式, 主要就是启动和管理两个actor

Deploy.Client Actor, 负责资源申请, 在SparkDeploySchedulerBackend初始化的时候就会被创建, 然后Client会去到Master上注册, 最终完成在Worker上的ExecutorBackend的创建(参考, ), 并且这些ExecutorBackend都会被注册到Driver Actor上
Driver Actor, 负责task的执行
由于Spark是原先基于Mesos的, 然后为了兼容性才提供Standalone模式, 所以你可以看到Driver Actor中的接口都是mesos风格的, 在mesos的情况下应该是动态的申请资源, 然后执行task (猜测, 还没有看源码)
但对于coarse-grained Mesos mode和Spark's standalone deploy mode, 这步被简化成当TaskScheduler初始化的时候, 直接就将资源分配好了, 然后Driver Actor只是负责调度task在这些executor上执行
所以在makeOffers的注释上, 写的是Make fake resource offers, 因为这里其实没有真正的offer resources
关于Driver Actor如何调用task去执行, 关键在scheduler.resourceOffers

SchedulerBackend

package org.apache.spark.scheduler.cluster/** * A backend interface for cluster scheduling systems that allows plugging in different ones under * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */private[spark] trait SchedulerBackend {  def start(): Unit  def stop(): Unit  def reviveOffers(): Unit  def defaultParallelism(): Int  // Memory used by each executor (in megabytes)  protected val executorMemory: Int = SparkContext.executorMemoryRequested  // TODO: Probably want to add a killTask too}

 

StandaloneSchedulerBackend

用于coarse-grained Mesos mode和Spark's standalone deploy mode

可用看到主要目的, 就是创建并维护driverActor
主要的逻辑都在driverActor 中

/** * A standalone scheduler backend, which waits for standalone executors to connect to it through * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*). */private[spark]class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)  extends SchedulerBackend with Logging{  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed  var totalCoreCount = new AtomicInteger(0)  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
// ……后面分析  }  var driverActor: ActorRef = null  val taskIdsOnSlave = new HashMap[String, HashSet[String]]  override def start() {    val properties = new ArrayBuffer[(String, String)]    val iterator = System.getProperties.entrySet.iterator    while (iterator.hasNext) {      val entry = iterator.next      val (key, value) = (entry.getKey.toString, entry.getValue.toString)      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {        properties += ((key, value))      }    }    driverActor = actorSystem.actorOf( // 关键就是创建driverActor      Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)  }  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")  override def stop() {    try {      if (driverActor != null) {        val future = driverActor.ask(StopDriver)(timeout) // 关闭driverActor        Await.result(future, timeout)      }    } catch {      case e: Exception =>        throw new SparkException("Error stopping standalone scheduler's driver actor", e)    }  }  override def reviveOffers() {    driverActor ! ReviveOffers  // 发送ReviveOffers event给driverActor   }  override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))  // Called by subclasses when notified of a lost worker  def removeExecutor(executorId: String, reason: String) {    try {      val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)      Await.result(future, timeout)    } catch {      case e: Exception =>        throw new SparkException("Error notifying standalone scheduler's driver actor", e)    }  }}

DriverActor

关键的函数, makeOffers, 在executors上launch tasks, 什么时候调用?

RegisterExecutor的时候,
Task StatusUpdate的时候,
收到ReviveOffers event的时候, 新的task被submit的时候, delay scheduling被触发的时候(per second)
关于delay scheduling, 应该是为了保持活度, 当没有任何状态变化时, 仍然需要继续保持launch tasks

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {    private val executorActor = new HashMap[String, ActorRef] // track所有executorActor Ref    private val executorAddress = new HashMap[String, Address]    private val executorHost = new HashMap[String, String]    private val freeCores = new HashMap[String, Int]    private val actorToExecutorId = new HashMap[ActorRef, String]    private val addressToExecutorId = new HashMap[Address, String]    override def preStart() {      // Listen for remote client disconnection events, since they don't go through Akka's watch()      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])      // Periodically revive offers to allow delay scheduling to work      val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)    }    def receive = {      case RegisterExecutor(executorId, hostPort, cores) =>  // 接收从StandaloneExecutorBackend发来的RegisterExecutor        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)        if (executorActor.contains(executorId)) {          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)        } else {          logInfo("Registered executor: " + sender + " with ID " + executorId)          sender ! RegisteredExecutor(sparkProperties)          context.watch(sender) // watch executor actor          executorActor(executorId) = sender          executorHost(executorId) = Utils.parseHostPort(hostPort)._1          freeCores(executorId) = cores          executorAddress(executorId) = sender.path.address          actorToExecutorId(sender) = executorId          addressToExecutorId(sender.path.address) = executorId          totalCoreCount.addAndGet(cores)          makeOffers()        }      case StatusUpdate(executorId, taskId, state, data) =>        scheduler.statusUpdate(taskId, state, data.value)        if (TaskState.isFinished(state)) {          freeCores(executorId) += 1          makeOffers(executorId)        }      case ReviveOffers => // 接收从StandaloneSchedulerBackend发来的ReviveOffers         makeOffers()      case StopDriver =>        sender ! true        context.stop(self)      case RemoveExecutor(executorId, reason) =>        removeExecutor(executorId, reason)        sender ! true      case Terminated(actor) =>        actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))      case RemoteClientDisconnected(transport, address) =>        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))      case RemoteClientShutdown(transport, address) =>        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))    }    // Make fake resource offers on all executors    def makeOffers() {      launchTasks(scheduler.resourceOffers(        executorHost.toArray.map {
case (id, host) => new WorkerOffer(id, host, freeCores(id))})) } // Make fake resource offers on just one executor // 可以看到这里传给scheduler.resourceOffers的WorkOffer,是根据之前已经分布好的executor静态生成的 // 而不是动态得到的workeroffer, 如果用mesos, 这里应该是动态获取workeroffer, 然后传给scheduler.resourceOffers def makeOffers(executorId: String) { launchTasks(scheduler.resourceOffers( Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { freeCores(task.executorId) -= 1 executorActor(task.executorId) ! LaunchTask(task) // launch就是给executorActor发送LaunchTask event } } // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") val numCores = freeCores(executorId) actorToExecutorId -= executorActor(executorId) addressToExecutorId -= executorAddress(executorId) executorActor -= executorId executorHost -= executorId freeCores -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } } }

 

SparkDeploySchedulerBackend

关键就是创建和管理Driver and Client Actor

private[spark] class SparkDeploySchedulerBackend(    scheduler: ClusterScheduler,    sc: SparkContext,    master: String,    appName: String)  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)  with ClientListener  with Logging {  var client: Client = null
override def start() {    super.start() // 调用StandaloneSchedulerBackend的start,创建DriverActor    // The endpoint for executors to talk to us    val driverUrl = "akka://spark@%s:%s/user/%s".format(      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),      StandaloneSchedulerBackend.ACTOR_NAME)    val args = Seq(driverUrl, "{
{EXECUTOR_ID}}", "{
{HOSTNAME}}", "{
{CORES}}") val command = Command( // 生成worker中ExecutorRunner中执行的command, 其实就是运行StandaloneExecutorBackend "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) // 生成application description client = new Client(sc.env.actorSystem, master, appDesc, this) // 创建Client Actor, 并start client.start() } override def stop() { stopping = true super.stop() client.stop() if (shutdownCallback != null) { shutdownCallback(this) } }}

转载于:https://www.cnblogs.com/fxjwind/p/3504052.html

你可能感兴趣的文章
load data with matlab
查看>>
ctypes调用dll的参数问题
查看>>
微信支付接口的调用(转)
查看>>
XSS攻击
查看>>
浅谈Sql各种join的用法
查看>>
Durid数据库连接池配置(不使用框架)
查看>>
BarCode128B字符转换函数(PB,SQL)
查看>>
watir学习资料
查看>>
Jmeter属性和变量
查看>>
java并发编程:并发容器之CopyOnWriteArrayList(转)
查看>>
python基础——面向对象进阶下
查看>>
Linux vi 命令详解
查看>>
本地如何搭建IPv6环境测试你的APP
查看>>
oracle、mysql新增字段,字段存在则不处理
查看>>
C++ NULL与nullptr的区别
查看>>
Discretized Streams, 离散化的流数据处理
查看>>
Spark源码分析 – SchedulerBackend
查看>>
黑马程序员 Java输入\输出
查看>>
python字符串处理
查看>>
live555学习笔记4-计划任务(TaskScheduler)深入探讨
查看>>