一、广播变量使用

源码中给的例子是:org.apache.spark.examples.BroadcastTest

其中我们关心的只有两行代码,即创建广播变量和使用广播变量

//准备测试数据
val arr1 = (0 until num).toArray
//创建广播变量
val barr1 = sc.broadcast(arr1)
//使用广播变量
val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)

二、创建广播变量

为了思路清晰,和主线关联不大的代码会剔除掉

1、SparkContext

  private[spark] def env: SparkEnv = _env

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    //isLocal 是一个 Boolean 类型
    //当设置master 为 local 或者 local[n] 时 为true
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    bc
  }

2、BroadcastManager

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    //计算 BroadcastId 
    val bid = nextBroadcastId.getAndIncrement()
    //三个参数 :1、数据 2、是否local 3、广播id
    //最终需要构造一个 TorrentBroadcast 对象
    broadcastFactory.newBroadcast[T](value_, isLocal, bid)
  }

3、TorrentBroadcast

机制如下:

Driver 将序列化对象划分为小块,并将这些块存储在Driver 的BlockManager中。

在每个Executor上首先尝试从其BlockManager中获取对象。如果获取不到,则使用远程获取(从Driver 和/或 其他Executor(如果可用)获取小块)。一旦获取成功,就会将块放入自己的BlockManager中,并准备好供其他Executor从中获取。

这可以防止Driver成为发送多个广播数据副本(每个Executor一个)的瓶颈。

private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  //广播对象的软引用
  //它通过从 Driver 和/或其他 Executor 读取块来构建此值。
  @transient private var _value: SoftReference[T] = _

  //将广播id 封装成 广播块id
  private val broadcastId = BroadcastBlockId(id)

  //构建该对象时就执行 writeBlocks
  private val numBlocks: Int = writeBlocks(obj)

  //每个区块的大小。默认值为4MB。此值仅由 broadcaster 读取
  @transient private var blockSize: Int = _
  
  private def setConf(conf: SparkConf): Unit = {
    //spark.broadcast.compress 默认 true
    //是否在发送广播变量之前对其进行压缩 , 通常都需要压缩 
    compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {
      //最终会找 spark.io.compression.codec   默认  lz4
      //用于压缩内部数据的编解码器,如RDD分区、事件日志、广播变量和洗牌输出。
      //默认情况下,Spark提供四种编解码器:lz4、lzf、snappy和zstd。还可以使用完全限定的类名来指定编解码器
      //‌LZ4压缩算法以其极高的压缩和解压缩速度而闻名,但压缩比并不突出。‌
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    //spark.broadcast.blockSize   默认 4m
    //TorrentBroadcastFactory每块块的大小,
    //值太大会降低广播过程中的并行性(使其变慢);
    //但是,如果它太小,BlockManager的性能可能会受到影响
    blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024
    //spark.broadcast.checksum  默认 true
    //是否启用广播校验和。
    //如果启用,广播将包括一个校验和,这可以帮助检测损坏的块,
    //但代价是计算和发送更多的数据。如果网络有其他机制来保证数据在广播过程中不会损坏,则可以禁用它
    checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)
  }
  setConf(SparkEnv.get.conf)


  //将对象划分为多个块,并将这些块放入块管理器中。
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    // 在Driver中存储广播变量的副本,这样在Driver上运行的Task就不会创建广播变量值的重复副本。
    //因为是 Driver 上的Task 用因此没有序列化,是直接放到了内存和磁盘
    val blockManager = SparkEnv.get.blockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    //如果 blockManager 放成功了 继续向下执行
    try {
      //获取支持此ChunkedByteBuffer的ByteBuffers的副本。
      val blocks =
        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
      //构建检验和
      if (checksumEnabled) {
        checksums = new Array[Int](blocks.length)
      }
      blocks.zipWithIndex.foreach { case (block, i) =>
        //计算每个块的校验和
        if (checksumEnabled) {
          checksums(i) = calcChecksum(block)
        }
        //为每个块计算一个id
        val pieceId = BroadcastBlockId(id, "piece" + i)
        //只读字节缓冲区,物理上存储为多个块,而不是单个连续数组
        val bytes = new ChunkedByteBuffer(block.duplicate())
        //调用blockManager 存储每个块 存储级别时 内存和磁盘 并序列化 (因为要分发,因此要序列化)
        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
            s"in local BlockManager")
        }
      }
      blocks.length
    } catch {

    }
  }

}

三、使用广播变量

1、Broadcast

  def value: T = {
    getValue()
  }
  //调用 TorrentBroadcast 获取
  protected def getValue(): T

2、TorrentBroadcast

  //广播对象的软引用
  //它通过从 Driver 和/或其他 Executor 读取块来构建此值。
  @transient private var _value: SoftReference[T] = _

  override protected def getValue() = synchronized {
    //如果有就直接获取,如果没有拉过来再获取
    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
    if (memoized != null) {
      memoized
    } else {
      val newlyRead = readBroadcastBlock()
      _value = new SoftReference[T](newlyRead)
      newlyRead
    }
  }

 private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {
      //由于我们只根据“broadcastId”进行锁定,因此每当使用“broadcast Cache”时,我们应该只触摸“broadcastId”。
      //获取 broadcastManager 的缓存 因为存的时候选择的级别是  MEMORY_AND_DISK_SER
      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

      //根据 broadcastId 从broadcastCache 获取块
      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
        setConf(SparkEnv.get.conf)
        val blockManager = SparkEnv.get.blockManager
        //从本地块管理器获取块作为Java对象的迭代器。
        blockManager.getLocalValues(broadcastId) match {
          case Some(blockResult) =>
            if (blockResult.data.hasNext) {
              val x = blockResult.data.next().asInstanceOf[T]
              releaseBlockManagerLock(broadcastId)

              //如果从本地块管理取到了值,就将值返回,并放到缓存中
              if (x != null) {
                broadcastCache.put(broadcastId, x)
              }

              x
            } else {
              throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
            }
          case None =>
            //本地块管理中没有该广播id对应的块数据
            val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
            logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
              s"(estimated total size $estimatedTotalSize)")
            val startTimeNs = System.nanoTime()
            //从Driver和/或其他Executor获取torrent块
            //请注意,所有这些块都存储在BlockManager中并报告给Driver,因此其他Executor也可以从该Executor中提取这些块。
            val blocks = readBlocks()
            logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")

            try {
              val obj = TorrentBroadcast.unBlockifyObject[T](
                blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
              // 将合并的副本存储在BlockManager中,这样此Executor上的其他Task就不需要重新获取它。
              val storageLevel = StorageLevel.MEMORY_AND_DISK
              //写一个由单个对象组成的块 ,该Executor其他Task使用时直接全量获取广播变量
              if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
                throw new SparkException(s"Failed to store $broadcastId in BlockManager")
              }

              if (obj != null) {
                broadcastCache.put(broadcastId, obj)
              }

              obj
            } finally {
              blocks.foreach(_.dispose())
            }
        }
      }
    }
  }

四、总结

创建广播变量

        1、创建广播变量id

        2、创建TorrentBroadcast

        3、TorrentBroadcast会自动调用writeBlocks将数据写入到BlockManager(这里会写两份数据,一份是整体的结果用于本Executor使用广播变量时使用,存储级别是MEMORY_AND_DISK,一份是拆分成块用于其他Executor拉取使用,存储级别是MEMORY_AND_DISK_SER)

使用广播变量

        1、因为本身广播变量的类型就是TorrentBroadcast,因此直接调用其getValue方法获取

        2、查看本类_value属性是否有值,如果有值直接返回(加速同Task反复调用)

        3、查看缓存是否有值(BroadcastManager.cachedValues)如果有值直接返回(加速同Task反复调用)

        4、查看本Executor的BlockManager是否有该广播变量整体的数据,如果有值直接返回(加速同Executor的不同Task使用)

        5、从Driver或者其他Excutor获取该广播变量的块序列数据

        如果获取到广播变量的数据都会向本类_value属性、缓存、本Executor的BlockManager设置或存入整体数据用于重复使用时加速

广播变量的创建以及使用流程如下:(下载放大后就很清晰哟)

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部