ListenerBus
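A bus is presumably a one-to-many relationship: one bus serves multiple listeners.
ListenerBus runs on the driver side. The events it carries originate from activity on either the driver or the executors, but executor-side information first travels to the driver, where it is posted to the bus.
CopyOnWriteArrayList is a thread-safe implementation of the List interface, suited to scenarios with frequent reads and only occasional modifications. Whenever a write operation (adding, removing, or updating an element) is performed, it creates a copy of the underlying array and applies the write to the new array. This copy-on-write mechanism ensures that a write never disturbs reads that are already in progress, which is how thread safety is achieved.
References: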
https://blog.csdn.net/weixin_43820556/article/details/134582269
https://blog.csdn.net/u010374412/article/details/104354153
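The copy-on-write behaviour described above can be seen in a small experiment. This is only a minimal sketch: `CopyOnWriteDemo` and the listener names are made up for illustration, and only the standard `java.util.concurrent.CopyOnWriteArrayList` API is used.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteDemo {
  /** Modifies the list while iterating; returns (elements seen by the iterator, final size). */
  def snapshotIteration(): (Int, Int) = {
    val listeners = new CopyOnWriteArrayList[String]()
    listeners.add("listener-a")
    listeners.add("listener-b")

    // The iterator is bound to the backing array that existed when it was created.
    val it = listeners.iterator()
    var seen = 0
    while (it.hasNext) {
      it.next()
      // This write copies the backing array. The iterator keeps reading the old
      // array, so no ConcurrentModificationException is thrown.
      listeners.add(s"listener-added-$seen")
      seen += 1
    }
    (seen, listeners.size())
  }
}
```

The iterator only visits the two elements present when it was created, while the list itself ends up with four. Doing the same with a plain `java.util.ArrayList` would fail with a `ConcurrentModificationException`.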
SparkListenerBus
/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
  // ... (body omitted)
}
AsyncEventQueue
This class holds the events and the listener objects, and starts a separate thread that delivers the events to the listeners so that they can be handled there.
/**
 * An asynchronous queue for events. All events posted to this queue will be delivered to the child
 * listeners in a separate thread.
 *
 * Delivery will only begin when the `start()` method is called. The `stop()` method should be
 * called when no more events need to be delivered.
 */
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  private[scheduler] def capacity: Int = {
    val queueSize = conf.getInt(s"$LISTENER_BUS_EVENT_QUEUE_PREFIX.$name.capacity",
      conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    assert(queueSize > 0, s"capacity for event queue $name must be greater than 0, " +
      s"but $queueSize is configured.")
    queueSize
  }

  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)
  // ... (rest of the class omitted)
}
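The pattern described above (a bounded queue plus a dedicated delivery thread) can be sketched independently of Spark. Everything below is hypothetical illustration code: `Event`, `Message`, `PoisonPill`, `Listener`, and `MiniAsyncQueue` are invented stand-ins, not Spark classes, and dropping events when the queue is full is an assumption made so that `post` never blocks the caller.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins for SparkListenerEvent and SparkListenerInterface.
sealed trait Event
final case class Message(text: String) extends Event
case object PoisonPill extends Event // tells the dispatch thread to exit

trait Listener { def onEvent(event: Event): Unit }

class MiniAsyncQueue(capacity: Int) {
  require(capacity > 0, s"capacity must be greater than 0, but $capacity is configured.")

  private val eventQueue = new LinkedBlockingQueue[Event](capacity)
  private val listeners = new CopyOnWriteArrayList[Listener]()
  val dropped = new AtomicLong(0)

  // Events are handed to listeners on this thread, never on the posting thread.
  private val dispatchThread = new Thread("mini-queue-dispatch") {
    setDaemon(true)
    override def run(): Unit = {
      var next = eventQueue.take()
      while (next != PoisonPill) {
        val it = listeners.iterator()
        while (it.hasNext) it.next().onEvent(next)
        next = eventQueue.take()
      }
    }
  }

  def addListener(listener: Listener): Unit = listeners.add(listener)

  // Delivery only begins once start() is called; earlier events wait in the queue.
  def start(): Unit = dispatchThread.start()

  // offer() never blocks: when the queue is full, the event is counted as dropped.
  def post(event: Event): Unit =
    if (!eventQueue.offer(event)) dropped.incrementAndGet()

  // Enqueue the poison pill behind all pending events, then wait for the thread to finish.
  def stop(): Unit = {
    eventQueue.put(PoisonPill)
    dispatchThread.join()
  }
}
```

Because the queue is bounded, a slow listener cannot make memory grow without limit; the price is that events are lost once the capacity is exhausted, which is why the capacity is configurable per queue in the excerpt above.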
ReplayListenerBus
/**
 * A SparkListenerBus that can be used to replay events from serialized event data.
 */
private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
  // ... (body omitted)
}
LiveListenerBus
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicate if `start()` is called
  private val started = new AtomicBoolean(false)
  // Indicate if `stop()` is called
  private val stopped = new AtomicBoolean(false)

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
  // ... (rest of the class omitted)
}
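The lifecycle stated in the comment above (buffer until `start()`, propagate afterwards, drop after `stop()`) can be sketched as follows. This is a simplified illustration and not Spark's implementation: `MiniLiveBus`, `pending`, and `sinks` are hypothetical names, events are plain strings, and each sink (a stand-in for an AsyncEventQueue) is called synchronously to keep the example short.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

class MiniLiveBus {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  // Registered sinks; copy-on-write makes iteration safe while sinks are being added.
  private val sinks = new CopyOnWriteArrayList[String => Unit]()

  // Events posted before start() wait here (guarded by `synchronized`).
  private val pending = mutable.ListBuffer.empty[String]

  def addSink(sink: String => Unit): Unit = sinks.add(sink)

  def post(event: String): Unit = {
    if (!stopped.get()) { // events posted after stop() are dropped
      val deliverNow = synchronized {
        if (started.get()) true
        else { pending += event; false } // not started yet: only buffer
      }
      if (deliverNow) deliver(event)
    }
  }

  def start(): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("bus already started")
    }
    // Flush everything that was buffered before the bus started.
    pending.foreach(deliver)
    pending.clear()
  }

  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException("stop() called before start()")
    }
    stopped.set(true)
  }

  private def deliver(event: String): Unit = {
    val it = sinks.iterator()
    while (it.hasNext) it.next().apply(event)
  }
}
```

Posting two events, calling `start()`, posting a third, calling `stop()`, and posting a fourth leaves a sink with exactly the first three events in order.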
SQLAppStatusListener
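SQLAppStatusListener is also responsible for writing each SQL execution's information (`exec`) into the key-value store (`kvstore` in the code below), which is LevelDB here.
The Spark UI merely reads the data back from LevelDB.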
private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
  val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
  Option(liveExecutions.get(executionId)).foreach { exec =>
    exec.completionTime = Some(new Date(time))
    exec.errorMessage = errorMessage
    if (exec.errorMessage.isDefined && (!errorMessage.get.isEmpty)) {
      println(s"Private def onExecutionEnd Error message: ${exec.errorMessage.get}")
    }
    update(exec)

    // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
    // event count is updated after the metrics have been aggregated, to prevent a job end event
    // arriving during aggregation from cleaning up the metrics data.
    kvstore.doAsync {
      exec.metricsValues = aggregateMetrics(exec)
      removeStaleMetricsData(exec)
      exec.endEvents.incrementAndGet()
      update(exec, force = true)
    }
  }
}
private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
  val now = System.nanoTime()
  if (exec.endEvents.get() >= exec.jobs.size + 1) {
    exec.write(kvstore, now)
    removeStaleMetricsData(exec)
    liveExecutions.remove(exec.executionId)
  } else if (force) {
    exec.write(kvstore, now)
  } else if (liveUpdatePeriodNs >= 0) {
    if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
      exec.write(kvstore, now)
    }
  }
}
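The branching in `update` boils down to a small decision rule, restated below as a pure function so that each case can be checked in isolation. `WritePolicy`, `Action`, and `decide` are hypothetical names introduced only for this sketch. The reading that the `+ 1` accounts for the execution-end event in addition to one end event per job is an interpretation of the code above, not something the excerpt states.

```scala
object WritePolicy {
  sealed trait Action
  case object WriteAndRemove extends Action // final write, then forget the live execution
  case object Write extends Action          // write, but keep tracking the execution
  case object Skip extends Action           // too soon since the last write, do nothing

  def decide(
      endEvents: Int,
      jobCount: Int,
      force: Boolean,
      nowNs: Long,
      lastWriteNs: Long,
      liveUpdatePeriodNs: Long): Action = {
    if (endEvents >= jobCount + 1) {
      // Assumed meaning: every job has ended and the execution itself has ended.
      WriteAndRemove
    } else if (force) {
      Write
    } else if (liveUpdatePeriodNs >= 0 && nowNs - lastWriteNs > liveUpdatePeriodNs) {
      // Throttled live update: write only if the update period has elapsed.
      Write
    } else {
      Skip
    }
  }
}
```

For example, with two jobs and a single end event so far, a non-forced update writes only when more than `liveUpdatePeriodNs` has passed since the last write, and a negative period disables these intermediate writes altogether.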