Stream整合Flume

2021-03-11 22:31

阅读:319

标签:line   creat   转化   spl   min   new   col   util   mem   

 1 package com.bawei.stream
 2 
 3 import java.net.InetSocketAddress
 4 
 5 import org.apache.spark.storage.StorageLevel
 6 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 7 import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
 8 import org.apache.spark.streaming.{Seconds, StreamingContext}
 9 import org.apache.spark.{SparkConf, SparkContext}
10 
11 
12 object StreamFlume {
13   def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
14     val newCount =runningCount.getOrElse(0)+newValues.sum
15     Some(newCount)
16   }
17 
18 
19   def main(args: Array[String]): Unit = {
20     //配置sparkConf参数
21     val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
22     //构建sparkContext对象
23     val sc: SparkContext = new SparkContext(sparkConf)
24     sc.setLogLevel("WARN")
25     //构建StreamingContext对象,每个批处理的时间间隔
26     val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
27     //设置checkpoint
28     scc.checkpoint("C:\\Users\\Desktop\\checkpoint2")
29     //设置flume的地址,可以设置多台
30     val address=Seq(new InetSocketAddress("192.168.182.147",8888))
31     // 从flume中拉取数据
32     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK)
33 
34     //获取flume中数据,数据存在event的body中,转化为String
35     val lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))
36     //实现单词汇总
37     val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)
38 
39     result.print()
40     scc.start()
41     scc.awaitTermination()
42   }
43 }

 

Stream整合Flume

标签:line   creat   转化   spl   min   new   col   util   mem   

原文地址:https://www.cnblogs.com/xjqi/p/12831494.html


评论


亲,登录后才可以留言!