# 一、集群规划
| nodev2001 | nodev2002 | nodev2003 | nodev2004 |
|---|---|---|---|
| master | worker | worker |
适配的 java 版本
# 二、安装流程
前往 下载页面 获取可运行的软件包。
在下载完最新的发布版本后,复制压缩文件到 master 节点并解压:
tar xzf flink-1.14.0-bin-scala_2.12.tgz | |
mv flink-1.14.0-bin-scala_2.12.tgz /opt/module |
配置 Flink:
在解压完文件后,编辑 conf/flink-conf.yaml 文件来为集群配置 Flink。
-
设置
jobmanager.rpc.address配置项指向 master 节点 -
设置
jobmanager.memory.process.size和taskmanager.memory.process.size配置项来定义 Flink 允许在每个节点上分配的最大内存值。 -
如果 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 conf/flink-conf.yaml 文件中重写
taskmanager.memory.process.size或taskmanager.memory.flink.size的默认值。 -
编辑文件 conf/masters 并输入 master 节点的 IP 或主机名。
-
编辑文件 conf/workers 并输入每个 worker 节点的 IP 或主机名。
-
Flink 目录复制到每个 worker 节点上。
-
参考 配置参数页面 获取更多细节以及额外的配置项。
-
特别地,
- 每个 JobManager 的可用内存值(
jobmanager.memory.process.size), - 每个 TaskManager 的可用内存值 (
taskmanager.memory.process.size,并检查 内存调优指南), - 每台机器的可用 CPU 数(
taskmanager.numberOfTaskSlots), - 集群中所有 CPU 数(
parallelism.default)和 - 临时目录(
io.tmp.dirs)
的值都是非常重要的配置项。
- 每个 JobManager 的可用内存值(
# 三、启动 Flink 集群
- 本地节点启动了一个 JobManager 并通过 SSH 连接到 workers 文件中所有的 worker 节点,master 节点的 flink 目录下
bin/start-cluster.sh |
-
关闭 Flink,这里同样有一个
stop-cluster.sh脚本。 -
为集群添加 JobManager[1]/TaskManager[2] 实例
添加 JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加 TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
确保在你想启动 / 关闭相应实例的主机上执行这些脚本。
# 四、使用 Yarn 管理集群
Flink 的 3 种部署模式
- 应用程序模式:
- 作业模式:Per-Job 模式使用可用的资源提供程序 框架(例如 YARN、Kubernetes)为每个提交的作业启动一个集群。此群集可用于 只有那份工作。作业完成后,群集将被拆除,任何延迟的资源(文件等)都会被拆除 清除了。
- 会话模式:多个 flink Application 向同一个 JobManager 提交,相互之间竞争资源;好处:不需要重复的启动集群。
| 部署模式 | 启动集群 | 资源隔离 | JM 数量 | 应用程序的方法 | 适合情况 |
|---|---|---|---|---|---|
| 应用程序模式 | 1 次 | 隔离 | 1/per job | jM | 【作业模式,会话模式折中】 |
| 作业模式 | 1/per job | 隔离 | 1/per job | client | 长时间运行的作业 |
| 会话模式 | 1 次 | 不隔离 | 1 | client | 低延迟,运行时间短 |
# 五、使用问题
- 会话模式,per-job 模式启动两个任务会失败
# 脚注
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
-
ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考 TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
-
Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
-
JobMaster
JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby(请参考 高可用(HA))。
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考 Tasks 和算子链)。
# 六、更换 flink 版本
由于 hive 的版本是 3.1.3,目前版本的 flink 不支持,所以升级 flink 版本为 1.17.2。官网推荐为 java11。添加 jdk11,并在启动脚本中临时切换 java 的版本为 java 11;
start-cluster.sh,yarn-session.sh 两个启动脚本中添加此代码 | |
export JAVA_HOME=/opt/module/jdk-11.0.15.1 | |
export PATH=$JAVA_HOME/bin:$PATH | |
# 执行 java -version 命令并保存输出 | |
java_version=$( java -version 2>&1 ) | |
# 输出 Java 版本信息 | |
echo "Java version info:" | |
echo "$java_version" | |
bin=`dirname "$0"` | |
bin=`cd "$bin"; pwd` |