Flink的特点:Flink的相同代码逻辑,既可以做批处理也可以做流处理,相同的数据源可以做流处理也可以做批处理

Source读取数据

1、将普通List转成Source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Turns an in-memory list into a Flink stream and prints every element.
 */
public class Demo1ListSource {
    public static void main(String[] args) throws Exception {
        // Sample records that back the source.
        ArrayList<String> words = new ArrayList<>();
        words.add("java");
        words.add("hadoop");
        words.add("flink");
        words.add("spark");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A collection-backed source is a bounded stream: the job ends once
        // every element of the list has been emitted.
        env.fromCollection(words).print();

        env.execute();
    }
}

2、文件的Source(注意:下方示例代码实际是从socket读取数据并写入文件的FileSink示例,与后文"1、文件的Sink"的代码相同)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * Reads text lines from a socket and writes them to rolling row-format files.
 *
 * NOTE(review): this job reads from a socket and writes through a FileSink;
 * it does not read from a file, even though it sits under the file-source heading.
 */
public class Demo1FileSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Conditions under which the sink rolls over to a new part file.
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
                // the part file holds at least 10 seconds of data
                .withRolloverInterval(Duration.ofSeconds(10))
                // no new record has arrived for 10 seconds
                .withInactivityInterval(Duration.ofSeconds(10))
                // the part file has reached 1 MiB
                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                .build();

        // Output directory plus the per-record encoding.
        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("data/socket_path"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(rollingPolicy)
                .build();

        DataStream<String> socketLines = env.socketTextStream("master", 8888);
        socketLines.sinkTo(fileSink);

        env.execute();
    }
}


3、Mysql的Source

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Runs a job that reads Student rows through the custom MySQLSource and prints them.
 */
public class Demo4MysqlSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the custom source and print each record it emits.
        env.addSource(new MySQLSource()).print();

        env.execute();
    }
}

/**
 * Custom source that reads every row of the {@code students} table over JDBC
 * and emits each row downstream as a {@link Student}.
 *
 * <p>The source finishes once the result set is exhausted or the job is cancelled.
 */
class MySQLSource implements SourceFunction<Student> {

    /** Cleared by {@link #cancel()} so the read loop in {@link #run} can stop early. */
    private volatile boolean running = true;

    /**
     * Queries MySQL and forwards each row to the given context.
     *
     * @param ctx context used to emit records downstream
     * @throws Exception if the driver cannot be loaded or any JDBC call fails
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // Load the MySQL JDBC driver.
        Class.forName("com.mysql.jdbc.Driver");

        // try-with-resources closes the result set, statement and connection
        // (in that order) on normal completion, on cancellation and on failure.
        try (Connection con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
             PreparedStatement stat = con.prepareStatement("select id,name,age,gender,clazz from students");
             ResultSet resultSet = stat.executeQuery()) {
            while (running && resultSet.next()) {
                long id = resultSet.getLong("id");
                String name = resultSet.getString("name");
                long age = resultSet.getLong("age");
                String gender = resultSet.getString("gender");
                String clazz = resultSet.getString("clazz");
                // Send the record downstream.
                ctx.collect(new Student(id, name, age, gender, clazz));
            }
        }
    }

    /** Signals the read loop in {@link #run} to stop emitting records. */
    @Override
    public void cancel() {
        running = false;
    }
}


/**
 * One student record as emitted by MySQLSource.
 *
 * The field order matters: the Lombok-generated all-args constructor takes its
 * parameters in declaration order, and MySQLSource calls it as
 * (id, name, age, gender, clazz).
 */
@Data
@AllArgsConstructor
class Student {
    // Value of the "id" column.
    private Long id;
    // Value of the "name" column.
    private String name;
    // Value of the "age" column.
    private Long age;
    // Value of the "gender" column.
    private String gender;
    // Value of the "clazz" column.
    private String clazz;
}

4、Kafka的Source

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Consumes string records from the "bigdata" Kafka topic and prints them.
 */
public class Demo5KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Describe where and how to consume.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // broker list
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                // topic to read
                .setTopics("bigdata")
                // consumer group: a record is consumed only once within a group
                .setGroupId("Demo5KafkaSource")
                // position to start consuming from
                .setStartingOffsets(OffsetsInitializer.earliest())
                // only the record value is deserialized, as a plain string
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Build an (unbounded) stream on top of the Kafka source and print it.
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();

        env.execute();
    }
}

Sink存储(写)数据

1、文件的Sink

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * File sink demo: lines received on a socket are written to rolling part files.
 */
public class Demo1FileSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> socketLines = env.socketTextStream("master", 8888);

        // Where the files go and how each record is encoded.
        Path outputDir = new Path("data/socket_path");
        SimpleStringEncoder<String> encoder = new SimpleStringEncoder<String>("UTF-8");

        FileSink<String> fileSink = FileSink
                .forRowFormat(outputDir, encoder)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // the part file holds at least 10 seconds of data
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // roll when no new record has arrived for 10 seconds
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // roll when the part file reaches 1 MiB
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();

        socketLines.sinkTo(fileSink);

        env.execute();
    }
}

2、Mysql的Sink

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Word-count job: reads comma-separated lines from a socket, keeps a running
 * count per word and writes each updated count to MySQL through MySQLSink.
 */
public class Demo2MySQLSink {

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Build the pipeline as one fluent chain.
        env.socketTextStream("master", 8888)
                // one line -> many words
                .flatMap((FlatMapFunction<String, String>) (line, out) -> {
                    String[] words = line.split(",");
                    for (String word : words) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                // word -> (word, 1)
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // group by the word
                .keyBy(kv -> kv.f0)
                // running count per word
                .reduce((left, right) -> Tuple2.of(left.f0, left.f1 + right.f1))
                // write through the custom sink
                .addSink(new MySQLSink());

        // 3. Start the job.
        env.execute("wc");
    }
}

// RichSinkFunction: compared with the plain SinkFunction interface it additionally
// provides the open() and close() lifecycle methods.
/**
 * Custom sink that upserts (word, count) pairs into the {@code word_count}
 * MySQL table over JDBC.
 *
 * <p>The connection and statement are created in {@link #open} and reused for
 * every record; {@link #close} releases them.
 */
class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {

    // JDBC handles are not serializable; they are created in open(), so they
    // are marked transient and never shipped with the function object.
    private transient Connection con;
    private transient PreparedStatement stat;

    /**
     * Runs when the task starts, once per task: opens the JDBC connection and
     * prepares the upsert statement.
     *
     * @param parameters configuration passed in by Flink (unused here)
     * @throws Exception if the driver cannot be loaded or the connection fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("创建数据库链接");
        // Load the MySQL JDBC driver.
        Class.forName("com.mysql.jdbc.Driver");
        // Open the database connection.
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
        // "replace into": update when the key exists, insert otherwise.
        // The table needs a primary key for this to work.
        stat = con.prepareStatement("replace into word_count values(?,?)");
    }

    /**
     * Releases the JDBC resources.
     *
     * <p>Null checks cover the case where {@link #open} failed part-way, and the
     * {@code finally} block ensures the connection is closed even if closing
     * the statement throws.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (stat != null) {
                stat.close();
            }
        } finally {
            if (con != null) {
                con.close();
            }
        }
    }

    /**
     * Runs once per record: writes the word and its current count to MySQL.
     *
     * @param kv      (word, count) pair to write
     * @param context sink context supplied by Flink (unused here)
     * @throws Exception if the JDBC write fails
     */
    @Override
    public void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {
        String word = kv.f0;
        Integer count = kv.f1;
        stat.setString(1, word);
        stat.setInt(2, count);
        stat.execute();
    }
}

3、Kafka的Sink

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Reads lines from a text file and writes each one as a record value to the
 * "students" Kafka topic.
 */
public class Demo4KafkaSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> studentDS = env.readTextFile("data/student_json");

        // Create the Kafka sink.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("students")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Delivery semantics. setDeliveryGuarantee replaces the misspelled,
                // deprecated setDeliverGuarantee.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Attach the Kafka sink.
        studentDS.sinkTo(sink);

        env.execute();
    }
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部