Flink(二)快速上手之wordCount(java)

2021-01-13 17:32

阅读:901

标签:接受   oca   throws   oal   data   sele   local   pre   处理   

  • 创建maven功臣
  • pom文件
4.0.0MyFlink
    MyFlink
    1.0-SNAPSHOTorg.apache.flink
            flink-java
            1.9.0compileorg.apache.flink
            flink-streaming-java_2.11
            1.9.0compileorg.apache.maven.plugins
                maven-shade-plugin
                3.1.0falsepackageshadecom.google.code.findbugs:jsr305org.slf4j:*log4j:*com.haier.cosmodata.source.MyDataStreamSourceDemoreference.conf
                                    *:*:*:*
                                    META-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSA
  • StreamWordCount
package com.sgg.bigdata;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流式处理WordCount
 * Created by huqian on 2020/5/23 22:24
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
         //创建一个流处理的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //接受socket数据流
        DataStreamSource textDataSteam = env.socketTextStream("localhost",7777);

        //逐一读取数据,打散之后进行WordCount
        SingleOutputStreamOperator> wordCountDataStream = textDataSteam
                .flatMap(new FlatMapFunction>() {
                    public void flatMap(String s, Collector> collector) throws Exception {
                        String[] tokens = s.split(" ");

                        for (String token : tokens) {
                            if (token.length() > 0) {
                                collector.collect(new Tuple2(token, 1));
                            }
                        }
                    }
                })
                .filter(new FilterFunction>() {
                    public boolean filter(Tuple2 stringIntegerTuple2) throws Exception {
                        if (stringIntegerTuple2.equals(null)) {
                            return false;
                        }
                        return true;
                    }
                })
                .keyBy(0)
                .sum(1);

        //打印输出
        wordCountDataStream.print();

        //执行任务
        env.execute("StreamWordCountJob");
        //测试需要开启端口7777

    }
}

-- 测试
技术图片
技术图片

Flink(二)快速上手之wordCount(java)

标签:接受   oca   throws   oal   data   sele   local   pre   处理   

原文地址:https://www.cnblogs.com/xinjitu-001/p/12945437.html


评论


亲,登录后才可以留言!