# 启动 flink 测试环境
- 启动本地集群
bin/start-cluster.sh
- 启动 sql-client
bin/sql-client.sh
-
sql-client 的 3 种结果显示模式(result-mode)
- 'table'
set 'sql-client.execution.result-mode' = 'table';
- 'changelog'
set 'sql-client.execution.result-mode' = 'changelog';
- 'tableau'
set 'sql-client.execution.result-mode' = 'tableau';
-
设置批处理 / 流处理模式(下面两条语句二选一,后执行的会覆盖先执行的)
set 'execution.runtime-mode'='batch';
set 'execution.runtime-mode'='streaming';
# 创建不同数据源的表
# kafka
- kafka-csv
-- Kafka-backed table whose message value is a CSV line "user_id,item_id,behavior"
-- (see the sample data below). `ts` is not part of the payload: it is read from
-- the Kafka record's 'timestamp' metadata field.
-- 'scan.startup.mode' = 'earliest-offset' makes the consumer start from the
-- beginning of the topic, so rows produced before the query started are visible.
create table if not exists kafka_table (
    user_id  bigint,
    item_id  bigint    comment '物品id',
    behavior string    comment '行为',
    ts       timestamp_ltz(3) metadata from 'timestamp'
) with (
    'connector' = 'kafka',
    'topic' = 'topic_flinksql',
    'properties.bootstrap.servers' = 'nodev2001:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);
- 数据样例
1,1,浏览
2,3,点击
3,2,评论
4,2,分享
5,5,点击
6,2,点击
7,3,点击
8,2,分享
9,3,点击
10,2,浏览
11,3,浏览
12,5,评论
13,1,评论
14,4,分享
15,1,分享
16,4,分享
17,5,浏览
18,5,点击
19,4,评论
20,3,分享
21,3,评论
22,2,分享
23,1,分享
24,5,分享
25,4,评论
26,2,点击
27,3,评论
28,2,评论
29,1,点击
30,5,浏览
31,2,点击
32,1,分享
33,5,点击
34,1,浏览
35,3,点击
36,4,评论
37,3,评论
38,2,浏览
39,3,评论
40,4,分享
41,3,浏览
42,2,点击
43,3,浏览
44,3,评论
45,2,点击
46,5,分享
47,1,分享
48,5,浏览
49,2,评论
50,5,点击
51,4,评论
52,4,分享
53,4,浏览
54,1,评论
55,1,评论
56,2,浏览
57,1,分享
58,4,点击
59,1,评论
60,2,点击
61,1,评论
62,2,浏览
63,1,点击
64,3,点击
65,1,浏览
66,3,点击
67,1,点击
68,2,评论
69,5,浏览
70,1,分享
71,4,点击
72,1,分享
73,1,点击
74,1,浏览
75,5,点击
76,5,浏览
77,4,浏览
78,2,浏览
79,5,点击
80,2,点击
81,1,点击
82,5,分享
83,5,评论
84,5,点击
85,5,浏览
86,5,点击
87,1,评论
88,2,评论
89,4,分享
90,1,分享
91,4,浏览
92,3,分享
93,3,浏览
94,1,点击
95,5,点击
96,3,点击
97,3,评论
98,3,分享
99,4,分享
100,1,评论
- kafka-json:创建 JSON 格式的 kafka 表(可用来插入和读取 JSON 数据)
-- Same Kafka topic as the CSV example, but each message value is a JSON object
-- such as {"user_id":1,"item_id":1,"behavior":"..."}. `ts` again comes from the
-- Kafka record's 'timestamp' metadata, not from the JSON payload.
-- 'json.ignore-parse-errors' = 'true': ignore parse errors, i.e. skip malformed
-- fields/rows instead of failing the job (see the Flink JSON format docs).
-- NOTE: this reuses the table name kafka_table from the CSV example. Because of
-- IF NOT EXISTS, the statement is a silent no-op when that table already exists
-- in the current catalog/database (it keeps 'format' = 'csv'), so run
-- `drop table if exists kafka_table;` first or choose another table name.
create table if not exists kafka_table (
    user_id  bigint,
    item_id  bigint    comment '物品id',
    behavior string    comment '行为',
    ts       timestamp_ltz(3) metadata from 'timestamp'
) with (
    'connector' = 'kafka',
    'topic' = 'topic_flinksql',
    'properties.bootstrap.servers' = 'nodev2001:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
- 数据示例
{"user_id":1,"item_id":1,"behavior":"浏览"}
{"user_id":2,"item_id":3,"behavior":"点击"}
{"user_id":3,"item_id":2,"behavior":"评论"}
{"user_id":4,"item_id":2,"behavior":"分享"}
{"user_id":5,"item_id":5,"behavior":"点击"}
{"user_id":6,"item_id":2,"behavior":"点击"}
{"user_id":7,"item_id":3,"behavior":"点击"}
{"user_id":8,"item_id":2,"behavior":"分享"}
{"user_id":9,"item_id":3,"behavior":"点击"}
{"user_id":10,"item_id":2,"behavior":"浏览"}
{"user_id":11,"item_id":3,"behavior":"浏览"}
{"user_id":12,"item_id":5,"behavior":"评论"}
{"user_id":13,"item_id":1,"behavior":"评论"}
{"user_id":14,"item_id":4,"behavior":"分享"}
{"user_id":15,"item_id":1,"behavior":"分享"}
{"user_id":16,"item_id":4,"behavior":"分享"}
{"user_id":17,"item_id":5,"behavior":"浏览"}
{"user_id":18,"item_id":5,"behavior":"点击"}
{"user_id":19,"item_id":4,"behavior":"评论"}
{"user_id":20,"item_id":3,"behavior":"分享"}
{"user_id":21,"item_id":3,"behavior":"评论"}
{"user_id":22,"item_id":2,"behavior":"分享"}
{"user_id":23,"item_id":1,"behavior":"分享"}
{"user_id":24,"item_id":5,"behavior":"分享"}
{"user_id":25,"item_id":4,"behavior":"评论"}
{"user_id":26,"item_id":2,"behavior":"点击"}
{"user_id":27,"item_id":3,"behavior":"评论"}
{"user_id":28,"item_id":2,"behavior":"评论"}
{"user_id":29,"item_id":1,"behavior":"点击"}
{"user_id":30,"item_id":5,"behavior":"浏览"}
{"user_id":31,"item_id":2,"behavior":"点击"}
{"user_id":32,"item_id":1,"behavior":"分享"}
{"user_id":33,"item_id":5,"behavior":"点击"}
{"user_id":34,"item_id":1,"behavior":"浏览"}
{"user_id":35,"item_id":3,"behavior":"点击"}
{"user_id":36,"item_id":4,"behavior":"评论"}
{"user_id":37,"item_id":3,"behavior":"评论"}
{"user_id":38,"item_id":2,"behavior":"浏览"}
{"user_id":39,"item_id":3,"behavior":"评论"}
{"user_id":40,"item_id":4,"behavior":"分享"}
{"user_id":41,"item_id":3,"behavior":"浏览"}
{"user_id":42,"item_id":2,"behavior":"点击"}
{"user_id":43,"item_id":3,"behavior":"浏览"}
{"user_id":44,"item_id":3,"behavior":"评论"}
{"user_id":45,"item_id":2,"behavior":"点击"}
{"user_id":46,"item_id":5,"behavior":"分享"}
{"user_id":47,"item_id":1,"behavior":"分享"}
{"user_id":48,"item_id":5,"behavior":"浏览"}
{"user_id":49,"item_id":2,"behavior":"评论"}
{"user_id":50,"item_id":5,"behavior":"点击"}
{"user_id":51,"item_id":4,"behavior":"评论"}
{"user_id":52,"item_id":4,"behavior":"分享"}
{"user_id":53,"item_id":4,"behavior":"浏览"}
{"user_id":54,"item_id":1,"behavior":"评论"}
{"user_id":55,"item_id":1,"behavior":"评论"}
{"user_id":56,"item_id":2,"behavior":"浏览"}
{"user_id":57,"item_id":1,"behavior":"分享"}
{"user_id":58,"item_id":4,"behavior":"点击"}
{"user_id":59,"item_id":1,"behavior":"评论"}
{"user_id":60,"item_id":2,"behavior":"点击"}
{"user_id":61,"item_id":1,"behavior":"评论"}
{"user_id":62,"item_id":2,"behavior":"浏览"}
{"user_id":63,"item_id":1,"behavior":"点击"}
{"user_id":64,"item_id":3,"behavior":"点击"}
{"user_id":65,"item_id":1,"behavior":"浏览"}
{"user_id":66,"item_id":3,"behavior":"点击"}
{"user_id":67,"item_id":1,"behavior":"点击"}
- 创建 kafka 表时,可以通过 `metadata from '<键>'` 把以下元数据信息声明为字段(例如上面的 `ts timestamp_ltz(3) metadata from 'timestamp'`)

| 键 | 数据类型 | 描述 | R/W |
|---|---|---|---|
| topic | `STRING NOT NULL` | Kafka 记录的 Topic 名。 | R |
| partition | `INT NOT NULL` | Kafka 记录的 partition ID。 | R |
| headers | `MAP<STRING, BYTES> NOT NULL` | 二进制 Map 类型的 Kafka 记录头(Header)。 | R/W |
| leader-epoch | `INT NULL` | Kafka 记录的 Leader epoch(如果可用)。 | R |
| offset | `BIGINT NOT NULL` | Kafka 记录在 partition 中的 offset。 | R |
| timestamp | `TIMESTAMP_LTZ(3) NOT NULL` | Kafka 记录的时间戳。 | R/W |
| timestamp-type | `STRING NOT NULL` | Kafka 记录的时间戳类型。可能的类型有 “NoTimestampType”、“CreateTime”(会在写入元数据时设置)或 “LogAppendTime”。 | R |
# hive
-
配置支持 hive 环境:要与 Hive 集成,您需要在 Flink 的 /lib/ 目录中添加一些额外的依赖包,以便通过 Table API 或 SQL Client 与 Hive 进行交互。
-
下载相关的 jar 包:在阿里云云效 maven 仓库中搜索
flink-sql-connector-hive-3.1.3_2.12
-
Apache Hive 是基于 Hadoop 构建的,因此首先需要通过环境变量 HADOOP_CLASSPATH 提供 Hadoop 的依赖(其值应为 `hadoop classpath` 命令的输出,而不是 Hadoop 的安装目录)。
export HADOOP_CLASSPATH=$(/opt/module/hadoop-3.3.2/bin/hadoop classpath)  # the classpath printed by `hadoop classpath` (dirs + jar globs), not the install dir itself
-
从 Hive 的 lib 目录下找到如下 Hive 依赖包(hive-exec、libfb303;如果要使用 Hive 方言,还需要 antlr-runtime),连同 Flink 的 Hive connector 一起复制到 flink 的 lib 目录下,最终目录结构如下:
/flink-1.17.2
    /lib
        // Flink's Hive connector
        flink-connector-hive_2.12-1.17.2.jar
        // Hive dependencies
        hive-exec-3.1.0.jar
        libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
        // add antlr-runtime if you need to use hive dialect
        antlr-runtime-3.5.2.jar
-
移动 planner jar 包
把 $FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.2.jar 移动到 $FLINK_HOME/lib 下,并且将 $FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.2.jar 移出去。具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:
mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.2.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.2.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.2.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.2.jar
NOTE: 只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。
-
启动 hive,flink,创建 hive 的 catalog
-- Register a Hive catalog named myhive, then make it the current catalog so
-- that later statements resolve tables against Hive.
-- 'hive-conf-dir' points at the Hive configuration directory on this host.
-- NOTE(review): 'default-database' = 'test' presumably has to exist already in
-- the Hive metastore — confirm before running.
create catalog myhive with (
    'type'             = 'hive',
    'default-database' = 'test',
    'hive-conf-dir'    = '/opt/module/hive/conf'
);
use catalog myhive;
-
查询 hive 库的表
- 查看 hive 表数据