# 一、Flink 相关名词解析
- DataStream API
- DataSet API
# 二、FLink 程序剖析
Flink 程序看起来像是转换的常规程序。每程序由相同的基本部分组成: DataStreams
- 获取执行环境 ,
execution environment - 加载 / 创建初始数据,
- 指定此数据的转换,
- 指定计算结果的放置位置,
- 触发程序执行
执行环境方法:
getExecutionEnvironment() | |
createLocalEnvironment() | |
createRemoteEnvironment(String host, int port, String... jarFiles) |
数据源:
基于文件:
-
readTextFile(path)- 逐行读取文本文件,即遵守规范的文件,并将它们作为字符串返回。`` -
readFile(fileInputFormat, path)- 根据指定的文件输入格式读取(一次)文件。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)- 这是前两个方法内部调用的方法。它根据给定的。根据所提供的 ,该源可以定期监视(每毫秒)新数据的路径(),或者处理当前路径中的数据并退出()。使用 ,用户可以进一步排除正在处理的文件。``实现:
在底层,Flink 将文件读取过程拆分为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由一个单独的实体实现。监视由单个非并行(并行度 = 1)任务实现,而读取由并行运行的多个任务执行。后者的并行度等于作业并行度。单个监控任务的作用是扫描目录(定期或仅扫描一次,具体取决于目录),找到需要处理的文件,将其拆分,并将这些拆分分配给下游读取器。读者是读取实际数据的人。每个拆分只能由一个读取器读取,而一个读取器可以逐个读取多个拆分。
重要提示:
- 如果 设置为 ,则在修改文件时,将完全重新处理其内容。这可能会破坏 “恰好一次” 的语义,因为在文件末尾附加数据将导致重新处理其所有内容。
- 如果 设置为 ,则源扫描路径一次并退出,而无需等待读取器完成对文件内容的读取。当然,读者将继续阅读,直到读取所有文件内容。关闭源会导致在此点之后不再有检查点。这可能会导致节点故障后的恢复速度变慢,因为作业将从最后一个检查点恢复读取。
基于套接字:
socketTextStream- 从套接字读取。元素可以用分隔符分隔。
基于集合:
fromCollection(Collection)- 从 Java Java.util.Collection 创建数据流。所有元素 在集合中必须属于同一类型。fromCollection(Iterator, Class)- 从迭代器创建数据流。该类指定 迭代器返回的元素的数据类型。fromElements(T ...)- 从给定的对象序列创建数据流。所有对象都必须是 相同类型。fromParallelCollection(SplittableIterator, Class)- 从迭代器创建数据流,在 平行。该类指定迭代器返回的元素的数据类型。generateSequence(from, to)- 在给定的时间间隔内生成数字序列,在 平行。
习惯:
addSource- 附加新的源函数【或者自定义数据源】。例如,要从 Apache Kafka 读取数据,可以使用。有关更多详细信息,请参阅连接器。addSource(new FlinkKafkaConsumer<>(...))
transfrom 算子
有关可用流转换的概述,请参阅运算符。
数据接收器
Flink 自带了多种内置的输出格式,这些格式封装在 数据流:
writeAsText()/TextOutputFormat- 将元素按行写入字符串。字符串是 通过调用每个元素的 toString() 方法获得。writeAsCsv(...)/CsvOutputFormat- 将元组写入逗号分隔值文件。行和字段 分隔符是可配置的。每个字段的值来自对象的 toString() 方法。print()/printToErr()- 打印 toString() 值 标准输出 / 标准误差流上的每个元素。或者,可以提供前缀 (msg),即 在输出之前。这有助于区分不同的打印调用。如果并行度为 大于 1,则输出还将在前面加上生成输出的任务的标识符。writeUsingOutputFormat()/FileOutputFormat- 自定义文件输出的方法和基类。支持 自定义对象到字节的转换。writeToSocket- 根据SerializationSchemaaddSink- 调用自定义接收器函数。Flink 捆绑了其他系统的连接器(例如 Apache Kafka),作为接收器函数实现。
# 三、执行模式
DataStream API 支持不同的运行时执行模式,你可以根据你的用例需要和作业特点进行选择。
DataStream API 有一种” 经典 “的执行行为,我们称之为 流(STREAMING) 执行模式。这种模式适用于需要连续增量处理,而且预计无限期保持在线的无边界作业。
此外,还有一种批式执行模式,我们称之为 批(BATCH) 执行模式。这种执行作业的方式更容易让人联想到批处理框架,比如 MapReduce。这种执行模式适用于有一个已知的固定输入,而且不会连续运行的有边界作业。
Apache Flink 对流处理和批处理统一方法,意味着无论配置何种执行模式,在有界输入上执行的 DataStream 应用都会产生相同的最终 结果。重要的是要注意最终 在这里是什么意思:一个在 流 模式执行的作业可能会产生增量更新(想想数据库中的插入(upsert)操作),而 批 作业只在最后产生一个最终结果。尽管计算方法不同,只要呈现方式得当,最终结果会是相同的。
可以通过设置配置执行模式。 有三个可能的值: execution.runtime-mode
STREAMING:经典 DataStream 执行模式(默认)BATCH:在 DataStream API 上进行批处理式执行AUTOMATIC:让系统根据源的有界性来决定
这可以通过命令行参数进行配置 ,或者 在创建 / 配置 . bin/flink run ...``StreamExecutionEnvironment
以下是通过命令行配置执行模式的方法:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar |
此示例演示如何在代码中配置执行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setRuntimeMode(RuntimeExecutionMode.BATCH); |
我们建议用户不要在程序中设置运行时模式,而是 提交应用程序时使用命令行进行设置。保持 无需配置应用程序代码,即可实现更大的灵活性 应用程序可以在任何执行模式下执行。
# 四、执行行为
# 五、任务调度与网络 Shuffle
Flink 作业由不同的操作组成,这些操作在数据流图中连接在一起。系统决定如何在不同的进程 / 机器(TaskManager)上调度这些操作的执行,以及如何在它们之间 shuffle (发送)数据。
将多个操作 / 算子链接在一起的功能称为链。Flink 称一个调度单位的一组或多个(链接在一起的)算子为一个 任务。通常,子任务 用来指代在多个 TaskManager 上并行运行的单个任务实例,但我们在这里只使用 任务 (task) 一词。
任务调度和网络 shuffle 对于 批 和 流 执行模式的执行方式不同。这主要是由于在 批 执行模式中,当知道输入数据是有边界的时候,Flink 可以使用更高效的数据结构和算法。
我们将用这个例子来解释任务调度和网络传输的差异。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
DataStreamSource<String> source = env.fromElements(...); | |
source.name("source") | |
.map(...).name("map1") | |
.map(...).name("map2") | |
.rebalance() | |
.map(...).name("map3") | |
.map(...).name("map4") | |
.keyBy((value) -> value) | |
.map(...).name("map5") | |
.map(...).name("map6") | |
.sinkTo(...).name("sink"); |
包含 1-to-1 连接模式的操作,比如 map() 、 flatMap() 或 filter() ,可以直接将数据转发到下一个操作,这使得这些操作可以被链接在一起。这意味着 Flink 一般不会在他们之间插入网络 shuffle。
而像 keyBy() 或者 rebalance() 这样需要在不同的任务并行实例之间进行数据 shuffle 的操作,就会引起网络 shuffle。
对于上面的例子,Flink 会将操作分组为这些任务:
- 任务 1:
source、map1和map2 - 任务 2:
map3和map4 - 任务 3:
map5、map6和sink
我们在任务 1 到任务 2、任务 2 到任务 3 之间各有一次网络 shuffle。这是该作业的可视化表示:
# 流执行模式
在 流 执行模式下,所有任务需要一直在线 / 运行。这使得 Flink 可以通过整个管道立即处理新的记录,以达到我们需要的连续和低延迟的流处理。这同样意味着分配给某个作业的 TaskManagers 需要有足够的资源来同时运行所有的任务。
网络 shuffle 是 流水线 式的,这意味着记录会立即发送给下游任务,在网络层上进行一些缓冲。同样,这也是必须的,因为当处理连续的数据流时,在任务(或任务管道)之间没有可以实体化的自然数据点(时间点)。这与 批 执行模式形成了鲜明的对比,在 批 执行模式下,中间的结果可以被实体化,如下所述。
# 批执行模式
在 批 执行模式下,一个作业的任务可以被分离成可以一个接一个执行的阶段。我们之所以能做到这一点,是因为输入是有边界的,因此 Flink 可以在进入下一个阶段之前完全处理管道中的一个阶段。在上面的例子中,工作会有三个阶段,对应着被 shuffle 界线分开的三个任务。
不同于上文所介绍的 流 模式立即向下游任务发送记录,分阶段处理要求 Flink 将任务的中间结果实体化到一些非永久存储中,让下游任务在上游任务已经下线后再读取。这将增加处理的延迟,但也会带来其他有趣的特性。其一,这允许 Flink 在故障发生时回溯到最新的可用结果,而不是重新启动整个任务。其二, 批 作业可以在更少的资源上执行(就 TaskManagers 的可用槽而言),因为系统可以一个接一个地顺序执行任务。
TaskManagers 将至少在下游任务开始消费它们前保留中间结果(从技术上讲,它们将被保留到消费的流水线区域产生它们的输出为止)。在这之后,只要空间允许,它们就会被保留,以便在失败的情况下,可以回溯到前面涉及的结果。
# 六、事件时间
-
** 处理时间:** 处理时间是指机器的系统时间,即执行相应的操作的时间。
当流式处理程序在处理时间上运行时,所有基于时间的操作 (如时间窗口)将使用运行各自的运算符。每小时的处理时间窗口将包括所有 在系统到达特定操作员的时间之间的记录 时钟显示整点。例如,如果应用程序开始运行 上午 9:15,第一个每小时处理时间窗口将包括事件 在上午 9:15 至上午 10:00 之间处理,下一个窗口将包含事件 在上午 10:00 至上午 11:00 之间处理,依此类推。
处理时间是最简单的时间概念,不需要协调 在流和计算机之间。它提供了最佳性能和 最低延迟。但是,在分布式和异步环境中 处理时间不提供确定性,因为它容易受到 记录到达系统的速度(例如,从消息 queue),以记录在 系统,以及中断(计划或其他方式)。
-
** 事件时间:** 事件时间是每个事件发生的时间 其生产装置。此时间通常嵌入在记录中 在他们进入 Flink 之前,可以从中提取该事件时间戳 每条记录。在事件时间中,时间的进展取决于数据,而不是 任何挂钟。事件时间程序必须指定如何生成事件时间 水印,这是在事件时间中发出进度信号的机制。这 水印机制将在后面的部分中介绍。
在理想情况下,事件时间处理将产生完全一致的结果 和确定性结果,无论事件何时到达,或者其 订购。但是,除非已知事件按顺序到达(通过 timestamp),事件时间处理在等待时会产生一些延迟 乱序事件。由于只能等待有限的时间,这限制了事件时间应用程序的确定性。
假设所有数据都已到达,则事件时间操作将表现为 预期,即使在使用 无序或延迟事件,或重新处理历史数据时。例如 每小时事件时间窗口将包含承载事件的所有记录 落入该小时的时间戳,无论它们的顺序如何 到达,或当它们被处理时。
-
摄取时间: