- Elements
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
public class elements { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
DataStreamSource<Integer> listSource = env.fromElements(1, 2, 3, 4, 5, 6); | |
SingleOutputStreamOperator<Integer> map = listSource.map(i -> i + i ); | |
map.print(); | |
env.execute("from_Elements"); | |
} | |
} |
- TextFile
package com.shyl.source; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.util.Collector; | |
public class TextFile { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); | |
DataStreamSource<String> source = env.readTextFile("F:\\BigData\\Flink1.4\\input\\1.txt").setParallelism(1); | |
SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { | |
@Override | |
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { | |
String[] key = s.split(" "); | |
for (String s1 : key) { | |
collector.collect(new Tuple2<String, Integer>(s1, 1)); | |
} | |
} | |
}); | |
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1); | |
sum.print(); | |
env.execute("TextFile"); | |
} | |
} |
- socket
package com.shyl.source; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
public class socket { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
DataStreamSource<String> source = env.socketTextStream("192.168.52.201", 9999); | |
SingleOutputStreamOperator<Object> map = source.map(new MapFunction<String, Object>() { | |
@Override | |
public Object map(String s) throws Exception { | |
return new Tuple2<String, Integer>(s, 1); | |
} | |
}); | |
map.print(); | |
env.execute("new socket"); | |
} | |
} |
- kafka
package com.shyl.source; | |
import org.apache.flink.api.common.eventtime.WatermarkStrategy; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.api.java.tuple.Tuple3; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; | |
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; | |
import java.time.Duration; | |
import java.util.Properties; | |
public class kafka { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 创建 kafka 数据源的配置 | |
Properties properties = new Properties(); | |
properties.setProperty("bootstrap.servers","192.168.52.201:9092"); | |
properties.setProperty("group_id","kafka_source"); | |
// 指定消费者读取的位置 | |
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties); | |
consumer.setStartFromEarliest(); // 尽可能从最早的记录开始 | |
// consumer.setStartFromLatest (); // 从最新的记录开始 | |
// consumer.setStartFromTimestamp (); // 从指定的时间开始(毫秒) | |
// consumer.setStartFromGroupOffsets (); // 默认的方法 从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。 | |
DataStreamSource<String> kafkasource = env.addSource(consumer); | |
SingleOutputStreamOperator<Object> map = kafkasource.map(new MapFunction<String, Object>() { | |
@Override | |
public Object map(String s) throws Exception { | |
return new Tuple2<>(s, 1); | |
} | |
}); | |
map.print(); | |
env.execute("kafka_source"); | |
} | |
} |
- jdbc:Flink 没有直接提供 MySQL 数据源支持,需要自定义数据源。
package com.shyl.source; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; | |
import java.sql.*; | |
public class JDBCSourceFromMysql extends RichParallelSourceFunction<base_dic> { | |
private PreparedStatement ps = null; | |
private Connection connection = null; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
super.open(parameters); | |
connection = getConnetion(); | |
String sql = "select * from base_dic"; | |
ps = this.connection.prepareStatement(sql); | |
System.out.println(getRuntimeContext().getJobId()); | |
} | |
@Override | |
public void close() throws Exception { | |
super.close(); | |
if (connection != null) { | |
connection.close(); | |
} | |
if (ps != null) { | |
ps.close(); | |
} | |
} | |
@Override | |
public void run(SourceContext<base_dic> sourceContext) throws Exception { | |
ResultSet resultSet = ps.executeQuery(); | |
while (resultSet.next()) { | |
base_dic basedic = new base_dic(); | |
basedic.setDic_code(resultSet.getString("dic_code")); | |
basedic.setDic_name(resultSet.getString("dic_name")); | |
basedic.setParent_code(resultSet.getString("parent_code")); | |
basedic.setCreate_time(resultSet.getString("create_time")); | |
basedic.setOperate_time(resultSet.getString("operate_time")); | |
sourceContext.collect(basedic); | |
} | |
} | |
@Override | |
public void cancel() { | |
} | |
private static Connection getConnetion() { | |
String driver = "com.mysql.cj.jdbc.Driver"; | |
String url = "jdbc:mysql://192.168.52.201:3306/gmall"; | |
String username = "root"; | |
String password = "1234kxmall!@#ABC"; | |
Connection con = null; | |
try { | |
Class.forName(driver); | |
try { | |
con = DriverManager.getConnection(url, username, password); | |
} catch (SQLException e) { | |
throw new RuntimeException(e); | |
} | |
} catch (ClassNotFoundException e) { | |
throw new RuntimeException(e); | |
} | |
return con; | |
} | |
} |
base_dic 类
package com.shyl.source; | |
/**
 * Plain data holder for one row of the {@code base_dic} table.
 *
 * <p>All columns are kept as strings; every field has a getter and a setter,
 * and a public no-argument constructor is provided.
 */
public class base_dic {

    private String dic_code;
    private String dic_name;
    private String parent_code;
    private String create_time;
    private String operate_time;

    /** Creates an instance with every field {@code null}. */
    public base_dic() {
    }

    /** @return the value of {@code dic_code} */
    public String getDic_code() {
        return dic_code;
    }

    /** @param dic_code the new value of {@code dic_code} */
    public void setDic_code(String dic_code) {
        this.dic_code = dic_code;
    }

    /** @return the value of {@code dic_name} */
    public String getDic_name() {
        return dic_name;
    }

    /** @param dic_name the new value of {@code dic_name} */
    public void setDic_name(String dic_name) {
        this.dic_name = dic_name;
    }

    /** @return the value of {@code parent_code} */
    public String getParent_code() {
        return parent_code;
    }

    /** @param parent_code the new value of {@code parent_code} */
    public void setParent_code(String parent_code) {
        this.parent_code = parent_code;
    }

    /** @return the value of {@code create_time} */
    public String getCreate_time() {
        return create_time;
    }

    /** @param create_time the new value of {@code create_time} */
    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }

    /** @return the value of {@code operate_time} */
    public String getOperate_time() {
        return operate_time;
    }

    /** @param operate_time the new value of {@code operate_time} */
    public void setOperate_time(String operate_time) {
        this.operate_time = operate_time;
    }

    /**
     * Renders all fields in the form {@code base_dic{name='value', ...}}.
     *
     * @return a single-line textual representation of this object
     */
    @Override
    public String toString() {
        // The prefix matches the class name (it previously read "base_inc").
        return "base_dic{"
                + "dic_code='" + dic_code + '\''
                + ", dic_name='" + dic_name + '\''
                + ", parent_code='" + parent_code + '\''
                + ", create_time='" + create_time + '\''
                + ", operate_time='" + operate_time + '\''
                + '}';
    }
}
jdbcSource
package com.shyl.source; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
public class JDBC { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
DataStreamSource<base_dic> mysqlSource = env.addSource(new JDBCSourceFromMysql()); | |
SingleOutputStreamOperator<Object> map = mysqlSource.map(new MapFunction<base_dic, Object>() { | |
@Override | |
public Object map(base_dic value) throws Exception { | |
return value.toString(); | |
} | |
}); | |
map.print(); | |
env.execute("mysql_source"); | |
} | |
} |
# 备注
以 Flink 1.14.4 版本为例,以下是 pom 文件
</dependency> | |
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-clients_2.12</artifactId> | |
<version>1.14.4</version> | |
</dependency> | |
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-streaming-java_2.12</artifactId> | |
<version>1.14.4</version> | |
<!-- <scope>provided</scope>--> | |
</dependency> | |
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-api</artifactId> | |
<version>1.7.12</version> | |
</dependency> | |
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web --> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-runtime-web_2.12</artifactId> | |
<version>1.14.4</version> | |
<!-- <scope>test</scope>--> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-connector-kafka_2.12</artifactId> | |
<version>1.14.4</version> | |
</dependency> | |
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-connector-jdbc_2.12</artifactId> | |
<version>1.14.4</version> | |
<!-- <scope>provided</scope>--> | |
</dependency> | |
<!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j --> | |
<dependency> | |
<groupId>com.mysql</groupId> | |
<artifactId>mysql-connector-j</artifactId> | |
<version>8.0.31</version> | |
</dependency> |