Spark: A Custom Accumulator for String Concatenation (Code and Detailed Implementation Steps)

Date: 2020-07-08 21:22:17


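In the source of the longAccumulator() method, Spark runs val acc = new LongAccumulator and then calls register(acc) to register the accumulator. Ctrl+click into LongAccumulator and you can see that it extends AccumulatorV2[jl.Long, jl.Long] (jl is the alias for java.lang used in the Spark source). A custom accumulator can be written by following the same pattern as LongAccumulator.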
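As a point of comparison, the two steps described above (create a LongAccumulator, then register it) can be written out by hand. This is a minimal sketch, assuming a local SparkContext; the object name LongAccumulatorSketch, the method name sumManually and the accumulator name "manualSum" are made up for illustration and do not come from the Spark source.

```scala
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for this illustration
object LongAccumulatorSketch {

  // Sums the numbers with a LongAccumulator that is created and registered by hand,
  // mirroring what the article says longAccumulator() does internally
  def sumManually(sc: SparkContext, numbers: Seq[Int]): Long = {
    val acc = new LongAccumulator
    sc.register(acc, "manualSum")
    sc.makeRDD(numbers, 2).foreach(n => acc.add(n))
    acc.sum
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LongAccumulatorSketch").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      println(sumManually(sc, Seq(1, 2, 3, 4))) // prints 10
    } finally {
      sc.stop()
    }
  }
}
```

Calling sc.longAccumulator("manualSum") instead would perform the creation and registration in a single call.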

Implementation class

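// 1. Extend AccumulatorV2[IN, OUT] (IN is the type passed to add, OUT is the type returned by value)
// 2. Implement the abstract methods
// 3. Create the accumulator
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
  val list = new util.ArrayList[String]()

  // Whether the accumulator is in its initial state (here: whether the list is empty)
  override def isZero: Boolean = list.isEmpty

  // Copy the accumulator, including the elements collected so far
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val acc = new WordAccumulator
    acc.list.addAll(list)
    acc
  }

  // Reset the accumulator (clearing the list is enough)
  override def reset(): Unit = list.clear()

  // Add a value to the accumulator; only words containing "h" are kept
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  // Merge another accumulator into this one (partial results from different executors are combined)
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  // The accumulated result
  override def value: util.ArrayList[String] = list
}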
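To see how copy, reset, add and merge fit together, the methods can be called directly, without a cluster. The following is a rough sketch and not how Spark schedules work: it assumes that each task receives a zeroed copy of the accumulator (obtained here through copyAndReset) and that the partial results are merged back afterwards. The accumulator class is repeated in compact form, with a copy that carries over the collected elements, so that the block compiles on its own; the object WordAccumulatorLifecycle and the sample words are illustrative.

```scala
import java.util
import org.apache.spark.util.AccumulatorV2

// Compact copy of the article's accumulator so that this block compiles on its own
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
  val list = new util.ArrayList[String]()

  override def isZero: Boolean = list.isEmpty

  override def copy(): WordAccumulator = {
    val acc = new WordAccumulator
    acc.list.addAll(list)
    acc
  }

  override def reset(): Unit = list.clear()

  override def add(v: String): Unit = {
    if (v.contains("h")) list.add(v)
  }

  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  override def value: util.ArrayList[String] = list
}

// Hypothetical driver object that imitates the lifecycle by hand
object WordAccumulatorLifecycle {

  def simulate(): util.ArrayList[String] = {
    val driverSide = new WordAccumulator

    // Each simulated task starts from an empty copy
    val task1 = driverSide.copyAndReset()
    val task2 = driverSide.copyAndReset()

    task1.add("chun1")
    task1.add("abc") // contains no "h", so add drops it
    task2.add("chun2")

    // Partial results are folded back into the driver-side accumulator
    driverSide.merge(task1)
    driverSide.merge(task2)
    driverSide.value
  }

  def main(args: Array[String]): Unit = {
    println(simulate()) // prints [chun1, chun2]
  }
}
```

The merge order decides the element order in the final list, so with several executors the order of the result should not be relied upon.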

Next comes the main function

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
  // Create the context object
  val sc = new SparkContext(conf)
  val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)

  // Create the accumulator
  val wordAccumulator = new WordAccumulator()
  // Register the accumulator
  sc.register(wordAccumulator)

  dataRDD.foreach { word =>
    // Add each word to the accumulator
    wordAccumulator.add(word)
  }

  // Read the accumulated result
  println(wordAccumulator.value)

  sc.stop()
}

Result: [chun1, chun2, chun3, chun4]
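The accumulator returns a java.util.ArrayList, while the title speaks of string concatenation. If a single string is wanted, one option is to join the list after reading it on the driver. This is a small sketch using only the Java standard library; the object name JoinAccumulatedWords and the comma separator are illustrative choices, not part of the original code.

```scala
import java.util

// Hypothetical helper object, used only for this illustration
object JoinAccumulatedWords {

  // Joins the collected words into one string with the given separator
  def join(words: util.List[String], separator: String): String =
    String.join(separator, words)

  def main(args: Array[String]): Unit = {
    // Stand-in for wordAccumulator.value from the example above
    val words = new util.ArrayList[String](
      util.Arrays.asList("chun1", "chun2", "chun3", "chun4"))
    println(join(words, ",")) // prints chun1,chun2,chun3,chun4
  }
}
```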

Complete code

package date_9_23

import java.util

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Custom accumulator
 */
object Spark4_LongAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
    // Create the context object
    val sc = new SparkContext(conf)
    val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)

    // Create the accumulator
    val wordAccumulator = new WordAccumulator()
    // Register the accumulator
    sc.register(wordAccumulator)

    dataRDD.foreach { word =>
      // Add each word to the accumulator
      wordAccumulator.add(word)
    }

    // Read the accumulated result
    println(wordAccumulator.value)

    sc.stop()
  }
}

// Accumulator declaration
// 1. Extend AccumulatorV2[IN, OUT] (IN is the type passed to add, OUT is the type returned by value)
// 2. Implement the abstract methods
// 3. Create the accumulator
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
  val list = new util.ArrayList[String]()

  // Whether the accumulator is in its initial state (here: whether the list is empty)
  override def isZero: Boolean = list.isEmpty

  // Copy the accumulator, including the elements collected so far
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val acc = new WordAccumulator
    acc.list.addAll(list)
    acc
  }

  // Reset the accumulator (clearing the list is enough)
  override def reset(): Unit = list.clear()

  // Add a value to the accumulator; only words containing "h" are kept
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  // Merge another accumulator into this one (partial results from different executors are combined)
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  // The accumulated result
  override def value: util.ArrayList[String] = list
}
