1. Elements
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink job that builds a stream from a fixed set of integers,
 * doubles every value and prints the results.
 */
public class elements {
    /**
     * Entry point: creates the execution environment, wires the pipeline and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism of 1 for every operator in this job.
        env.setParallelism(1);

        // Bounded source made of the literal values 1..6.
        DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);

        // Add each value to itself, i.e. double it.
        SingleOutputStreamOperator<Integer> doubled = numbers.map(value -> value + value);
        doubled.print();

        env.execute("from_Elements");
    }
}
  1. TextFile
package com.shyl.source;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word-count job that reads a text file line by line, splits each line on
 * single spaces and prints a running count per word.
 */
public class TextFile {
    /**
     * Entry point: builds a local environment with the web UI enabled and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.readTextFile("F:\\BigData\\Flink1.4\\input\\1.txt").setParallelism(1);

        // An anonymous class (rather than a lambda) keeps the generic Tuple2 output type
        // visible to Flink's type extraction.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordOnes = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Emit (word, 1) for every space-separated token of the line.
                for (String word : line.split(" ")) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // Key by the word itself via a KeySelector; the index-based keyBy(0) is deprecated.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordOnes
                .keyBy(wordOne -> wordOne.f0)
                .sum(1);
        sum.print();

        env.execute("TextFile");
    }
}
  1. socket
package com.shyl.source;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Job that reads text lines from a TCP socket, pairs every line with the
 * constant 1 and prints the resulting tuples.
 */
public class socket {
    /**
     * Entry point: connects to the socket source and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("192.168.52.201", 9999);

        // Declare the concrete Tuple2 output type instead of Object so the stream
        // stays strongly typed for any downstream operator.
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                return Tuple2.of(line, 1);
            }
        });
        map.print();

        env.execute("new socket");
    }
}
  1. kafka
package com.shyl.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.time.Duration;
import java.util.Properties;
/**
 * Job that consumes string records from the Kafka topic {@code test}, pairs
 * every record with the constant 1 and prints the resulting tuples.
 */
public class kafka {
    /**
     * Entry point: configures the Kafka consumer and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configuration for the Kafka source.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.52.201:9092");
        // The consumer-group key is "group.id"; the previous "group_id" is not a Kafka
        // consumer setting, so the group was never actually configured.
        properties.setProperty("group.id", "kafka_source");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties);

        // Start position of the consumer. Alternatives:
        //   setStartFromLatest()       - start from the newest record
        //   setStartFromTimestamp(ms)  - start from the given epoch timestamp (milliseconds)
        //   setStartFromGroupOffsets() - the default: start from the offsets committed for the
        //                                consumer group (group.id); if no offset is found for a
        //                                partition, the auto.offset.reset setting is used
        consumer.setStartFromEarliest();     // start from the earliest record available

        DataStreamSource<String> kafkasource = env.addSource(consumer);

        // Declare the concrete Tuple2 output type instead of Object so the stream
        // stays strongly typed for any downstream operator.
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = kafkasource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String record) throws Exception {
                return Tuple2.of(record, 1);
            }
        });
        map.print();

        env.execute("kafka_source");
    }
}
  1. jdbc:flink 没有对 mysql 直接提供支持 ,需要自定义数据源。
package com.shyl.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.*;
/**
 * Custom Flink source that reads every row of the MySQL table {@code base_dic}
 * once and emits each row as a {@link base_dic} object.
 *
 * <p>NOTE(review): this extends a parallel source function, so each parallel
 * subtask presumably opens its own connection and emits the whole table;
 * confirm before running it with a parallelism greater than 1.
 */
public class JDBCSourceFromMysql extends RichParallelSourceFunction<base_dic> {
    // Created in open(), released in close(); never part of the serialized function.
    private transient PreparedStatement ps = null;
    private transient Connection connection = null;
    // Cleared by cancel() so the read loop in run() can stop early.
    private volatile boolean running = true;

    /**
     * Opens the database connection and prepares the query.
     *
     * @param parameters configuration passed in by the runtime
     * @throws Exception if the connection or statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnetion();
        String sql = "select * from base_dic";
        ps = this.connection.prepareStatement(sql);
        System.out.println(getRuntimeContext().getJobId());
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection.
     * The nested finally blocks make sure a failure while closing one
     * resource does not skip the remaining ones.
     *
     * @throws Exception if closing a resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Executes the prepared query and emits one {@link base_dic} per row until
     * the result set is exhausted or the source is cancelled.
     *
     * @param sourceContext context used to emit records
     * @throws Exception if the query or reading a row fails
     */
    @Override
    public void run(SourceContext<base_dic> sourceContext) throws Exception {
        // try-with-resources closes the result set even if reading or emitting fails.
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                base_dic basedic = new base_dic();
                basedic.setDic_code(resultSet.getString("dic_code"));
                basedic.setDic_name(resultSet.getString("dic_name"));
                basedic.setParent_code(resultSet.getString("parent_code"));
                basedic.setCreate_time(resultSet.getString("create_time"));
                basedic.setOperate_time(resultSet.getString("operate_time"));
                sourceContext.collect(basedic);
            }
        }
    }

    /** Signals {@link #run} to stop emitting rows. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Loads the MySQL driver and opens a connection to the {@code gmall} database.
     *
     * @return an open connection
     * @throws RuntimeException if the driver class is missing or the connection fails
     */
    private static Connection getConnetion() {
        String driver = "com.mysql.cj.jdbc.Driver";
        String url = "jdbc:mysql://192.168.52.201:3306/gmall";
        // NOTE(review): credentials are hard-coded in source; move them to external configuration.
        String username = "root";
        String password = "1234kxmall!@#ABC";
        try {
            Class.forName(driver);
            return DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

base_dic 类

package com.shyl.source;
/**
 * Plain data holder for one row of the {@code base_dic} table.
 * All columns are kept as strings.
 */
public class base_dic {
    private String dic_code;
    private String dic_name;
    private String parent_code;
    private String create_time;
    private String operate_time;

    /** Creates an instance with every field unset ({@code null}). */
    public base_dic() {
    }

    /** @return the dictionary code */
    public String getDic_code() {
        return dic_code;
    }

    /** @param dic_code the dictionary code to store */
    public void setDic_code(String dic_code) {
        this.dic_code = dic_code;
    }

    /** @return the dictionary name */
    public String getDic_name() {
        return dic_name;
    }

    /** @param dic_name the dictionary name to store */
    public void setDic_name(String dic_name) {
        this.dic_name = dic_name;
    }

    /** @return the parent code */
    public String getParent_code() {
        return parent_code;
    }

    /** @param parent_code the parent code to store */
    public void setParent_code(String parent_code) {
        this.parent_code = parent_code;
    }

    /** @return the creation time as text */
    public String getCreate_time() {
        return create_time;
    }

    /** @param create_time the creation time to store, as text */
    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }

    /** @return the operate time as text */
    public String getOperate_time() {
        return operate_time;
    }

    /** @param operate_time the operate time to store, as text */
    public void setOperate_time(String operate_time) {
        this.operate_time = operate_time;
    }

    /**
     * Renders all fields in the form {@code base_dic{field='value', ...}}.
     * The prefix matches the class name (it previously read "base_inc").
     *
     * @return the textual representation of this row
     */
    @Override
    public String toString() {
        return "base_dic{" +
                "dic_code='" + dic_code + '\'' +
                ", dic_name='" + dic_name + '\'' +
                ", parent_code='" + parent_code + '\'' +
                ", create_time='" + create_time + '\'' +
                ", operate_time='" + operate_time + '\'' +
                '}';
    }
}

jdbcSource

package com.shyl.source;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Job that reads {@link base_dic} rows through {@link JDBCSourceFromMysql},
 * converts each row to its string form and prints it.
 */
public class JDBC {
    /**
     * Entry point: wires the custom MySQL source into a pipeline and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<base_dic> mysqlSource = env.addSource(new JDBCSourceFromMysql());

        // The mapper always yields a String, so declare that instead of Object.
        SingleOutputStreamOperator<String> map = mysqlSource.map(new MapFunction<base_dic, String>() {
            @Override
            public String map(base_dic value) throws Exception {
                return value.toString();
            }
        });
        map.print();

        env.execute("mysql_source");
    }
}

# 备注

以 Flink 1.14.4 版本为例,以下是 pom 文件

</dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.4</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.4</version>
<!--            <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.4</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.31</version>
        </dependency>