Flink Streaming API, Part 1
2021-03-11 14:31
Source (Chinese original): https://www.cnblogs.com/hyunbar/p/12632931.html

1. Environment
1.1 getExecutionEnvironment

Creates an execution environment that represents the context of the current program: a batch environment or a streaming environment, depending on the API used. If no parallelism is set explicitly (for example via env.setParallelism), the value configured in flink-conf.yaml is used; the default is 1.
// batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
1.2 createLocalEnvironment

Returns a local execution environment; the default parallelism has to be specified when calling it.
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
1.3 createRemoteEnvironment

Returns a cluster execution environment and submits the Jar to a remote server. The JobManager's IP and port must be specified, along with the Jar package to run on the cluster.
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
2. Source
2.1 Reading data from a collection
def main(args: Array[String]): Unit = {
    val env1: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataList = List(
        ("1", 1231231L, 200),
        ("2", 1231231L, 201),
        ("3", 1231231L, 202)
    ).map {
        case (id, ts, vc) => WaterSensor(id, ts, vc)
    }
    val dataDS: DataStream[WaterSensor] = env1.fromCollection(dataList)
    dataDS.print()
    env1.execute()
}

case class WaterSensor(id: String, ts: Long, vc: Double)
def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
        List(
            WaterSensor("ws_001", 1577844001, 45.0),
            WaterSensor("ws_002", 1577844015, 43.0),
            WaterSensor("ws_003", 1577844020, 42.0)
        )
    )
    sensorDS.print()
    env.execute("sensor")
}
2.2 Reading data from a file
// read the data source from a file
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// relative path
//val fileDS: DataStream[String] = env.readTextFile("input/word.txt")
// Flink cannot resolve the hdfs:// scheme by default; the corresponding Hadoop jars have to be added
val fileDS: DataStream[String] = env.readTextFile("hdfs://linux1:9000/directory/app-20191213160742-0000")
fileDS.print("file>>>>")
env.execute()
2.3 Reading data from Kafka

Add the Kafka connector dependency to the project first.
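The code for this section was lost from the original post; the following is a minimal sketch of consuming a topic with the 0.11 Kafka connector used elsewhere in this article. The broker address linux1:9092, the group id, and the topic name sensor are placeholders.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka consumer configuration (hosts and group id are placeholders)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "linux1:9092")
properties.setProperty("group.id", "consumer-group")

// read each Kafka record as a plain string
val kafkaDS: DataStream[String] = env.addSource(
    new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
)
kafkaDS.print("kafka>>>>")
env.execute()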
2.4 Custom source
def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource()).print("mine>>>>")
    env.execute()
}

// Custom data source:
// 1. extend SourceFunction
// 2. override its methods
class MySource extends SourceFunction[WaterSensor] {
    private var flg = true

    // data collection logic
    override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
        while (flg) {
            // emit the collected record into the stream
            ctx.collect(WaterSensor("1", 1L, 1))
            Thread.sleep(200)
        }
    }

    // cancel data collection
    override def cancel(): Unit = {
        flg = false
    }
}
3. Sink

The print method is itself a kind of sink. Flink ships with sinks for a number of frameworks out of the box; for anything else, a user-defined sink has to be implemented and attached to the stream:

stream.addSink(new MySink(xxxx))

On DataStream, addSink is declared roughly as public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction).
3.1 Kafka

Add the Kafka connector dependency, add the sink in the main method, and check the result with a Kafka console consumer:
// write data to Kafka
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds: DataStream[String] = env.readTextFile("input/word.txt")
ds.addSink(new FlinkKafkaProducer011[String]("linux1:9092", "waterSensor", new SimpleStringSchema()))
env.execute()
bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic waterSensor
3.2 Redis

Add the Redis connector dependency, define a RedisMapper that specifies the command used when saving to Redis, and check the data from the Redis client:
// write data to Redis
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds: DataStream[String] = env.readTextFile("input/word.txt")
val conf = new FlinkJedisPoolConfig.Builder().setHost("linux4").setPort(6379).build()
ds.addSink(new RedisSink[String](conf, new RedisMapper[String] {
    // save with HSET into the hash named "word"
    override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "word")
    }
    override def getKeyFromData(t: String): String = {
        t.split(" ")(1)
    }
    override def getValueFromData(t: String): String = {
        t.split(" ")(0)
    }
}))
env.execute()
HGETALL word
3.3 Elasticsearch

Add the Elasticsearch connector dependency, call the sink in the main method, and check the result in ES:
// write data to Elasticsearch
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val list = List(
    WaterSensor("sensor_1", 150000L, 25),
    WaterSensor("sensor_1", 150001L, 27),
    WaterSensor("sensor_1", 150005L, 30),
    WaterSensor("sensor_1", 150007L, 40)
)
val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)
val httpHosts = new java.util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("linux1", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[WaterSensor](httpHosts, new ElasticsearchSinkFunction[WaterSensor] {
    override def process(t: WaterSensor, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("saving data: " + t)
        val json = new java.util.HashMap[String, String]()
        json.put("data", t.toString)
        val indexRequest = Requests.indexRequest().index("water").`type`("readingData").source(json)
        requestIndexer.add(indexRequest)
        println("saved successfully")
    }
})
waterSensorDS.addSink(esSinkBuilder.build())
env.execute()
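To check the result in ES, one option (assuming the host and index from the example above) is a simple search query:

curl "http://linux1:9200/water/_search?pretty"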
3.4 JDBC

Add the JDBC driver dependency and add a custom MyJDBCSink:
def main(args: Array[String]): Unit = {
    // write data via JDBC
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val list = List(
        WaterSensor("sensor_1", 150000L, 25),
        WaterSensor("sensor_1", 150001L, 27),
        WaterSensor("sensor_1", 150005L, 30),
        WaterSensor("sensor_1", 150007L, 40)
    )
    val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)
    waterSensorDS.addSink(new MyJDBCSink)
    env.execute()
}

// Custom sink:
// 1. extend RichSinkFunction
// 2. override its methods
class MyJDBCSink extends RichSinkFunction[WaterSensor] {
    private var conn: Connection = _
    private var pstat: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
        //Class.forName()
        conn = DriverManager.getConnection("jdbc:mysql://linux1:3306/rdd", "root", "000000")
        pstat = conn.prepareStatement("insert into user (id, name, age) values (?, ?, ?)")
    }

    override def invoke(ws: WaterSensor, context: SinkFunction.Context[_]): Unit = {
        pstat.setInt(1, 1)
        pstat.setString(2, ws.id)
        pstat.setInt(3, ws.vc.toInt) // vc is a Double, convert before binding to the int column
        pstat.executeUpdate()
    }

    override def close(): Unit = {
        pstat.close()
        conn.close()
    }
}
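To verify, query the table from the MySQL client (assuming the user table used by the sink above):

mysql> select * from user;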
3.5 HDFS
BucketingSink has been deprecated since Flink 1.9 and will be removed in subsequent releases. Please use the StreamingFileSink instead.

3.5.1 BucketingSink

By default the bucketing sink splits by the current system time when elements arrive and uses the datetime pattern "yyyy-MM-dd--HH" to name the buckets. Two configuration options specify when a part file should be closed and a new one started: a batch size (setBatchSize) and a batch rollover interval (setBatchRolloverInterval, whose default is Long.MAX_VALUE):
// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[A, B]] = ...

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
input.addSink(sink)
This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}
3.5.2 StreamingFileSink

StreamingFileSink supports both row-wise and bulk encoding formats, such as Apache Parquet. The two variants come with their respective builders that can be created with the following static methods:

StreamingFileSink.forRowFormat(basePath, rowEncoder)
StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

Example for a row-encoded format:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val input: DataStream[String] = ...

val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build()

input.addSink(sink)
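For bulk-encoded formats, Flink provides built-in BulkWriter factories such as the Parquet writers. A minimal sketch, assuming the flink-parquet dependency is on the classpath and reusing the WaterSensor case class and waterSensorDS stream from the earlier examples; the output path is a placeholder:

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// ParquetAvroWriters.forReflectRecord derives the Avro schema from the class via reflection.
// Bulk-encoded sinks roll their part files on every checkpoint, so checkpointing must be enabled.
val parquetSink: StreamingFileSink[WaterSensor] = StreamingFileSink
    .forBulkFormat(new Path("hdfs://linux1:9000/flink/parquet"), ParquetAvroWriters.forReflectRecord(classOf[WaterSensor]))
    .build()

waterSensorDS.addSink(parquetSink)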