# 一、集群规划

nodev2001 nodev2002 nodev2003 nodev2004
master worker worker

适配的 java 版本

# 二、安装流程

前往 下载页面 获取可运行的软件包。

在下载完最新的发布版本后,复制压缩文件到 master 节点并解压:

tar xzf flink-1.14.0-bin-scala_2.12.tgz
mv flink-1.14.0-bin-scala_2.12.tgz /opt/module

配置 Flink:

在解压完文件后,编辑 conf/flink-conf.yaml 文件来为集群配置 Flink。

  1. 设置 jobmanager.rpc.address 配置项指向 master 节点

  2. 设置 jobmanager.memory.process.sizetaskmanager.memory.process.size 配置项来定义 Flink 允许在每个节点上分配的最大内存值。

  3. 如果 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 conf/flink-conf.yaml 文件中重写 taskmanager.memory.process.sizetaskmanager.memory.flink.size 的默认值。

  4. 编辑文件 conf/masters 并输入 master 节点的 IP 或主机名。

  5. 编辑文件 conf/workers 并输入每个 worker 节点的 IP 或主机名。

  6. Flink 目录复制到每个 worker 节点上。

  7. 参考 配置参数页面 获取更多细节以及额外的配置项。

  8. 特别地,

    • 每个 JobManager 的可用内存值( jobmanager.memory.process.size ),
    • 每个 TaskManager 的可用内存值 ( taskmanager.memory.process.size ,并检查 内存调优指南),
    • 每台机器的可用 CPU 数( taskmanager.numberOfTaskSlots ),
    • 集群中所有 CPU 数( parallelism.default )和
    • 临时目录( io.tmp.dirs

    的值都是非常重要的配置项。

# 三、启动 Flink 集群

  1. 本地节点启动了一个 JobManager 并通过 SSH 连接到 workers 文件中所有的 worker 节点,master 节点的 flink 目录下
bin/start-cluster.sh
  1. 关闭 Flink,这里同样有一个 stop-cluster.sh 脚本。

  2. 为集群添加 JobManager[1]/TaskManager[2] 实例

    添加 JobManager

    bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

    添加 TaskManager

    bin/taskmanager.sh start|start-foreground|stop|stop-all

    确保在你想启动 / 关闭相应实例的主机上执行这些脚本。

# 四、使用 Yarn 管理集群

Flink 的 3 种部署模式

  1. 应用程序模式:
  2. 作业模式:Per-Job 模式使用可用的资源提供程序 框架(例如 YARN、Kubernetes)为每个提交的作业启动一个集群。此群集可用于 只有那份工作。作业完成后,群集将被拆除,任何延迟的资源(文件等)都会被拆除 清除了。
  3. 会话模式:多个 flink Application 向同一个 JobManager 提交,相互之间竞争资源;好处:不需要重复的启动集群。
部署模式 启动集群 资源隔离 JM 数量 应用程序的方法 适合情况
应用程序模式 1 次 隔离 1/per job jM 【作业模式,会话模式折中】
作业模式 1/per job 隔离 1/per job client 长时间运行的作业
会话模式 1 次 不隔离 1 client 低延迟,运行时间短

# 五、使用问题

  1. 会话模式,per-job 模式启动两个任务会失败

# 脚注

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager

    ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考 TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher

    Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster

    JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby(请参考 高可用(HA))。

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考 Tasks 和算子链)。

# 六、更换 flink 版本

由于 hive 的版本是 3.1.3,目前版本的 flink 不支持,所以升级 flink 版本为 1.17.2。官网推荐为 java11。添加 jdk11,并在启动脚本中临时切换 java 的版本为 java 11;

start-cluster.sh,yarn-session.sh 两个启动脚本中添加此代码
export JAVA_HOME=/opt/module/jdk-11.0.15.1
export PATH=$JAVA_HOME/bin:$PATH
# 执行 java -version 命令并保存输出
java_version=$( java -version 2>&1 )
# 输出 Java 版本信息
echo "Java version info:"
echo "$java_version"
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

  1. JobManager ↩︎

  2. TaskManagers ↩︎