pom.xml

<properties>
  <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
  </dependency>

  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
  </dependency>

</dependencies>

<build>
  <extensions>
    <extension>
      <groupId>org.apache.maven.wagon</groupId>
      <artifactId>wagon-ssh</artifactId>
      <version>2.8</version>
    </extension>
  </extensions>

  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>wagon-maven-plugin</artifactId>
      <version>1.0</version>
      <configuration>
        <!-- local jar to upload -->
        <fromFile>target/${project.build.finalName}.jar</fromFile>
        <!-- remote destination (copied via scp) -->
        <url>scp://root:root@bigdata01:/opt/app</url>
      </configuration>
    </plugin>
  </plugins>

</build>
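
With this build section in place, packaging and copying the jar to the cluster host can be done from the command line. A minimal sketch, assuming the wagon-maven-plugin's upload-single goal and that bigdata01 is reachable over SSH with the credentials in the url above:

# package the job, then scp the jar to /opt/app on bigdata01
mvn clean package wagon:upload-single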

Basic version with anonymous inner classes

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_01 {
    public static void main(String[] args) throws Exception {
        // Watch the imports here. This one line works the same whether you run locally in the IDE or on a cluster, which is very convenient
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /**
         * 1. AUTOMATIC: decide between batch and streaming based on whether the sources are bounded
         *    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
         * 2. BATCH: process the bounded input in one go and emit only final results
         *    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         * 3. STREAMING: the default
         *    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         */
        // Streaming is the default. Printing the results in both modes is a good way
        // to get a feel for the difference between stream and batch semantics
//        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
       
        // Set the parallelism; it can also be set per operator after each method call
        env.setParallelism(2);
        // Load the data
        DataStreamSource<String> sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> rsDs = sourceDs.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {

                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                // group by this field (the word)
                return tuple2.f0;
            }
        }).sum(1);

        // Don't rely on System.out.println for output: when submitted as a jar,
        // print() goes to the TaskManager stdout, visible in the web UI on port 8081
        rsDs.print();
        env.execute("WordCount example");

    }
}
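
In STREAMING mode every element updates the running count, so print() shows intermediate results; in BATCH mode only the final counts would appear. The streaming output will resemble the following (the "N>" prefix is the printing subtask's index; which subtask handles which key, and the interleaving, vary between runs):

2> (spark,1)
1> (kafka,1)
1> (flink,1)
2> (spark,2)
2> (spark,3)
2> (spark,4)
1> (kafka,2)
1> (kafka,3)
1> (kafka,4)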

Lambda version

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount_flink_02 {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        
        DataStreamSource<String> sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");

        /*
         * Using lambda expressions:
         * 1. The operator interfaces are functional interfaces (one unimplemented method)
         * 2. Syntax: (args) -> { body }
         * 3. The last expression can serve as the return value
         *
         * Because of Java type erasure, lambdas on a DataStream need an explicit
         * .returns(...) type hint, which is a bit tedious; anonymous inner
         * classes carry their type information and do not need it
         */
        sourceDs.flatMap((String s, Collector<String> collector)->{
            String[] words = s.split(" ");
            for (String word : words) {
                collector.collect(word);
            }
        }).returns(Types.STRING).map((String word)->
            Tuple2.of(word, 1)
        ).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy((Tuple2<String, Integer> tuple2)->tuple2.f0).sum(1).print();

        env.execute("WordCount example");

    }
}
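
If the chain of .returns(Types.TUPLE(...)) hints feels clumsy, the same type information can be supplied with a TypeHint, which keeps the full generic type visible. A minimal sketch of the same pipeline, assuming only the standard org.apache.flink.api.common.typeinfo.TypeHint class in addition to the imports above:

        // TypeHint captures the generic type via an anonymous subclass,
        // so it survives erasure just like the Types.* factories
        sourceDs.flatMap((String s, Collector<String> out) -> {
                    for (String word : s.split(" ")) {
                        out.collect(word);
                    }
                }).returns(new TypeHint<String>() {})
                .map((String word) -> Tuple2.of(word, 1))
                .returns(new TypeHint<Tuple2<String, Integer>>() {})
                .keyBy((Tuple2<String, Integer> t) -> t.f0)
                .sum(1)
                .print();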

Passing arguments from outside

package com.bigdata;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Pass parameters in directly via args
 */
public class WordCount_flink_04 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse all command-line arguments
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        DataStreamSource<String> sourceDs = null;
        
        // If you use raw args instead, check args.length and read args[0] directly
        // Load the data
        if (params.has("input")){
            sourceDs = env.readTextFile(params.get("input"));
        }else {
            sourceDs = env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");
        }

        // Transform the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDf = sourceDs.flatMap((String s, Collector<String> collector) -> {
            String[] words = s.split(" ");
            for (String word : words) {
                collector.collect(word);
            }
        }).returns(Types.STRING).map((String word) ->
                Tuple2.of(word, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0).sum(1);



        // Use the second parameter
        if (params.has("output")){
            // With parallelism 1 the sink writes a single file
            // instead of creating a directory of part files
            resultDf.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE)
                    .setParallelism(1);
        }else {
            resultDf.print();
        }
        
        env.execute("WordCount example--input");

    }
}
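
As the comment in the code notes, MultipleParameterTool can be skipped entirely by reading positional arguments. A minimal sketch of that variant's core logic (the argument order, input path first and output path second, is an assumption for illustration):

        // args[0] = input path, args[1] = output path (hypothetical order)
        DataStreamSource<String> sourceDs = (args.length >= 1)
                ? env.readTextFile(args[0])
                : env.fromElements("spark kafka flink", "spark spark spark", "kafka kafka kafka");
        // ... same flatMap/map/keyBy/sum pipeline as above, producing resultDf ...
        if (args.length >= 2) {
            resultDf.writeAsText(args[1], FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        } else {
            resultDf.print();
        }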

Running the Flink jar from the command line

Running a jar on the cluster (with arguments):

Option 1 (named parameters, parsed by MultipleParameterTool):
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    --input /xx --output /xx/xx/x

Option 2 (positional parameters, read via args[0]/args[1]):
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar \
    /xx/xx/xx /xx/xxx

-c specifies the class whose main method is the job's entry point
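
A couple of other flink run flags come up often (confirm with flink run --help for your version): -p sets the job's default parallelism, and -d submits in detached mode so the shell returns immediately. For example:

flink run -d -p 2 -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input /xx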
