Flink Windows

2021-03-08 07:28


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Sensor record: id, timestamp (in seconds), temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object WindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val socketStream = env.socketTextStream("hadoop102", 7777)

    // Parse lines such as "sensor_1, 1547718200, 30.8"
    val dataStream: DataStream[SensorReading] = socketStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // Minimum temperature per sensor within each 10-second window.
    // No time characteristic is set, so the default applies (ProcessingTime in Flink versions before 1.12).
    val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) // 10-second tumbling window
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    // Print the raw dataStream
    dataStream.print("data stream")

    // Print the windowed stream
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  }

}

  Test:

  Enter two records in a row:

[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 40.8

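  Both records fell into the same 10-second tumbling window, so the windowed stream minTemperatureStream emitted only one result. Note that the window does not fire exactly 10 seconds after the first record arrives: processing-time tumbling windows are aligned to the epoch (boundaries at :00, :10, :20, ... of the clock), so the window fires when the machine clock reaches the end of the window that contains the records, which is at most 10 seconds after the first record arrived.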

data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,40.8)
min temperature> (sensor_1,30.8)
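Processing-time tumbling windows are aligned to the epoch rather than to the first record, so the firing time depends on the wall clock. The following is a minimal pure-Scala sketch (no Flink dependency) of the window-start arithmetic; the formula mirrors the one in Flink's `TimeWindow.getWindowStartWithOffset` as we understand it, while the object name `TumblingWindowMath` and the arrival times are hypothetical, chosen only for illustration.

```scala
object TumblingWindowMath {
  // Start of the tumbling window that contains `timestamp` (all values in ms);
  // same arithmetic as Flink's TimeWindow.getWindowStartWithOffset
  def windowStart(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - (timestamp - offset + size) % size

  // A processing-time trigger fires when the clock reaches the window's last millisecond (end - 1)
  def fireTime(timestamp: Long, size: Long): Long =
    windowStart(timestamp, 0L, size) + size - 1

  def main(args: Array[String]): Unit = {
    val size = 10000L // 10-second window, as in the example above
    // Hypothetical wall-clock arrival times (ms) of the two input lines
    val arrivals = Seq(1615160903000L, 1615160904500L)
    for (t <- arrivals) {
      val start = windowStart(t, 0L, size)
      println(s"arrival=$t window=[$start, ${start + size}) fires at ${fireTime(t, size)}")
    }
  }
}
```

With these assumed arrival times both records land in the same window, which fires about 7 seconds after the first record rather than 10.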

 

Example 2: Tumbling event-time window with watermarks

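  Code analysis:

  ① env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) sets the time characteristic used by the windows to event time.

  ② assignTimestampsAndWatermarks() is passed an instance of an anonymous BoundedOutOfOrdernessTimestampExtractor subclass. The constructor argument is the maximum out-of-orderness (delay) to tolerate, and the overridden extractTimestamp method specifies which field provides the timestamp. Flink expects milliseconds, which is why the seconds-based timestamp field is multiplied by 1000.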
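The watermark logic behind step ② can be imitated in plain Scala. This is a simplified sketch, assuming the extractor keeps the largest timestamp seen so far and emits that value minus the allowed delay, never letting the watermark move backwards; `Reading`, `BoundedWatermarkSketch` and `BoundedWatermarkDemo` are hypothetical names, not Flink classes.

```scala
// Hypothetical stand-in for the article's SensorReading (timestamp in seconds)
final case class Reading(id: String, timestampSec: Long, temperature: Double)

// Pure-Scala imitation of a bounded-out-of-orderness watermark generator (ms)
final class BoundedWatermarkSketch(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first watermark computation does not overflow
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs
  private var lastEmitted: Long = Long.MinValue

  // Like extractTimestamp(): convert seconds to ms and remember the largest timestamp seen
  def onElement(r: Reading): Long = {
    val ts = r.timestampSec * 1000
    if (ts > currentMaxTimestamp) currentMaxTimestamp = ts
    ts
  }

  // Watermark = largest timestamp seen minus the allowed delay; it never moves backwards
  def currentWatermark: Long = {
    val candidate = currentMaxTimestamp - maxOutOfOrdernessMs
    if (candidate >= lastEmitted) lastEmitted = candidate
    lastEmitted
  }
}

object BoundedWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val gen = new BoundedWatermarkSketch(2000L) // Time.seconds(2)
    // A late record (1547718201 after 1547718205) does not pull the watermark back
    Seq(Reading("sensor_1", 1547718205L, 35.0), Reading("sensor_1", 1547718201L, 31.0))
      .foreach { r =>
        gen.onElement(r)
        println(s"after ${r.timestampSec}: watermark = ${gen.currentWatermark}")
      }
  }
}
```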

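import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// SensorReading is the case class defined in the first example
object WindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102", 7777)

    val dataStream: DataStream[SensorReading] = socketStream
      .map(d => {
        val arr = d.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
        // Event time in milliseconds; the input timestamps are in seconds
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      })

      // For data with monotonically ascending timestamps, this simpler alternative could be used instead:
      //.assignAscendingTimestamps(_.timestamp * 1000)

    // Minimum temperature per sensor within each 5-second window
    val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) // 5-second tumbling window (event time)
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))

    // Print the raw dataStream
    dataStream.print("data stream")

    // Print the windowed stream
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  }

}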

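  Test:

  The first record has timestamp 1547718200 (in seconds). Windows are aligned to the epoch, so it falls into the 5-second window [1547718200, 1547718205). Without any delay, the window would fire as soon as the record with timestamp 1547718205 arrived. Because the out-of-orderness bound is 2 seconds, however, the watermark at that point is only 1547718205 - 2 = 1547718203. The window fires and closes only when a record with timestamp 1547718207 or later arrives and pushes the watermark up to 1547718205, the end of the window (strictly, Flink fires once the watermark reaches the window end minus 1 ms).

  Socket input: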

[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 31         
sensor_1, 1547718202, 32  
sensor_1, 1547718203, 33  
sensor_1, 1547718204, 34  
sensor_1, 1547718205, 35  
sensor_1, 1547718206, 36  
sensor_1, 1547718207, 37  
sensor_1, 1547718208,38

  Console output:

data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,31.0)
data stream> SensorReading(sensor_1,1547718202,32.0)
data stream> SensorReading(sensor_1,1547718203,33.0)
data stream> SensorReading(sensor_1,1547718204,34.0)
data stream> SensorReading(sensor_1,1547718205,35.0)
data stream> SensorReading(sensor_1,1547718206,36.0)
data stream> SensorReading(sensor_1,1547718207,37.0)
min temperature> (sensor_1,30.8)
data stream> SensorReading(sensor_1,1547718208,38.0)
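The console output can be cross-checked with a small replay of the same input in plain Scala. This sketch makes two simplifying assumptions: the watermark is recomputed after every record (Flink emits periodic watermarks, every 200 ms by default once event time is enabled, which for hand-typed input has the same effect), and only the first window is examined. `EventTimeWindowReplay` is a hypothetical name.

```scala
object EventTimeWindowReplay {
  val sizeMs = 5000L  // Time.seconds(5) tumbling window
  val delayMs = 2000L // Time.seconds(2) out-of-orderness bound

  // (timestamp in seconds, temperature) pairs typed into nc in the test above
  val input: Seq[(Long, Double)] = Seq(
    1547718200L -> 30.8, 1547718201L -> 31.0, 1547718202L -> 32.0,
    1547718203L -> 33.0, 1547718204L -> 34.0, 1547718205L -> 35.0,
    1547718206L -> 36.0, 1547718207L -> 37.0, 1547718208L -> 38.0)

  // Epoch-aligned window start with offset 0 (ms)
  def windowStart(tsMs: Long): Long = tsMs - (tsMs + sizeMs) % sizeMs

  // (timestamp of the record after which the first window fires, that window's min temperature)
  def firstFiring: Option[(Long, Double)] = {
    val firstStart = windowStart(input.head._1 * 1000)
    val windowMaxTs = firstStart + sizeMs - 1 // last millisecond of the window
    // Largest timestamp seen so far, after each record
    val runningMax = input.map(_._1 * 1000).scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail
    val minTemp = input.collect { case (t, temp) if windowStart(t * 1000) == firstStart => temp }.min
    input.zip(runningMax).collectFirst {
      case ((tsSec, _), maxTs) if maxTs - delayMs >= windowMaxTs => (tsSec, minTemp)
    }
  }

  def main(args: Array[String]): Unit =
    println(firstFiring) // prints Some((1547718207,30.8))
}
```

The replay agrees with the log: the first window fires right after the record with timestamp 1547718207 and reports 30.8.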

 

Example 3: Sliding time window

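  A sliding window behaves much like a tumbling window; in fact, a tumbling window can be seen as a special sliding window whose size equals its slide. In .timeWindow(Time.seconds(10), Time.seconds(5)), the window size is 10 seconds and the slide is 5 seconds. The size determines how much data each window covers, while the slide determines how often a window is evaluated and closed: a window ends every 5 seconds, and each record belongs to 10 / 5 = 2 overlapping windows.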

// Minimum temperature over the last 10 seconds, emitted every 5 seconds
val minTemperatureStream = dataStream.map(data => (data.id, data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10), Time.seconds(5)) // sliding window: size 10 s, slide 5 s
  .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
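Which windows a single record lands in can be sketched in plain Scala. The loop below is modeled on the assignment loop in Flink's sliding window assigners as we understand it (start at the most recent slide-aligned window start, then step back by the slide while the window still covers the timestamp); `SlidingWindowMath` is a hypothetical helper, and all values are in milliseconds.

```scala
object SlidingWindowMath {
  // Epoch-aligned start of the most recent window slot containing ts (ms)
  def lastWindowStart(ts: Long, slide: Long): Long = ts - (ts + slide) % slide

  // All [start, end) windows an element with timestamp ts is assigned to, newest first
  def assign(ts: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator.iterate(lastWindowStart(ts, slide))(_ - slide)
      .takeWhile(_ > ts - size) // stop once the window no longer covers ts
      .map(start => (start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    // Record "sensor_1, 1547718203, 33" with size 10 s and slide 5 s
    assign(1547718203000L, 10000L, 5000L).foreach(println)
  }
}
```

For the record with timestamp 1547718203 this yields the two windows [1547718200, 1547718210) and [1547718195, 1547718205) (in seconds); setting the slide equal to the size yields a single window, which is the tumbling case.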
