ListenerBus

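A bus is meant to serve multiple listeners: each event posted to it is handed to every registered listener.
The ListenerBus runs on the driver. The events it carries originate on the driver or on executors; executor-side information first reaches the driver (for example through task status updates and heartbeats) and is posted to the bus there.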
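
To make the one-bus, many-listeners idea concrete, here is a minimal sketch. The names `SimpleListener`, `SimpleBus` and `SimpleBusDemo` are hypothetical and chosen for illustration only; this is not Spark's `ListenerBus` trait, just the same fan-out pattern in a few lines.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical names throughout; this is not Spark's ListenerBus trait.
trait SimpleListener[E] {
  def onEvent(event: E): Unit
}

class SimpleBus[E] {
  // Thread-safe list, so listeners can be added while events are being posted.
  private val listeners = new CopyOnWriteArrayList[SimpleListener[E]]()

  def addListener(l: SimpleListener[E]): Unit = {
    listeners.add(l)
    ()
  }

  // Deliver one event to every registered listener, in registration order.
  def postToAll(event: E): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) it.next().onEvent(event)
  }
}

object SimpleBusDemo {
  // Registers two listeners, posts two events, and returns what each listener saw.
  def run(): (List[String], List[String]) = {
    val seenA = ArrayBuffer.empty[String]
    val seenB = ArrayBuffer.empty[String]
    val bus = new SimpleBus[String]
    bus.addListener(new SimpleListener[String] {
      def onEvent(e: String): Unit = { seenA += e; () }
    })
    bus.addListener(new SimpleListener[String] {
      def onEvent(e: String): Unit = { seenB += e; () }
    })
    bus.postToAll("jobStart")
    bus.postToAll("jobEnd")
    (seenA.toList, seenB.toList)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both listeners receive both events, in the order they were posted.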

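CopyOnWriteArrayList is a thread-safe implementation of the List interface, suited to workloads with frequent reads and only occasional modifications. On every write (adding, removing, or updating an element) it creates a copy of the underlying array and applies the write to the new array. This copy-on-write scheme ensures that a write never disturbs reads already in progress, which is how the class achieves thread safety.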
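
The snapshot behaviour can be observed directly. The sketch below (the object name `CopyOnWriteDemo` is made up for this example) creates an iterator, modifies the list, and then checks what the iterator still sees.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteDemo {
  // Returns (elements seen by an iterator created before the write,
  //          list contents after the write).
  def run(): (List[String], List[String]) = {
    val list = new CopyOnWriteArrayList[String]()
    list.add("a")
    list.add("b")

    // The iterator captures the backing array as it is right now.
    val it = list.iterator()

    // This write copies the array; the iterator still points at the old one.
    list.add("c")

    val seen = List.newBuilder[String]
    while (it.hasNext) seen += it.next()

    val after = List.newBuilder[String]
    val it2 = list.iterator()
    while (it2.hasNext) after += it2.next()

    (seen.result(), after.result())
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The first iterator yields only `a` and `b`, while a fresh iterator sees `a`, `b`, `c`. The price is an array copy per write, which is why the class fits a rarely changing listener list.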

References:
https://blog.csdn.net/weixin_43820556/article/details/134582269
https://blog.csdn.net/u010374412/article/details/104354153

SparkListenerBus
/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

AsyncEventQueue

This class holds the queued events and the registered listeners, and starts a separate thread that delivers the events to the listeners so they can be processed.

/**
 * An asynchronous queue for events. All events posted to this queue will be delivered to the child
 * listeners in a separate thread.
 *
 * Delivery will only begin when the `start()` method is called. The `stop()` method should be
 * called when no more events need to be delivered.
 */
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  private[scheduler] def capacity: Int = {
    val queueSize = conf.getInt(s"$LISTENER_BUS_EVENT_QUEUE_PREFIX.$name.capacity",
      conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    assert(queueSize > 0, s"capacity for event queue $name must be greater than 0, " +
      s"but $queueSize is configured.")
    queueSize
  }

  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)
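
The combination of a bounded `LinkedBlockingQueue` and a dedicated delivery thread can be sketched roughly as follows. `TinyAsyncQueue`, its sentinel string and the demo object are hypothetical simplifications for illustration; the real AsyncEventQueue additionally tracks metrics and dropped events, which are left out here.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical simplification, not Spark's AsyncEventQueue.
class TinyAsyncQueue(capacity: Int) {
  // Hypothetical sentinel that tells the dispatch thread to exit.
  private val PoisonPill = "__stop__"
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val listeners = new CopyOnWriteArrayList[String => Unit]()

  private val dispatchThread = new Thread("tiny-dispatch") {
    setDaemon(true)
    override def run(): Unit = {
      var next = eventQueue.take()
      while (next != PoisonPill) {
        val it = listeners.iterator()
        while (it.hasNext) it.next().apply(next)
        next = eventQueue.take()
      }
    }
  }

  def addListener(l: String => Unit): Unit = {
    listeners.add(l)
    ()
  }

  // Non-blocking: returns false (the event is dropped) when the queue is full.
  def post(event: String): Boolean = eventQueue.offer(event)

  // Delivery only begins once the dispatch thread is running.
  def start(): Unit = dispatchThread.start()

  // Enqueue the sentinel, then wait until everything before it was delivered.
  def stop(): Unit = {
    eventQueue.put(PoisonPill)
    dispatchThread.join()
  }
}

object TinyAsyncQueueDemo {
  def run(): List[String] = {
    val seen = new CopyOnWriteArrayList[String]()
    val q = new TinyAsyncQueue(capacity = 16)
    q.addListener(e => { seen.add(e); () })
    q.post("jobStart") // sits in the queue until start() is called
    q.start()
    q.post("jobEnd")
    q.stop()
    seen.toArray(new Array[String](0)).toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Posting never blocks the caller: a full queue rejects the event instead of stalling whoever posted it, at the cost of possibly losing events under load.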

ReplayListenerBus
/**
 * A SparkListenerBus that can be used to replay events from serialized event data.
 */
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

LiveListenerBus

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicate if `start()` is called
  private val started = new AtomicBoolean(false)
  // Indicate if `stop()` is called
  private val stopped = new AtomicBoolean(false)

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
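
The start/stop contract from the Scaladoc above (buffer before `start()`, deliver afterwards, drop after `stop()`) can be illustrated with a small synchronous sketch. `BufferingBus` and all of its members are hypothetical names; the real class fans events out to several AsyncEventQueue instances rather than calling listeners directly.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, synchronous simplification of the start/stop contract.
class BufferingBus {
  private var started = false
  private var stopped = false
  private val pending = ListBuffer.empty[String]
  private val listeners = ListBuffer.empty[String => Unit]

  def addListener(l: String => Unit): Unit = synchronized {
    listeners += l
    ()
  }

  def post(event: String): Unit = synchronized {
    if (stopped) {
      // Events posted after stop() are dropped.
    } else if (!started) {
      pending += event // buffered until start()
    } else {
      listeners.foreach(l => l(event))
    }
  }

  // Flush everything that was buffered, then switch to direct delivery.
  def start(): Unit = synchronized {
    started = true
    pending.foreach(e => listeners.foreach(l => l(e)))
    pending.clear()
  }

  def stop(): Unit = synchronized { stopped = true }
}

object BufferingBusDemo {
  // Returns (events seen before start(), events seen after the whole run).
  def run(): (List[String], List[String]) = {
    val seen = ListBuffer.empty[String]
    val bus = new BufferingBus
    bus.addListener(e => { seen += e; () })
    bus.post("a")
    val beforeStart = seen.toList
    bus.start()
    bus.post("b")
    bus.stop()
    bus.post("c")
    (beforeStart, seen.toList)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Nothing is delivered before `start()`, the buffered event `a` arrives on start, and `c` is silently dropped because it was posted after `stop()`.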

SQLAppStatusListener

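SQLAppStatusListener is also responsible for writing each execution's (exec) data to the kvstore, which can be backed by LevelDB.
The Spark UI only reads that data back from the store.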

  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
    Option(liveExecutions.get(executionId)).foreach { exec =>
      exec.completionTime = Some(new Date(time))
      exec.errorMessage = errorMessage
      // Debug output: print the error message when one is present and non-empty.
      exec.errorMessage.filter(_.nonEmpty).foreach { msg =>
        println(s"Private def onExecutionEnd Error message: $msg")
      }
      update(exec)

      // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
      // event count is updated after the metrics have been aggregated, to prevent a job end event
      // arriving during aggregation from cleaning up the metrics data.
      kvstore.doAsync {
        exec.metricsValues = aggregateMetrics(exec)
        removeStaleMetricsData(exec)
        exec.endEvents.incrementAndGet()
        update(exec, force = true)
      }
    }
  }

  private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
    val now = System.nanoTime()
    if (exec.endEvents.get() >= exec.jobs.size + 1) {
      exec.write(kvstore, now)
      removeStaleMetricsData(exec)
      liveExecutions.remove(exec.executionId)
    } else if (force) {
      exec.write(kvstore, now)
    } else if (liveUpdatePeriodNs >= 0) {
      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
        exec.write(kvstore, now)
      }
    }
  }
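
The branching in `update` boils down to a small decision table, which the following sketch restates as a pure function. `WriteDecision`, `decide` and the `Action` values are hypothetical names introduced here; the `jobCount + 1` threshold is read as one end event per job plus the one incremented in `onExecutionEnd`, which is an interpretation of the excerpt rather than something it states outright.

```scala
// Hypothetical restatement of the branching in update(); not Spark code.
object WriteDecision {
  sealed trait Action
  case object WriteAndRemove extends Action // final write, execution is no longer live
  case object Write extends Action          // write, execution stays live
  case object Skip extends Action           // too soon since the last write

  def decide(
      endEvents: Int,
      jobCount: Int,
      force: Boolean,
      nowNs: Long,
      lastWriteNs: Long,
      periodNs: Long): Action = {
    if (endEvents >= jobCount + 1) WriteAndRemove
    else if (force) Write
    else if (periodNs >= 0 && nowNs - lastWriteNs > periodNs) Write
    else Skip
  }

  def main(args: Array[String]): Unit = {
    // Two jobs and three end events: everything has finished.
    println(decide(3, 2, force = false, nowNs = 0L, lastWriteNs = 0L, periodNs = 100L))
    // Still running, but the update period has elapsed.
    println(decide(1, 2, force = false, nowNs = 150L, lastWriteNs = 0L, periodNs = 100L))
  }
}
```

A negative period disables periodic writes entirely, so a still-running execution is then only written when `force` is set or when all end events have arrived.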
