1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 大数据Spark(三十六):SparkStreaming实战案例一 WordCount

大数据Spark(三十六):SparkStreaming实战案例一 WordCount

时间:2021-01-30 20:03:59

相关推荐

大数据Spark(三十六):SparkStreaming实战案例一 WordCount

目录

SparkStreaming实战案例一 WordCount

需求

准备工作

代码实现

第一种方式:构建SparkConf对象

第二种方式:构建SparkContext对象

完整代码如下所示:

应用监控

其一、Streaming流式应用概要信息

其二、性能衡量标准

SparkStreaming实战案例一 WordCount

需求

从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:

准备工作

1.在node01上安装nc命令

nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

yum install -y nc

2.在node01启动客户端工具发送消息

nc -lk 9999

代码实现

/docs/latest/streaming-programming-guide.html

从官方文档可知,提供两种方式构建StreamingContext实例对象,如下:

第一种方式:构建SparkConf对象

第二种方式:构建SparkContext对象

完整代码如下所示:

package cn.itcast.streamingimport mons.lang3.StringUtilsimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。*/object SparkStreamingDemo01_WordCount {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//batchDuration the time interval at which streaming data will be divided into batches//流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)val resultDStream: DStream[(String, Int)] = inputDStream.filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).map((_, 1)).reduceByKey(_ + _)resultDStream.print(10)// 启动并等待程序停止// 对于流式应用来说,需要启动应用ssc.start()// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止ssc.awaitTermination()// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)ssc.stop(stopSparkContext = true, stopGracefully = true)//注意://上面的代码可以做WordCount,但是只能对当前批次的数据进行累加!}}

应用监控

运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040/

查看相关监控信息。

其一、Streaming流式应用概要信息

运行结果监控截图:

每批次Batch数据处理总时间TD = 批次调度延迟时间SD + 批次数据处理时间PT

其二、性能衡量标准

SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??

需要满足:

每批次数据处理时间TD <= BatchInterval每批次时间间隔

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。