Stream整合Flume
2021-03-11 22:31
Tags: Spark Streaming, Flume. Original post: https://www.cnblogs.com/xjqi/p/12831494.html

package com.bawei.stream
2
3 import java.net.InetSocketAddress
4
5 import org.apache.spark.storage.StorageLevel
6 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
7 import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
8 import org.apache.spark.streaming.{Seconds, StreamingContext}
9 import org.apache.spark.{SparkConf, SparkContext}
10
11
/**
 * Pull-based Spark Streaming + Flume word count.
 *
 * Polls a Flume sink for events every 5 seconds, splits each event body
 * into whitespace-separated words, and maintains a running count per word
 * across batches via `updateStateByKey` (checkpointing is required and
 * configured below).
 */
object StreamFlume {

  /**
   * State-update function for `updateStateByKey`.
   *
   * @param newValues    the counts observed for one key in the current batch
   * @param runningCount the accumulated count from previous batches, if any
   * @return the new accumulated count, always defined (state is never dropped)
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val previous = runningCount.getOrElse(0)
    Some(previous + newValues.sum)
  }

  def main(args: Array[String]): Unit = {
    // Spark configuration: application name plus two local threads
    // (one receiver, one processing thread).
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Flume_Poll")
      .setMaster("local[2]")

    // Core Spark context; quiet the logs down to warnings.
    val sparkContext: SparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("WARN")

    // Streaming context with a 5-second batch interval.
    val streamingContext: StreamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Checkpoint directory — mandatory for the stateful updateStateByKey below.
    streamingContext.checkpoint("C:\\Users\\Desktop\\checkpoint2")

    // Flume sink address(es) to poll; more hosts can be appended here.
    val flumeAddresses = Seq(new InetSocketAddress("192.168.182.147", 8888))

    // Pull events from Flume, spilling to disk when memory is tight.
    val pollingStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(streamingContext, flumeAddresses, StorageLevel.MEMORY_AND_DISK)

    // Each event's payload lives in its body; decode it to a String line.
    val bodyLines: DStream[String] =
      pollingStream.map(sparkEvent => new String(sparkEvent.event.getBody.array()))

    // Tokenize on spaces, pair each word with 1, and fold into running totals.
    val wordCounts: DStream[(String, Int)] = bodyLines
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunction)

    wordCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}