Flink基本API的使用
2021-03-14 09:28
标签:code 元组 选择器 数据集 字段名 int() 如何 大数据 因此 Flink使用 DataSet 和 DataStream 代表数据集。DateSet 用于批处理,代表数据是有限的,而 DataStream 用于流数据,代表数据是无界的。数据集中的数据是不可以变的,也就是说不能对其中的元素增加或删除。我们通过数据源创建 DataSet 或者 DataStream ,通过 map,filter 等转换(transform)操作对数据集进行操作产生新的数据集。 编写 Flink 程序一般经过如下几个步骤: 下面我们将介绍编写 Flink 程序所涉及的基本 API。 1、输入和输出 以第一个为例创建 execution 环境的代码如下 //显式调用该方法触发程序的执行 words.txt 文件内容: 上面代码创建了 execution 环境,同时利用 env 创建了输入源。在数据集上调用 print 方法可以将数据输出到控制台,当然也可以调用 writeAsText 等方法将数据输出到其他介质。上面流处理最后一行代码调用了 execute 方法,在流处理中需要显式调用该方法触发程序的执行。 上述代码有两种方式运行,一种是直接在 IDE 中执行,就像运行一个普通的 Java 程序,Flink 将启动一个本地的环境执行程序。另一种方式是将程序打包,提交到 Flink 集群运行。上面例子基本包含了一个 Flink 程序的基本骨架,但是并没有对数据集进行更多的 transform 操作,下面我们简单介绍基本 transform 操作。 2.1、批处理: 2.2、流处理: 这里批处理和流处理除了数据集的类型不同,其余写法都一样。就是将每个单词映射成了一个(单词, 1)二元组。与 map 类似的 transform 还有 filter,过滤不需要的记录,读者可以自行尝试。 Flink 的数据模型并不是基于 key-value 的,key 是虚拟的,可以看做是定义在数据上的函数。 3.1、在 Tuple 中定义 key 3.2、用字段表达式指定 key 这里指定 WC 对象的 word 字段作为 key。字段表达式语法如下: 3.3、字段表达式的举例 "count": WC类的 count 字段作为key 3.4、使用 Key Selector 指定 key 可以看到实现的效果与 keyBy(0) 是一样的。 以上便是 Flink 指定 key 的方法。 总结 Flink基本API的使用 标签:code 元组 选择器 数据集 字段名 int() 如何 大数据 因此 原文地址:https://www.cnblogs.com/linjiqin/p/12498687.html
1.获取执行环境(execution environment)
2.加载/创建初始数据集(初始化数据)
3.在数据集上进行各种转换操作,生成新的数据集(转换操作数据)
注:下文统一称为:transform
4.指定将计算的结果放到何处去(输出数据)
5.触发程序执行
首先,需要获得 execution 环境,Flink 提供了以下三种方式:ExecutionEnvironment.getExecutionEnvironment();
ExecutionEnvironment.createLocalEnvironment()
ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
1.1、批处理:final ExecutionEnvironment evn = ExecutionEnvironment.getExecutionEnvironment();
DataSet
1.2、流处理:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
env.execute();
a b c d e f g
2、map操作
这里的 map 操作类似 MapReduce 中的 map,对数据进行解析,处理。示例如下DataSet
DataStream
3、指定 key
大数据处理经常需要按照某个维度进行处理,也就是需要指定 key。在 DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key。这里我们以 keyBy 为例进行介绍。//0 代表 Tuple2(二元组)中第一个元素
KeyedStream
public class WC {
public String word;
public int count;
}
DataStream
a、Java对象使用字段名作为key,例子如上
b、对于 Tuple 类型使用字段名(f0, f1,...)或者偏移(从0开始)指定 key,例如 f0 和 5 分别代表 Tuple 第一个字段和第六个字段
c、Java 对象和 Tuple 嵌套的字段作为 key,例如:f1.user.zip 表示 Tuple 第二个字段中的 user 对象中的 zip 字段作为 key
d、通配符 * 代表选择所有类型作为 keypublic static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3
"complex": complex 的所有字段(递归地)作为key
"complex.word.f2": ComplexNestedClass 类中 word 三元组的第三个字段
"complex.hadoopCitizen": complex类中的 hadoopCitizen 字段
通过 key 选择器函数来制定 key,key 选择器的输入为每个元素,输出为指定的 key,例子如下words.keyBy(new KeySelector
这篇文章主要介绍了 Flink 程序的基本骨架。获得环境、创建输入源、对数据集做 transform 以及输出。由于数据处理经常会按照不同维度(不同的 key)进行统计,因此,本篇内容重点介绍了 Flink 中如何指定 key。后续将会继续介绍 Flink API 的使用。