# 一、为什么需要窗口?
在 Flink 中,流是一个无界数据集,当我们需要在流中进行一些聚合运算时,比如统计最近 30 秒的请求量或者异常次数,然后根据请求或者异常次数采取相应措施。无论是从时间还是空间上,在流中做聚合计算都不现实。所以必须将聚合计算回归到有界数据集上,因此 Flink 引入了窗口来作为框定一个有限数据集,窗口将流拆分成有限大小的 “桶”,允许开发者在这些桶中做聚合计算。Flink 的窗口可以是时间驱动(Time Window),也可以是事件驱动 (Event Window)。
# 二、 基本概念
窗口化的流处理程序一般有两种结构,第一种结构是基于分组的窗口化数据流,第二种结构基于非分组的窗口化数据流。这两种结构的唯一区别是分组的数据流调用 KeyBy 操作符和 Window 操作符,而非分组的窗口化数据流仅调用 WindowAll 操作符。
# 三、 Window(分组),WindowAll(非分组)对比
- Window: 将在已经分区的 KeyedStream 中定义一个窗口,窗口根据某些特征将具有相同 key 的数据分为一组,(例如最近 5 秒到达的数据)。
stream | |
.keyby(...) <- 根据指定的key将数据流中的数据进行分区 | |
.window(...) <- 必选:“窗口分配器” | |
[.trigger(...)] <- 可选:“窗口触发器”(默认使用各窗口分配器内置的触发器实现) | |
[.evictor(...)] <- 可选:“窗口剔除器”(默认没有实现) | |
[.allowedLateness(...)] <- 可选:“窗口接受迟到的元素的最大时间”(默认为0) | |
[.sideOutputLateData(...)] <- 可选:“侧端输出标签” | |
(将窗口中迟到的数据发送到给定的OutPutTag标识的侧端输出,默认窗口中迟到元素没有侧端输出) | |
.reduce/aggregate/fold/apply() <- 必选:“窗口函数” | |
[.getSideOutput()] <- 可选:“侧端输出” | |
(使用给定的OutputTag获取一个DataStream,其中包含操作符发送到侧端输出的元素) |
- WindowAll: 在常规的 DataStream 中调用 WIndowAll 操作符来定义一个窗口,窗口会根据某些特征对所有流数据进行分组。注:在大部分情况下使用 WIndowAll 操作符并不是并行转换,所有上游操作符发送的元素都将被收集在 WindowAll 操作符的任务中。
stream | |
.WindowAll(...) <- 必选:“窗口分配器” | |
[.trigger(...)] <- 可选:“窗口触发器”(默认使用各窗口分配器内置的触发器实现) | |
[.evictor(...)] <- 可选:“窗口剔除器”(默认没有实现) | |
[.allowedLateness(...)] <- 可选:“窗口接受迟到的元素的最大时间”(默认为0) | |
[.sideOutputLateData(...)] <- 可选:“侧端输出标签” | |
(将窗口中迟到的数据发送到给定的OutPutTag标识的侧端输出,默认窗口中迟到元素没有侧端输出) | |
.reduce/aggregate/fold/apply() <- 必选:“窗口函数” | |
[.getSideOutput()] <- 可选:“侧端输出” | |
(使用给定的OutputTag获取一个DataStream,其中包含操作符发送到侧端输出的元素) |
# 四、 窗口的生命周期
在数据流中定义好一个窗口后,当属于某个窗口的第一元素到达时,该窗口就会创建,当时间(事件时间或处理时间)超过该窗口的结束时间戳加用户指定的允许延迟时间的总和后,该窗口将被完全删除。此外每个窗口都带有一个 trigger(触发器)和一个窗口函数 ProcessWindowFunction、ReduceFunction、AggregateFunction 等。窗口函数包含要应用于窗口中元素的计算逻辑,而触发器则是确定窗口何时该被窗口函数处理。触发的策略可能类似于 “当元素超过 4 个” 或者 “当水印经过窗口末端” 等。
初此之外,Flink 还允许开发者指定一个 Evictor,它能够在触发器触发之后,窗口函数应用于窗口之前和 / 或之后从窗口中删除某些元素。
# 五、 窗口分配器
-
作用: 窗口分配器负责将每个传入的元素按照某种方式分配给一个或多个窗口。Flink 为最常见的几种应用场景(滚动窗口,滑动窗口,会话窗口,全局窗口)提供了预定义的以简化流处理程序的开发。还可以实现 windowAssigner 抽象类实现自定义窗口分配器。
注:所有预定义的窗口分配器除了全局窗口可以基于数据驱动的(根据元素数量来划分窗口),其它类型的窗口都是基于时间驱动的(窗口有一个开始时间戳(包括),和一个结束时间戳(不包括),共同描述窗口的大小)。这个时间可以是处理时间,也可以是事件时间。 -
分类:
- 滚动窗口(Tumbling Window): 将每个元素分配给指定窗口大小的窗口。 滚动窗口具有固定大小,不重叠。
DataStream<T> input = ...;
// tumbling event-time windowsinput
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windowsinput
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
示例代码参考 github 地址
- 滑动窗口(Sliding Window): 将元素分配给固定长度的窗口,窗口的大小由 TIme 配置。除此之外,滑动窗口分配器还提供一个参数用于控制滑动窗口的滑动间隙如果滑动窗口的大小小于滑动间隙,则滑动窗口可能会重叠。
DataStream<T> input = ...;
// sliding event-time windowsinput
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windowsinput
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hoursinput
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
源码:第一次启动的窗口大小
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
示例代码参考 github 地址
- 会话窗口(Session Window):会话窗口分配器根据活动的会话对元素进行分组,与滚动窗口和滑动窗口相反,会话窗口既不会重叠,也没有固定的开始时间和结束时间。当会话窗口在一段时间内没有收到元素时,他就会关闭。会话窗口分配器既可以配置静态间隙,也可以通过会话间隙提取器函数来配置动态会话间隙。
DataStream<T> input = ...;
// event-time session windows with static gapinput
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gapinput
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gapinput
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gapinput
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap}))
.<windowed transformation>(<window function>);
示例代码参考 github 地址
- 全局窗口(Global Window):Flink 中的窗口既可以是时间驱动的,也可以是事件驱动的。全局窗口就是被事件驱动的(这里不太准确,你可以实现自定义的触发器支持时间)全局窗口分配器将具有相同 key 的所有元素分配给一个全局窗口。要使全局窗口模式可用,还必须为该窗口指定触发器,否则触发器不执行任何计算,这是因为全局窗口没有可以处理聚合元素的自然终点。例如指定一个大小为 3 个元素的窗口,每当窗口中的元素达到 3 个就会开启一个新的窗口。
DataStream<T> input = ...; | |
input | |
.keyBy(<key selector>) | |
.window(GlobalWindows.create()) | |
.<windowed transformation>(<window function>); |
示例代码参考 github 地址
# 六、窗口函数
在窗口分配器生成窗口后,需要为每个窗口指定在该窗口中执行的计算策略。这就是窗口函数的职责。一旦系统确定一个窗口以准备好进行处理,窗口函数将用于处理每个(也可能是键控)的窗口中的每个元素。
-
RecuceFunction
- 作用: reduce 操作符应用于用户定义的 ReduceFunction 窗口函数将一个窗口内的元素组合为单个值,而且总是将两个元素合并为一个元素,具体细节为将上一个合并过的值和当前输入的元素结合,产生新的值并发出。ReduceFunction 窗口函数会连续应用于同一个窗口内的所有值,知道仅剩一个值为止。
- 数据流转换: WindowedStream -> DataStream /AllWindowedStream -> DataStream
- 应用场景:
- 代码示例:
windowedStream.reduce(ReduceFunction<T> function)
-
# FlodFunction:(当前版本删除)
- 作用: 将用户定义的 FlodFuction 窗口函数指定如何将窗口的输入元素与输出类型的元素进行组合,添加到窗口的第一个元素将与输出类型的预定义的初始值进行组合。用户定义 FlodFunction 窗口函数的核心逻辑是将输入值和当前输出类型的值组合为一个指定输出类型的值。FlodFunction 窗口函数会连续应用于同一个窗口内的所有值,知道仅剩一个值为止。
- 数据流转换: WindowedStream -> DataStream /AllWindowedStream -> DataStream
- 应用场景:
- 代码示例:
windowedStream.fold(R initialValue,FoldFunction<T> function)
-
# AggregateFunction:
- 作用: 应用用户定义的 AggregateFunction 窗口函数对窗口内的元素进行聚合计算。AggregateFunction 是 ReduceFunction 的广义版本,它有三种类型:输入类型(汇总值的类型,IN)、累加器类型(中间聚合状态 ACC)和输出类型(汇总结果的类型,OUT),相比 ReduceFunction 窗口函数,AggregateFunction 窗口函数可以为聚合计算的输入值,中间聚合值,和结果值使用不同的类型,以支持更广泛的聚合类型。为了简化开发者对于 AggregateFunction 窗口函数的使用,提供了一些预定义的实现逻辑,可以对一个窗口内的元素进行求和,求最大值,最小值等。
- 数据流转换: WindowedStream -> DataStream /AllWindowedStream -> DataStream
- 应用场景:
- 代码示例:
windowStream.aggregate(AggregateFunction <T,ACC,R> function)
- AggregateFunction 接口
- ACC createAccumulator () : 该方法将创建一个新的累加器以执行一个新的聚合操作。除非通过 add(…)方法添加值,否则创建新的累加器通常没有意义,累加器时正在执行聚合操作中的状态。
- ACC add (IN value,ACC accumulator): 该方法将给定的输入值添加到给定的累加器中,并返回新的累加器值。
- OUT getResult (ACC accumulator): 该方法将从累加器中获取聚合的结果。
- ACC merge (ACC a,ACC b): 该方法将合并两个累加器,返回一个具有合并状态的累加器。该方法可以重用任何给定的累加器作为合并的目标并返回。
-
# ProcessWindowFunction/ProcessAllWindowFunction
- 作用: process 操作符应用用户定义的 ProcessWindowFunction 和 ProcessAllWindowFunction 窗口函数对窗口内的元素进行计算,这两个窗口函数会获取一个包含窗口所有元素的迭代器,以及一个可以访问窗口和状态信息的上下文对象,因此这两个窗口函数具有比其他窗口函数更大的灵活性。但是以消耗性能和资源为代价的,因为窗口内的元素不能增量的聚合,需要在内部进行缓冲,直到认为窗口已经准备好进行处理,所以当开发者使用这两个函数时需要充分考虑内存的占用情况。
- 数据流转换: WindowedStream -> DataStream /AllWindowedStream -> DataStream
- 应用场景:
- 代码示例:
windowedStream.process(ProcessWindowFunction<T,R,K,W> function)
类型参数:
<T> – 输入值的类型。<R> – 输出值的类型。<K> – 密钥的类型。<W> – 可以应用此窗口函数的 Window 类型。
//AllWindowedStream.process(ProcessAllWindowFunction<T,R,W> function)
-
# 增量聚合的 ProcessWindowFunction/ProcessAllWindowFunction
- 作用: ProcessWindowFunction 或 ProcessAllWindowFunction 可以和 ReduceFunction、AggregateFunction、FoldFunction 组合使用,以便在元素到达窗口时增量的聚合它们。当窗口关闭时,ProcessWindowFunction 或 ProcessAllWindowFunction 将提供聚合的结果。这允许开发者在访问 ProcessWindowFunction 或 ProcessAllWindowFunction 提供的窗口元素据的同时可以对窗口内的元素进行增量的聚合计算。
- 数据流转换: WindowedStream -> DataStream /AllWindowedStream -> DataStream
- 应用场景:
- 代码示例:
ReduceFunction 和 ProcessWindowFunction组合
windowedStream.reduce(
ReduceFunction<T> reduceFunction
ProcessWindowFunction<T,R,K,W> function)
ReduceFunction 和 ProcessAllWindowFunction组合
AllWindowedStream.reduce(
ReduceFunction<T> reduceFunction
ProcessAllWindowFunction<T,R,W> function)