Flink常用算子代码实现(Scala和Java)
2021-03-30 03:28
标签:-- 输出 exe join hadoop 数据库连接 连接 tor order Flink常用算子代码实现 (Scala版本和Java版本) map: scala语法简化: filter算子,返回满足条件的结果。 mapPartition的作用:原本 map 是每个元素调用一次,现在改成每个分区调用一次。 现在换成mapPartition: FlatMap:take one element and produce zero, one or more elements. val result = input1.join(input2).where(0).equalTo(1) Flink常用算子代码实现(Scala和Java) 标签:-- 输出 exe join hadoop 数据库连接 连接 tor order 原文地址:https://www.cnblogs.com/bigband/p/13588683.html
map之scala实现
/** Entry point: runs the map example on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  mapFunction(ExecutionEnvironment.getExecutionEnvironment)
/** Adds 1 to each of the integers 1..5 and prints the results (2..6). */
def mapFunction(env: ExecutionEnvironment): Unit = {
  val numbers = env.fromCollection(List(1, 2, 3, 4, 5))
  val incremented = numbers.map(n => n + 1)
  incremented.print()
}
输出:
2
3
4
5
6
// Four equivalent ways to write the same "+1" lambda; each line prints the same values.
// Assumes `data` is the DataSet built in mapFunction from List(1,2,3,4,5).
// 1) Parameter with an explicit type.
data.map((x:Int)=>x+1).print()
println("----")
// 2) Parenthesised parameter, type inferred.
data.map((x)=>x+1).print()
println("----")
// 3) Single parameter without parentheses.
data.map(x=>x+1).print()
println("----")
// 4) Placeholder syntax.
data.map(_+1).print()
map之Java实现
/** Entry point: runs the map example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    mapFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void mapFunction(ExecutionEnvironment env) throws Exception{
List
filter之scala实现
/** Entry point: runs the filter example on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  filterFunction(ExecutionEnvironment.getExecutionEnvironment)
/** Increments 1..5, keeps only the values greater than 3, and prints them (4, 5, 6). */
def filterFunction(env: ExecutionEnvironment): Unit = {
  val incremented = env.fromCollection(List(1, 2, 3, 4, 5)).map(n => n + 1)
  val kept = incremented.filter(n => n > 3)
  kept.print()
}
输出:
4
5
6
filter 之Java实现
/** Entry point: runs the filter example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    filterFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void filterFunction(ExecutionEnvironment env) throws Exception {
List
mapPartition 之scala实现
import scala.util.Random
//新建一个数据库工具类,用来连接数据库
/**
 * Stand-in for a database helper used by the mapPartition example.
 * No real database is touched: a "connection" is simulated by a random Int.
 */
object DBUtils {
  // One shared generator instead of allocating a new Random on every call.
  private val random = new Random()

  /** Simulates acquiring a connection; returns a pseudo-random id in [0, 10). */
  def getConnection(): Int = random.nextInt(10)

  /**
   * Placeholder called once work with a connection is finished (the example
   * text describes this as the point where data would be saved to the DB).
   * Currently a no-op.
   *
   * @param connection textual form of the connection being handed back
   */
  def returnConnection(connection: String): Unit = ()
}
如果使用 map 函数,每处理一条数据都会请求一次数据库连接,请求太频繁可能会把数据库压垮;mapPartition 则是每个分区只请求一次(分区数由并行度决定),从而减少数据库的请求压力。
/** Entry point: runs the mapPartition example (the filter example is left disabled). */
def main(args: Array[String]): Unit = {
  val environment = ExecutionEnvironment.getExecutionEnvironment
  // filterFunction(environment)
  mapPartitionFunction(environment)
}
/**
 * Demonstrates mapPartition: the user function runs once per parallel partition
 * instead of once per element, so the (simulated) DB connection is acquired
 * once per partition.
 *
 * With 100 elements and the mapPartition operator at parallelism 4,
 * DBUtils.getConnection() is expected to be called 4 times; the per-element map
 * version (commented out below) would call it 100 times.
 */
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = (1 to 100).map(i => s"student: $i")
  val data = env.fromCollection(students)

  data.mapPartition(partition => {
    // One connection per partition, not per element.
    val connection = DBUtils.getConnection()
    println(connection + "......")
    partition
  })
    // Set on the mapPartition operator itself: a fromCollection source is
    // non-parallel, so setting parallelism on the source would not yield 4 partitions.
    .setParallelism(4)
    .print()

  // For comparison, the per-element version acquires a connection for every record:
  // data.map(x => {
  //   // Every element must be stored in the DB, so it first needs a connection.
  //   val connection = DBUtils.getConnection() + "...."
  //
  //   // Save the data to the DB.
  //   DBUtils.returnConnection(connection)
  // }).print()
}
现在的情况是:对 100 条数据、4 个分区,使用 map 会请求 100 次(每条数据一次),使用 mapPartition 只会请求 4 次(每个分区一次),大大降低了数据库的压力。
mapPartition之java实现
/** Entry point: runs the mapPartition example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    mapPartition(ExecutionEnvironment.getExecutionEnvironment());
}
public static void mapPartition(ExecutionEnvironment env) throws Exception {
List
/** Entry point: runs the mapPartition example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    mapPartition(ExecutionEnvironment.getExecutionEnvironment());
}
public static void mapPartition(ExecutionEnvironment env) throws Exception {
List
first(n)之scala实现
/** Entry point: runs the first(n) examples on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  firstFunction(ExecutionEnvironment.getExecutionEnvironment)
/**
 * Demonstrates first(n) on a whole DataSet, on each group, and on each sorted group.
 *
 * Sample outputs from the original run are kept as comments. first(n) on an
 * unsorted input returns arbitrary elements, and the order in which groups are
 * printed may vary between runs.
 */
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = List(
    (1, "Hadoop"),
    (1, "Spark"),
    (1, "Flink"),
    (2, "Java"),
    (2, "Spring"),
    (3, "Linux"),
    (4, "VUE")
  )
  val data = env.fromCollection(info)

  // 3 elements of the whole DataSet. Sample output:
  // (1,Hadoop)
  // (1,Spark)
  // (1,Flink)
  data.first(3).print()

  // 2 elements of every group keyed by field 0. Sample output:
  // (3,Linux)
  // (1,Hadoop)
  // (1,Spark)
  // (2,Java)
  // (2,Spring)
  // (4,VUE)
  data.groupBy(0).first(2).print()

  // Sort each group by field 1 descending, then take 2. Sample output:
  // (3,Linux)
  // (1,Spark)
  // (1,Hadoop)
  // (2,Spring)
  // (2,Java)
  // (4,VUE)
  data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()

  // Sort each group by field 1 ascending, then take 2. Sample output:
  // (3,Linux)
  // (1,Flink)
  // (1,Hadoop)
  // (2,Java)
  // (2,Spring)
  // (4,VUE)
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
}
first 之 java实现
/** Entry point: runs the first(n) example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    firstFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void firstFunction(ExecutionEnvironment env) throws Exception {
List
flatMap之scala实现
/** Entry point: runs the flatMap examples on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  flatMapFunction(ExecutionEnvironment.getExecutionEnvironment)
/**
 * Demonstrates flatMap: each comma-separated line yields zero or more words.
 * Then counts the words with the (word, 1) -> groupBy(0) -> sum(1) pattern.
 *
 * Sample outputs from the original run are kept as comments; the print order
 * may vary between runs.
 */
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "flink,spark", "hadoop,flink,spark")
  val words = env.fromCollection(info).flatMap(_.split(","))

  // Sample output:
  // hadoop
  // spark
  // flink
  // spark
  // hadoop
  // flink
  // spark
  words.print()

  // Word count. Sample output:
  // (hadoop,2)
  // (flink,2)
  // (spark,3)
  words.map((_, 1)).groupBy(0).sum(1).print()
}
flatMap之Java 实现
/** Entry point: runs the flatMap example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    flatMapFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
List
distinct 之scala实现
/** Entry point: runs the distinct example on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  distinctFunction(ExecutionEnvironment.getExecutionEnvironment)
/** Splits comma-separated lines into words and prints each distinct word once. */
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val lines = List("hadoop,spark", "flink,spark", "hadoop,flink,spark")
  val words = env.fromCollection(lines).flatMap(line => line.split(","))
  words.distinct().print()
}
输出:
hadoop
flink
spark
distinct之java实现
/** Entry point: runs the distinct example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    distinctFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
List
join之scala实现
解释:where(0) 中的 0 表示第一个输入(input1)的第 0 个字段,equalTo(1) 中的 1 表示第二个输入(input2)的第 1 个字段。
即当 input1 的第 0 个字段与 input2 的第 1 个字段相等时进行 join。
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
joinFunction(env)
}
/**
 * Inner-joins (id, name) records with (id, city) records on field 0 of both
 * sides and prints (id, name, city) for ids present on both sides.
 */
def joinFunction(env: ExecutionEnvironment): Unit = {
  // (id, name)
  val people = List((1, "张三"), (2, "李四"), (3, "王五"), (4, "小强"))
  // (id, city)
  val cities = List((1, "北京"), (2, "上海"), (3, "成都"), (5, "武汉"))

  val left = env.fromCollection(people)
  val right = env.fromCollection(cities)

  val joined = left
    .join(right)
    .where(0)
    .equalTo(0)
    .apply((person, place) => (person._1, person._2, place._2))

  joined.print()
}
输出:
(3,王五,成都)
(1,张三,北京)
(2,李四,上海)
join之java实现
/** Entry point: runs the join example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    joinFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void joinFunction(ExecutionEnvironment env) throws Exception{
List
outerJoin之scala实现
/** Entry point: runs the left outer join example on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  outjoinFunction(ExecutionEnvironment.getExecutionEnvironment)
/**
 * Left-outer-joins (id, name) records with (id, city) records on field 0.
 * Every left record is printed as (id, name, city); when no right-hand record
 * matches, the city is the literal string "null".
 */
def outjoinFunction(env: ExecutionEnvironment): Unit = {
  // (id, name)
  val people = List((1, "张三"), (2, "李四"), (3, "王五"), (4, "小强"))
  // (id, city)
  val cities = List((1, "北京"), (2, "上海"), (3, "成都"), (5, "武汉"))

  val left = env.fromCollection(people)
  val right = env.fromCollection(cities)

  left.leftOuterJoin(right).where(0).equalTo(0).apply { (person, place) =>
    // The right-hand record arrives as null when it has no match.
    val city = Option(place).fold("null")(_._2)
    (person._1, person._2, city)
  }.print()
}
输出:
(3,王五,成都)
(1,张三,北京)
(2,李四,上海)
(4,小强,null)
outerJoin之java实现
/** Entry point: runs the outer join example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    outerjoinFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void outerjoinFunction(ExecutionEnvironment env) throws Exception{
List
cross之scala实现
/** Entry point: runs the cross example on the default ExecutionEnvironment. */
def main(args: Array[String]): Unit =
  crossFunction(ExecutionEnvironment.getExecutionEnvironment)
/** Prints the Cartesian product of two strings and the integers 1..3 as (string, int) pairs. */
def crossFunction(env: ExecutionEnvironment): Unit = {
  val names = env.fromCollection(List("长城", "长安"))
  val numbers = env.fromCollection(List(1, 2, 3))
  val pairs = names.cross(numbers)
  pairs.print()
}
输出:
(长城,1)
(长城,2)
(长城,3)
(长安,1)
(长安,2)
(长安,3)
cross之java实现
/** Entry point: runs the cross example on the default ExecutionEnvironment. */
public static void main(String[] args) throws Exception {
    crossFunction(ExecutionEnvironment.getExecutionEnvironment());
}
public static void crossFunction(ExecutionEnvironment env) throws Exception{
List
sink scala 代码
/**
 * Sink example: writes the integers 1..10 as text with a sink parallelism of 3.
 *
 * @param args optional; args(0) overrides the output path. Without it the
 *             original hard-coded path is used.
 */
def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val text = env.fromCollection(1 to 10)
  // The default path is machine-specific; it is kept only for backward compatibility.
  val path = args.headOption.getOrElse("/Users/zhiyingliu/tmp/flink/ouput")
  // OVERWRITE replaces any existing output at `path`.
  text.writeAsText(path, WriteMode.OVERWRITE).setParallelism(3)
  // writeAsText only declares the sink; execute() runs the job.
  env.execute("sinkTest")
}
sink Java 代码
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List