# 启动 flink 测试环境
  1. 启动本地集群
./bin/start-cluster.sh  # start the local cluster (relative path: assumes the current directory is the Flink installation directory)
  1. 启动 sql-client
./bin/sql-client.sh  # start the SQL client (relative path: assumes the current directory is the Flink installation directory)
  1. sql-client 的 3 种模式

    1. ‘table’:将查询结果物化在内存中,并以分页表格的形式在独立视图中展示。
    -- switch the sql-client result display mode to 'table'
    SET 'sql-client.execution.result-mode' = 'table';
    

    image-20241022134808047

    1. ‘changelog’:不物化结果,直接展示持续查询产生的结果流(插入记录以 + 标记,撤回记录以 - 标记)。
    -- switch the sql-client result display mode to 'changelog'
    SET 'sql-client.execution.result-mode' = 'changelog';
    
    1. ‘tableau’:以表格形式将结果直接打印在当前终端上(类似传统数据库客户端)。
    -- switch the sql-client result display mode to 'tableau'
    SET 'sql-client.execution.result-mode' = 'tableau';
    
  2. 设置批处理 / 流处理模式(二选一)

-- Choose one of the two modes; if both statements are run, the later SET is the one in effect.
SET 'execution.runtime-mode' = 'batch';
SET 'execution.runtime-mode' = 'streaming';
# 创建不同数据源的表
# kafka
  1. kafka-csv
-- Kafka source table. The message value is parsed as CSV into the three physical
-- columns user_id, item_id, behavior (in that order).
-- ts is not read from the message body: it maps to the Kafka record's 'timestamp' metadata.
create table if not exists kafka_table (
    user_id  bigint,
    item_id  bigint comment '物品id',
    behavior string comment '行为',
    ts       timestamp_ltz(3) metadata from 'timestamp'
) with (
    'connector'                    = 'kafka',
    'topic'                        = 'topic_flinksql',
    'properties.bootstrap.servers' = 'nodev2001:9092',
    'properties.group.id'          = 'testGroup',
    'scan.startup.mode'            = 'earliest-offset',  -- start from the earliest offset of each partition
    'format'                       = 'csv'
);
  1. 数据样例
1,1,浏览
2,3,点击
3,2,评论
4,2,分享
5,5,点击
6,2,点击
7,3,点击
8,2,分享
9,3,点击
10,2,浏览
11,3,浏览
12,5,评论
13,1,评论
14,4,分享
15,1,分享
16,4,分享
17,5,浏览
18,5,点击
19,4,评论
20,3,分享
21,3,评论
22,2,分享
23,1,分享
24,5,分享
25,4,评论
26,2,点击
27,3,评论
28,2,评论
29,1,点击
30,5,浏览
31,2,点击
32,1,分享
33,5,点击
34,1,浏览
35,3,点击
36,4,评论
37,3,评论
38,2,浏览
39,3,评论
40,4,分享
41,3,浏览
42,2,点击
43,3,浏览
44,3,评论
45,2,点击
46,5,分享
47,1,分享
48,5,浏览
49,2,评论
50,5,点击
51,4,评论
52,4,分享
53,4,浏览
54,1,评论
55,1,评论
56,2,浏览
57,1,分享
58,4,点击
59,1,评论
60,2,点击
61,1,评论
62,2,浏览
63,1,点击
64,3,点击
65,1,浏览
66,3,点击
67,1,点击
68,2,评论
69,5,浏览
70,1,分享
71,4,点击
72,1,分享
73,1,点击
74,1,浏览
75,5,点击
76,5,浏览
77,4,浏览
78,2,浏览
79,5,点击
80,2,点击
81,1,点击
82,5,分享
83,5,评论
84,5,点击
85,5,浏览
86,5,点击
87,1,评论
88,2,评论
89,4,分享
90,1,分享
91,4,浏览
92,3,分享
93,3,浏览
94,1,点击
95,5,点击
96,3,点击
97,3,评论
98,3,分享
99,4,分享
100,1,评论
  1. kafka-json:使用 JSON 格式创建 kafka 表,并向 topic 写入 JSON 格式的数据
-- Same schema as the CSV variant, but the Kafka message value is parsed as JSON.
-- NOTE(review): this reuses the name kafka_table. Because of "if not exists", the statement
-- does nothing if a kafka_table already exists in the current catalog/database (e.g. the CSV
-- one), so the old definition stays in effect. Drop it first or use a different table name.
create table if not exists kafka_table (
    user_id  bigint,
    item_id  bigint comment '物品id',
    behavior string comment '行为',
    ts       timestamp_ltz(3) metadata from 'timestamp'
) with (
    'connector'                    = 'kafka',
    'topic'                        = 'topic_flinksql',
    'properties.bootstrap.servers' = 'nodev2001:9092',
    'properties.group.id'          = 'testGroup',
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'json',
    -- skip fields/rows that fail to parse instead of failing the job (failed fields become null)
    'json.ignore-parse-errors'     = 'true'
);
  1. 数据示例
{"user_id":1,"item_id":1,"behavior":"浏览"}
{"user_id":2,"item_id":3,"behavior":"点击"}
{"user_id":3,"item_id":2,"behavior":"评论"}
{"user_id":4,"item_id":2,"behavior":"分享"}
{"user_id":5,"item_id":5,"behavior":"点击"}
{"user_id":6,"item_id":2,"behavior":"点击"}
{"user_id":7,"item_id":3,"behavior":"点击"}
{"user_id":8,"item_id":2,"behavior":"分享"}
{"user_id":9,"item_id":3,"behavior":"点击"}
{"user_id":10,"item_id":2,"behavior":"浏览"}
{"user_id":11,"item_id":3,"behavior":"浏览"}
{"user_id":12,"item_id":5,"behavior":"评论"}
{"user_id":13,"item_id":1,"behavior":"评论"}
{"user_id":14,"item_id":4,"behavior":"分享"}
{"user_id":15,"item_id":1,"behavior":"分享"}
{"user_id":16,"item_id":4,"behavior":"分享"}
{"user_id":17,"item_id":5,"behavior":"浏览"}
{"user_id":18,"item_id":5,"behavior":"点击"}
{"user_id":19,"item_id":4,"behavior":"评论"}
{"user_id":20,"item_id":3,"behavior":"分享"}
{"user_id":21,"item_id":3,"behavior":"评论"}
{"user_id":22,"item_id":2,"behavior":"分享"}
{"user_id":23,"item_id":1,"behavior":"分享"}
{"user_id":24,"item_id":5,"behavior":"分享"}
{"user_id":25,"item_id":4,"behavior":"评论"}
{"user_id":26,"item_id":2,"behavior":"点击"}
{"user_id":27,"item_id":3,"behavior":"评论"}
{"user_id":28,"item_id":2,"behavior":"评论"}
{"user_id":29,"item_id":1,"behavior":"点击"}
{"user_id":30,"item_id":5,"behavior":"浏览"}
{"user_id":31,"item_id":2,"behavior":"点击"}
{"user_id":32,"item_id":1,"behavior":"分享"}
{"user_id":33,"item_id":5,"behavior":"点击"}
{"user_id":34,"item_id":1,"behavior":"浏览"}
{"user_id":35,"item_id":3,"behavior":"点击"}
{"user_id":36,"item_id":4,"behavior":"评论"}
{"user_id":37,"item_id":3,"behavior":"评论"}
{"user_id":38,"item_id":2,"behavior":"浏览"}
{"user_id":39,"item_id":3,"behavior":"评论"}
{"user_id":40,"item_id":4,"behavior":"分享"}
{"user_id":41,"item_id":3,"behavior":"浏览"}
{"user_id":42,"item_id":2,"behavior":"点击"}
{"user_id":43,"item_id":3,"behavior":"浏览"}
{"user_id":44,"item_id":3,"behavior":"评论"}
{"user_id":45,"item_id":2,"behavior":"点击"}
{"user_id":46,"item_id":5,"behavior":"分享"}
{"user_id":47,"item_id":1,"behavior":"分享"}
{"user_id":48,"item_id":5,"behavior":"浏览"}
{"user_id":49,"item_id":2,"behavior":"评论"}
{"user_id":50,"item_id":5,"behavior":"点击"}
{"user_id":51,"item_id":4,"behavior":"评论"}
{"user_id":52,"item_id":4,"behavior":"分享"}
{"user_id":53,"item_id":4,"behavior":"浏览"}
{"user_id":54,"item_id":1,"behavior":"评论"}
{"user_id":55,"item_id":1,"behavior":"评论"}
{"user_id":56,"item_id":2,"behavior":"浏览"}
{"user_id":57,"item_id":1,"behavior":"分享"}
{"user_id":58,"item_id":4,"behavior":"点击"}
{"user_id":59,"item_id":1,"behavior":"评论"}
{"user_id":60,"item_id":2,"behavior":"点击"}
{"user_id":61,"item_id":1,"behavior":"评论"}
{"user_id":62,"item_id":2,"behavior":"浏览"}
{"user_id":63,"item_id":1,"behavior":"点击"}
{"user_id":64,"item_id":3,"behavior":"点击"}
{"user_id":65,"item_id":1,"behavior":"浏览"}
{"user_id":66,"item_id":3,"behavior":"点击"}
{"user_id":67,"item_id":1,"behavior":"点击"}
  1. 创建 kafka 表时,可以将以下元数据信息声明为字段(写法如 ts timestamp_ltz(3) metadata from 'timestamp')

| 键 | 数据类型 | 描述 | R/W |
| --- | --- | --- | --- |
| topic | `STRING NOT NULL` | Kafka 记录的 Topic 名。 | R |
| partition | `INT NOT NULL` | Kafka 记录的 partition ID。 | R |
| headers | `MAP<STRING, BYTES> NOT NULL` | 二进制 Map 类型的 Kafka 记录头(Header)。 | R/W |
| leader-epoch | `INT NULL` | Kafka 记录的 Leader epoch(如果可用)。 | R |
| offset | `BIGINT NOT NULL` | Kafka 记录在 partition 中的 offset。 | R |
| timestamp | `TIMESTAMP_LTZ(3) NOT NULL` | Kafka 记录的时间戳。 | R/W |
| timestamp-type | `STRING NOT NULL` | Kafka 记录的时间戳类型。可能的类型有 “NoTimestampType”、“CreateTime”(会在写入元数据时设置)或 “LogAppendTime”。 | R |
# hive
  1. 配置支持 hive 环境:要与 Hive 集成,您需要在 Flink 下的 /lib/ 目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。

  2. 下载相关的 jar 包:在阿里云云效 Maven 仓库中搜索 flink-sql-connector-hive-3.1.3_2.12 并下载。

  3. Apache Hive 构建在 Hadoop 之上,因此首先需要通过环境变量 HADOOP_CLASSPATH 提供 Hadoop 的依赖。

    export HADOOP_CLASSPATH="$(/opt/module/hadoop-3.3.2/bin/hadoop classpath)"  # must be the actual Hadoop classpath printed by 'hadoop classpath', not the Hadoop install directory
    
  4. 在 Hive 的 lib 目录下找到 hive-exec 和 libfb303 两个包(如需使用 Hive 方言,还需要 antlr-runtime),连同 Flink 的 Hive connector 一起复制到 flink 的 lib 目录下,最终目录结构如下:

    /flink-1.17.2
       /lib
           // Flink's Hive connector
           flink-connector-hive_2.12-1.17.2.jar
           // Hive dependencies
           hive-exec-3.1.0.jar
           libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
           // add antlr-runtime if you need to use hive dialect
           antlr-runtime-3.5.2.jar
  5. 移动 planner jar 包

    将 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.2.jar 移动到 FLINK_HOME/lib 下,并将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.2.jar 移出去。具体原因请参见 FLINK-25128。可以使用如下命令完成 planner jar 包的移动:

    mv "$FLINK_HOME/opt/flink-table-planner_2.12-1.17.2.jar" "$FLINK_HOME/lib/flink-table-planner_2.12-1.17.2.jar"  # put the full planner on the classpath (see FLINK-25128)
    mv "$FLINK_HOME/lib/flink-table-planner-loader-1.17.2.jar" "$FLINK_HOME/opt/flink-table-planner-loader-1.17.2.jar"  # and take the planner loader off it

    NOTE: 只有在使用 Hive 方言(Hive dialect)或 HiveServer2 endpoint 时,才需要进行上述 jar 包移动;但在集成 Hive 时,推荐执行上述操作。

  6. 启动 hive,flink,创建 hive 的 catalog

    -- Register a Hive catalog: its Hive configuration is read from 'hive-conf-dir', and
    -- 'default-database' is the database used when none is specified. Then switch the session to it.
    CREATE CATALOG myhive WITH (
        'type'             = 'hive',
        'default-database' = 'test',
        'hive-conf-dir'    = '/opt/module/hive/conf'
    );
    USE CATALOG myhive;
  7. 查询 hive 库的表

Snipaste_2024-11-21_14-20-48

  1. 查看 hive 表数据

Snipaste_2024-11-21_14-22-12