04-240606Spark笔记

1.行动算子-2

  • save相关算子:

格式:

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
 path: String,
 codec: Option[Class[_ <: CompressionCodec]] = None): Unit

例子:

  val rdd = sc.makeRDD(List(
    ("a",1),("a",2),("a",3)
  ))
​
    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")
    // saveAsSequenceFile方法要求数据的格式必须为K-V类型
    rdd.saveAsSequenceFile("output2")

输出结果:

image-20240604225213130

  • foreach

格式:

def foreach(f: T => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

例子:

    val rdd = sc.makeRDD(List(1,2,3,4))
​
    //foreach 其实是Driver端内存集合的循环遍历方法
    rdd.collect().foreach(println) //Driver
    println("***************")
    // foreach 其实是Executor端内存数据打印
    rdd.foreach(println)    // Executor
    // 算子 : Operator(操作)
    //         RDD的方法和Scala集合对象的方法不一样
    //         集合对象的方法都是在同一个节点的内存中完成的。
    //         RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
    //         为了区分不同的处理效果,所以将RDD的方法称之为算子。
    //        RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

输出结果:

image-20240604232824753

2. 序列化

2.1 闭包检测
  • 闭包检测

因为Driver需要给两个Executor共享User方法,共享就需要序列化

案例:

  def main(args: Array[String]): Unit = {
​
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
​
    val rdd = sc.makeRDD(List[Int]())
​
    val user = new User()
​
    // SparkException: Task not serializable
    // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
​
    // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
    // 闭包检测
    rdd.foreach(
      num => {
        println("age = " + (user.age + num))
      }
    )
​
    sc.stop()
​
  }
  //class User extends Serializable {
  // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
  //case class User() {
  class User {
    var age : Int = 30
  }
  • RDD 的分区器

自己来写分区器:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(
            ("nba", "xxxxxxxxx"),
            ("cba", "xxxxxxxxx"),
            ("wnba", "xxxxxxxxx"),
            ("nba", "xxxxxxxxx"),
        ),3)
        val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner )
​
        partRDD.saveAsTextFile("output")
​
        sc.stop()
    }

自定义的分区器:

    class MyPartitioner extends Partitioner{
        // 分区数量
        override def numPartitions: Int = 3
​
        // 根据数据的key值返回数据所在的分区索引(从0开始)
        override def getPartition(key: Any): Int = {
            key match {
                case "nba" => 0
                case "wnba" => 1
                case _ => 2
            }
        }
    }
* 自定义分区器
* 1. 继承Partitioner
* 2. 重写方法

输出结果:

image-20240605170312913

image-20240605170321664

  • RDD 文件读取与保存

案例1:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.textFile("output1")
        println(rdd.collect().mkString(","))
​
        val rdd1 = sc.objectFile[(String, Int)]("output2")
        println(rdd1.collect().mkString(","))
​
        val rdd2 = sc.sequenceFile[String, Int]("output3")
        println(rdd2.collect().mkString(","))
​
        sc.stop()
    }

输出结果:

image-20240605170535800

案例2:

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(
            List(
                ("a", 1),
                ("b", 2),
                ("c", 3)
            )
        )
​
        rdd.saveAsTextFile("output1")
        rdd.saveAsObjectFile("output2")
        rdd.saveAsSequenceFile("output3")
​
        sc.stop()
    }

输出结果:

image-20240605170643956

1. 数据结构:

image-20240605170954358

  • 累加器

累加器用来把 Executor 端变量信息聚合到 Driver 端。

![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png

image-20240605202424331

Acc,累加器可以把Excutor端的数据返回到Driver中去:

image-20240605202543334

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // reduce : 分区内计算,分区间计算
        //val i: Int = rdd.reduce(_+_)
        //println(i)
        var sum = 0
        rdd.foreach(
            num => {
                sum += num
            }
        )
        println("sum = " + sum)
​
        sc.stop()
​
    }
  • 系统累加器

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // 获取系统累加器
        // Spark默认就提供了简单数据聚合的累加器
        val sumAcc = sc.longAccumulator("sum")
​
        //sc.doubleAccumulator
        //sc.collectionAccumulator
​
        rdd.foreach(
            num => {
                // 使用累加器
                sumAcc.add(num)
            }
        )
​
        // 获取累加器的值
        println(sumAcc.value)
​
        sc.stop()
​
    }

累加器的一些特殊情况:

少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
一般情况下,累加器会放置在行动算子进
    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List(1,2,3,4))
​
        // 获取系统累加器
        // Spark默认就提供了简单数据聚合的累加器
        val sumAcc = sc.longAccumulator("sum")
​
        //sc.doubleAccumulator
        //sc.collectionAccumulator
​
        val mapRDD = rdd.map(
            num => {
                // 使用累加器
                sumAcc.add(num)
                num
            }
        )
​
        // 获取累加器的值
        // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
        // 一般情况下,累加器会放置在行动算子进行操作
        mapRDD.collect()
        mapRDD.collect()
        println(sumAcc.value)
​
        sc.stop()
​
    }
  • 自定义累加器

分布式共享只写变量

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd = sc.makeRDD(List("hello", "spark", "hello"))
​
        // 累加器 : WordCount
        // 创建累加器对象
        val wcAcc = new MyAccumulator()
        // 向Spark进行注册
        sc.register(wcAcc, "wordCountAcc")
​
        rdd.foreach(
            word => {
                // 数据的累加(使用累加器)
                wcAcc.add(word)
            }
        )
​
        // 获取累加器累加的结果
        println(wcAcc.value)
​
        sc.stop()
​
    }
    /*
      自定义数据累加器:WordCount
​
      1. 继承AccumulatorV2, 定义泛型
         IN : 累加器输入的数据类型 String
         OUT : 累加器返回的数据类型 mutable.Map[String, Long]
​
      2. 重写方法(6)
     */
    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
​
        private var wcMap = mutable.Map[String, Long]()
​
        // 判断是否初始状态
        override def isZero: Boolean = {
            wcMap.isEmpty
        }
​
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
            new MyAccumulator()
        }
​
        override def reset(): Unit = {
            wcMap.clear()
        }
​
        // 获取累加器需要计算的值
        override def add(word: String): Unit = {
            val newCnt = wcMap.getOrElse(word, 0L) + 1
            wcMap.update(word, newCnt)
        }
​
        // Driver合并多个累加器
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
​
            val map1 = this.wcMap
            val map2 = other.value
​
            map2.foreach{
                case ( word, count ) => {
                    val newCount = map1.getOrElse(word, 0L) + count
                    map1.update(word, newCount)
                }
            }
        }
​
        // 累加器结果
        override def value: mutable.Map[String, Long] = {
            wcMap
        }
    }
  • 广播变量

实现原理:

广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务

分别发送。

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd1 = sc.makeRDD(List(
            ("a", 1),("b", 2),("c", 3)
        ))
//        val rdd2 = sc.makeRDD(List(
//            ("a", 4),("b", 5),("c", 6)
//        ))
        val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​
​
​
        // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
        //val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
        //joinRDD.collect().foreach(println)
        // (a, 1),    (b, 2),    (c, 3)
        // (a, (1,4)),(b, (2,5)),(c, (3,6))
        rdd1.map {
            case (w, c) => {
                val l: Int = map.getOrElse(w, 0)
                (w, (c, l))
            }
        }.collect().foreach(println)
​
​
​
        sc.stop()
​
    }

join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用

image-20240606162528164

Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中

Spark 中的广播变量不能更改 : 分布式共享只读变量

image-20240606162609035

封装广播变量1

案例:

    def main(args: Array[String]): Unit = {
​
        val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
        val sc = new SparkContext(sparConf)
​
        val rdd1 = sc.makeRDD(List(
            ("a", 1),("b", 2),("c", 3)
        ))
        val map = mutable.Map(("a", 4),("b", 5),("c", 6))
​
        // 封装广播变量
        val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
​
        rdd1.map {
            case (w, c) => {
                // 方法广播变量
                val l: Int = bc.value.getOrElse(w, 0)
                (w, (c, l))
            }
        }.collect().foreach(println)
​
​
​
        sc.stop()
​
    }

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部