Flink Windows (window)
2021-03-08 07:28
Case 1: tumbling processing-time window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val socketStream = env.socketTextStream("hadoop102", 7777)
    val dataStream: DataStream[SensorReading] = socketStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })

    // minimum temperature within 10 seconds
    val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) // 10-second tumbling window; no time characteristic set, so ProcessingTime by default
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    // print the original dataStream
    dataStream.print("data stream")
    // print the windowed stream
    minTemperatureStream.print("min temperature")

    env.execute("window test")
  }
}
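The code above (and every later example) uses a SensorReading type that is not defined in this post. A minimal sketch of the presumed definition, with fields inferred from how they are used in the map above:

// Presumed definition (not shown in the original post); fields inferred from usage:
// id is the sensor name, timestamp is in seconds, temperature is the reading.
case class SensorReading(id: String, timestamp: Long, temperature: Double)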
Test:
Enter two records in succession:
[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 40.8
Within one 10-second tumbling window, the windowed stream minTemperatureStream emits only a single record. The TimeWindow fires once processing time reaches the end of the current 10-second window; since processing-time tumbling windows are aligned to whole 10-second intervals, this happens at most 10 seconds after the first record arrives.
data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,40.8)
min temperature> (sensor_1,30.8)
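A minimal sketch of the window-alignment arithmetic (plain Scala, not part of the program above; in this processing-time example the relevant timestamp is the wall-clock arrival time, not the timestamp field of the record):

// Hypothetical helper for illustration only: the tumbling window (start, end),
// in milliseconds, that a given timestamp falls into, assuming a window offset of 0.
def tumblingWindowOf(timestampMs: Long, sizeMs: Long): (Long, Long) = {
  val start = timestampMs - (timestampMs % sizeMs) // aligned to whole multiples of sizeMs
  (start, start + sizeMs)                          // half-open interval [start, end)
}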
Case 2: tumbling event-time window with watermarks
Code analysis:
① env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) sets the time characteristic of the windows to event time.
② assignTimestampsAndWatermarks() is given an instance of BoundedOutOfOrdernessTimestampExtractor; its constructor argument is the maximum out-of-orderness (delay) to tolerate, and the overridden extractTimestamp() method specifies which field supplies the event timestamp.
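For reference, here is that timestamp/watermark assignment in isolation (the same fragment appears in the full program below); note the conversion from seconds to milliseconds, since Flink expects timestamps in milliseconds:

// Tolerate up to 2 seconds of out-of-order events: watermark = max event time seen - 2s.
dataStream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
    // The SensorReading timestamp is in seconds; Flink works in milliseconds.
    override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
  })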
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102", 7777)
    val dataStream: DataStream[SensorReading] = socketStream
      .map(d => {
        val arr = d.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
        })
      //.assignAscendingTimestamps(_.timestamp) // for strictly ascending data, assign timestamps directly

    // minimum temperature within 5 seconds
    val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 5-second tumbling window
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    // print the original dataStream
    dataStream.print("data stream")
    // print the windowed stream
    minTemperatureStream.print("min temperature")

    env.execute("window test")
  }
}
Test:
When the first record arrives, its timestamp is 1547718200 (in seconds). Since the window size is 5 seconds, the first window covers [1547718200, 1547718205), and in theory it would fire once a record with timestamp 1547718205 arrives. But because an allowed delay of 2 seconds was configured, that record only raises the watermark to 1547718203. Only when a record with timestamp 1547718207 or later arrives, pushing the watermark to at least 1547718205, does the window fire its computation and close.
The socket input is as follows:
[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 31
sensor_1, 1547718202, 32
sensor_1, 1547718203, 33
sensor_1, 1547718204, 34
sensor_1, 1547718205, 35
sensor_1, 1547718206, 36
sensor_1, 1547718207, 37
sensor_1, 1547718208, 38
The console output is as follows:
data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,31.0)
data stream> SensorReading(sensor_1,1547718202,32.0)
data stream> SensorReading(sensor_1,1547718203,33.0)
data stream> SensorReading(sensor_1,1547718204,34.0)
data stream> SensorReading(sensor_1,1547718205,35.0)
data stream> SensorReading(sensor_1,1547718206,36.0)
data stream> SensorReading(sensor_1,1547718207,37.0)
min temperature> (sensor_1,30.8)
data stream> SensorReading(sensor_1,1547718208,38.0)
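To make the trigger condition concrete, here is a minimal sketch (plain Scala, no Flink dependencies; timestamps kept in seconds for readability) of the bookkeeping behind a bounded-out-of-orderness watermark on this input. In reality Flink emits watermarks periodically rather than per record, but the watermark value tracks this formula, and the window [1547718200, 1547718205) fires as soon as the watermark reaches its end:

// Illustration only: watermark = max event time seen so far - maxOutOfOrderness.
val maxOutOfOrderness = 2L                     // seconds, matches Time.seconds(2) above
val windowEnd = 1547718205L                    // end of the window [1547718200, 1547718205)
val eventTimes = 1547718200L to 1547718208L    // timestamps of the nine test records

var maxSeen = Long.MinValue
for (t <- eventTimes) {
  maxSeen = math.max(maxSeen, t)
  val watermark = maxSeen - maxOutOfOrderness
  val fires = watermark >= windowEnd           // true for the first time at t = 1547718207
  println(s"event=$t watermark=$watermark windowFires=$fires")
}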
Case 3: sliding time window
A sliding window behaves much like a tumbling window; in fact a tumbling window can be seen as a special sliding window whose slide equals its size. In .timeWindow(Time.seconds(10), Time.seconds(5)) the window size is 10 seconds and the slide is 5 seconds. The window size determines how much data each window computation covers, while the slide determines how often a window is evaluated and closed (here, one result every 5 seconds).
// minimum temperature over the last 10 seconds, emitted every 5 seconds
val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10), Time.seconds(5)) // sliding window: size 10s, slide 5s
  .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
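With a size of 10 seconds and a slide of 5 seconds, every element belongs to size/slide = 2 overlapping windows. A minimal sketch (plain Scala, mirroring how Flink assigns sliding windows, assuming a window offset of 0 and positive timestamps) of which windows a given timestamp falls into:

// Illustration only: the set of (start, end) sliding windows containing a timestamp (ms).
def slidingWindowsOf(timestampMs: Long, sizeMs: Long, slideMs: Long): Seq[(Long, Long)] = {
  val lastStart = timestampMs - (timestampMs % slideMs) // start of the latest window containing the element
  (lastStart until (timestampMs - sizeMs) by -slideMs)  // walk backwards one slide at a time
    .map(start => (start, start + sizeMs))
}

// e.g. slidingWindowsOf(1547718203000L, 10000L, 5000L)
//   == Seq((1547718200000L, 1547718210000L), (1547718195000L, 1547718205000L))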