9 Process Function (Low-Level API)
2020-12-26 18:26
标签:incr 存在 第一个 string get end date ide port (1) ProcessFunction (2) KeyedProcessFunction (3) CoProcessFunction (8) ProcessAllWindowFunction KeyedProcessFunction 用来操作 KeyedStream。 KeyedProcessFunction 会处理流的每一个元素, Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法: 9 Process Function (Low-Level API) 标签:incr 存在 第一个 string get end date ide port 原文地址:https://www.cnblogs.com/andyonline/p/13369585.html一 8个Process Function
在没有开窗和keyby的情况下使用
在keyby之后使用
(4) ProcessJoinFunction
(5) BroadcastProcessFunction
(6) KeyedBroadcastProcessFunction
(7) ProcessWindowFunction
在开窗后使用二 KeyedProcessFunction
输出为 0 个、 1 个或者多个元素。所有的 Process Function 都继承自 RichFunction 接口,所以
都有 open()、 close() 和 getRuntimeContext() 等方法。而 KeyedProcessFunction[KEY, IN, OUT]
还额外提供了两个方法:
? processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这
个方法,调用结果将会放在 Collector 数据类型中输出。 Context 可以访问元素的时间
戳,元素的 key,以及 TimerService 时间服务。 Context 还可以将结果输出到别的流 (side
outputs)。
? onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数。
当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。目录 94
Collector 为输出结果的集合。 OnTimerContext 和 processElement 的 Context 参数一样,
提供了上下文的一些信息,例如 firing trigger 的时间信息 (事件时间或者处理时间)。TimerService and Timers
? currentProcessingTime(): Long 返回当前处理时间
? currentWatermark(): Long 返回当前水位线的时间戳
? registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的 timer。当 processing time 到达定时时间时,触发 timer。
? registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time
timer。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
? deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时
器。如果没有这个时间戳的定时器,则不执行。
? deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,
如果没有此时间戳的定时器,则不执行。
当定时器 timer 触发时,执行回调函数 onTimer()。 processElement() 方法和 onTimer() 方法是
同步(不是异步)方法,这样可以避免并发访问和操作状态。三 code
1 事件时间的keyProcessFunction 和定时器的使用
package test4
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// nc -lk 9999
//a 1
object ProcessingTimeOnTimer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream = env
.socketTextStream("localhost", 9999, ‘\n‘)
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1))
})
.keyBy(_._1)
.process(new MyKeyedProcess)
stream.print()
env.execute()
}
class MyKeyedProcess extends KeyedProcessFunction[String, (String, String), String] {
// 来一条数据调用一次
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit = {
// 当前机器时间
val curTime = ctx.timerService().currentProcessingTime()
// 当前机器时间10s之后,触发定时器
ctx.timerService().registerProcessingTimeTimer(curTime + 10 * 1000L)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("位于时间戳:" + new Timestamp(timestamp) + "的定时器触发了!")
}
}
}
2 处理时间定时器的使用
package test4
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// nc -lk 9999
//a 1
object ProcessingTimeOnTimer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream = env
.socketTextStream("localhost", 9999, ‘\n‘)
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1))
})
.keyBy(_._1)
.process(new MyKeyedProcess)
stream.print()
env.execute()
}
class MyKeyedProcess extends KeyedProcessFunction[String, (String, String), String] {
// 来一条数据调用一次
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit = {
// 当前机器时间
val curTime = ctx.timerService().currentProcessingTime()
// 当前机器时间10s之后,触发定时器
ctx.timerService().registerProcessingTimeTimer(curTime + 10 * 1000L)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("位于时间戳:" + new Timestamp(timestamp) + "的定时器触发了!")
}
}
}
3 连续一秒钟温度上升的例子
package test4
import test2.{SensorReading, SensorSource}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// 如果某一个传感器连续1s中温度上升,报警!
object TempIncreaseAlert {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
.keyBy(_.id)
.process(new TempIncreaseAlertFunction)
stream.print()
env.execute()
}
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
// 用来存储最近一次的温度
// 当保存检查点的时候,会将状态变量保存到状态后端
// 默认状态后端是内存,也可以配置hdfs等为状态后端
// 懒加载,当运行到process方法的时候,才会惰性赋值
// 状态变量只会被初始化一次
// 根据`last-temp`这个名字到状态后端去查找,如果状态后端中没有,那么初始化
// 如果在状态后端中存在`last-temp`的状态变量,直接懒加载
// 默认值是`0.0`
lazy val lastTemp = getRuntimeContext.getState(
new ValueStateDescriptor[Double](
"last-temp",
Types.of[Double]
)
)
// 存储定时器时间戳的状态变量
// 默认值是`0L`
lazy val currentTimer = getRuntimeContext.getState(
new ValueStateDescriptor[Long](
"timer",
Types.of[Long]
)
)
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 获取最近一次的温度, 使用`.value()`
val prevTemp = lastTemp.value()
// 将当前温度存入状态变量, `.update()`
lastTemp.update(value.temperature)
// 获取定时器状态变量中的时间戳
val curTimerTimestamp = currentTimer.value()
// 温度:1,2,3,4,5,2
if (prevTemp == 0.0 || value.temperature prevTemp && curTimerTimestamp == 0L) {
// 如果温度上升,且保存定时器时间戳的状态变量为空,就注册一个定时器
// 注册一个1s之后的定时器
val timerTs = ctx.timerService().currentProcessingTime() + 1000L
ctx.timerService().registerProcessingTimeTimer(timerTs)
// 将时间戳存入状态变量
currentTimer.update(timerTs)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("传感器ID为 " + ctx.getCurrentKey + " 的传感器,温度连续1秒钟上升了!")
currentTimer.clear() // 清空状态变量
}
}
}
4 侧输出流
package test4
import test2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SideOutputExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
.process(new FreezingMonitor)
stream
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()
stream.print() // 打印主流
env.execute()
}
// 为什么用`ProcessFunction`? 因为没有keyBy分流
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
// 定义侧输出标签
lazy val freezingAlarmOutput = new OutputTag[String]("freezing-alarms")
// 来一条数据,调用一次
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature
文章标题:9 Process Function (Low-Level API)
文章链接:http://soscw.com/essay/38403.html