前言: Flink 提供了丰富的算子来方便开发者处理各种应用场景。
# 一、基本转换类算子
-
# Map:
- 作用:将用户定义的 MapFunction 函数应用于数据流中的每个元素。数据流中的每个元素将作为输入元素进入用户定义的 MapFunction 函数,MapFunction 函数将对输入的元素进行转换并产生一个结果元素输出到新的数据流中。
- ** 数据流转换:**DataStream -> DataStream。
- ** 应用场景:** 解析元素,转换数据类型。
- 代码示例:
-
# FlatMap:
- ** 作用:** 将用户定义的 FlatMapFunction 函数应用于数据流中的每一个元素,数据流中的每个元素将作为输入元素进入 FlatMapFunction 函数,FlatMapFunction 函数将对输入的元素进行转换并产生 0 个,1 个,或者多个结果元素输出到新的数据流中。
- ** 数据流转换:**DataStream -> DataStream。
- ** 应用场景:** 拆分不需要的列表和数组。
- 代码示例:
-
# Filter:
- ** 作用:** 将用户定义的 FilterFunction 函数应用于数据流中的每个元素。数据流中的每个元素将作为输入元素进入 FilterFunction 函数,FilterFunction 函数对输入的元素进行判断来决定保留该元素还是丢弃该元素,返回 true 代表保留,返回 false 代表丢弃。
- ** 数据流转换:**DataStream -> DataStream。
- ** 应用场景:** 数据去重
- 代码示例:
-
# KeyBy:
- ** 作用:**KeyBy 操作符的执行逻辑是将一个数据流分成不相交的流分区(分区数量和并行度一致),所有具有相同 Key 的元素都被分配到相同的流分区,在 keyby 操作符内部通过散列算法来划分元素到对应的流分区。
- ** 数据流转换:**DataStream -> KeyedStream。
- ** 应用场景:** 聚合、状态管理、窗口操作
- 代码示例:
-
# Reduce:
- ** 作用:**Reduce 操作符应用用户定义的 ReduceFunction 函数将 KeyedStream 中具有相同 Key 元素合并为单个值,而且总是将两个元素合并为一个元素,具体细节为将上一个合并过的值和当前输入的元素结合,产生新的值发出。Reduce 操作符将 ReduceFunction 函数连续应用于同一个组的所有值,直到仅剩一个值为止。
- ** 数据流转换:**KeyedStream -> DataStream。
- ** 应用场景:** 聚合计算
- 代码示例:
# 二、聚合算子
-
# aggregations
-
** 作用:** 提供一系列内置的聚合逻辑,可以对 KeyStream 中具有相同 Key 的元素进行求和,求最大值或最小值等。提供两种方式对具有相同 key 的元素进行聚合计算。对于 POJO 类型的元素可以通过指定字段名称来指定聚合字段,对于元组类型的元素可以通过指定元素的索引来指定聚合字段。
-
** 数据流转换:**KeyedStream-> DataStream
-
分类:
- **sum:** 返回同一个 Key 分组下指定字段的和。
- **min:** 返回同一个 key 分组下指定字段的最小值。
- **minby:** 返回同一个 key 分组下指定字段中具有最小值的元素。
- **max:** 返回同一个 key 分组下指定字段的最大值。
- **maxby:** 返回同一个 key 分组下指定字段中具有最大值的元素。
注:类似关系型数据的 SQL 语法,Min 是取字段的最小值,MinBy 是取字段最小的一整行数据(max 同理)。
-
应用场景:
-
代码示例:
-
# 三、数据流操作算子
-
# keyBY(前面介绍过)
-
Split 和 Select(后续版本删除,作者使用版本为 1.14):
-
** 作用:** 这两个操作符通常是组合使用的,Split 操作符根据用户定义的标准将数据流拆分为两个或者更多的数据流,Select 操作符根据用户定义的标准获取对应的数据流,以便在获取的数据流中执行后续的转换操作。
-
** 数据流转换:**split:DataStream -> SplitStream
select:SplitStream -> DataStream -
** 应用场景:** 切分数据流,获取数据流
-
代码示例:
-
-
# Project
- ** 作用:**Project 操作符作用在元素的数据类型是元组的数据流中,它根据指定的索引从元组中选择对应的字段组成一个子集。该操作符的参数是一个变长参数,类型为 Int。参数指定保留的输入元组的字段索引,输出元组的字段顺序与字段索引的顺序相对应。
- ** 数据流转换:**DataStream ->DataStream
- ** 应用场景:** 按元组索引创建新流
- 代码示例:
-
# Union
- ** 作用:**Union 操作符负责将两个或者多个相同类型的数据流进行合并来创建一个包含数据流中所有元素的新数据流。该操作符的参数是一个变长参数,可以支持合并多个相同类型的数据流.
- ** 数据流转换:**DataStream -> DataStream
- ** 应用场景:** 合并不同数据源相同数据类型的数据流
- 代码示例:
-
# Connect
- ** 作用:** 连接两个保留其类型的数据流创建新的连接流,从而允许这两个数据流共享状态。
- ** 数据流转化:**DataStream+DataStream -> ConnectedStream
- 应用场景:
- 代码示例:
-
# CoMap,CoFlatMap
- 作用:
- CoMap:在 connectStream 中使用 Map 操作符将用户定义的 CoMapFunction 函数应用于 ConnectedStream 中的每个元素。数据流中的每个元素将作为输入元素进入 CoMapFunction 函数,对输入的元素进行转换并产生一个结果元素输出到新的数据流中。CoMapFunction 函数提供了 map1,map2 两种转换方式分别处理 connectStream 的两个不同类型的数据流。
- CoFlatMap:在 connectStream 中使用 FlatMap 操作符将用户定义的 CoFlatMapFunction 函数应用于 ConnectedStream 中的每个元素。数据流中的每个元素将作为输入元素进入 CoMapFunction 函数,对输入的元素进行转换并产生 0 个,1 个,或多个结果元素输出到新的数据流中。CoFlatMapFunction 函数提供了 flatmap1,flatmap2 两种转换方式分别处理 connectStream 的两个不同类型的数据流。
- ** 数据流转换:**ConnectedStream -> DataStream
- 应用场景:
- 代码示例:
- 作用:
-
# Iterate
- ** 作用:** 流处理程序的迭代计算实现了一个 Step 函数并将其嵌入 IterativeStream 类型的数据流,由于流处理程序可能永远不会完成,所以没有最大迭代次数。相反需要开发者指定数据流的那一部分被反馈回迭代操作中,以及那一部分使用 split 操作符或 filter 操作符转发到下游数据流。迭代操作将一个操作符的输出重定向到某个先前的操作符,在数据流中创建 “反馈” 循环。
- ** 数据流转换:**DataStream -> IterativeStream ->DataStream
- ** 应用场景:** 更新模型算法
- 代码示例:
# 四、窗口算子
有关窗口的介绍请参考文章Flink 的窗口
# 6. 代码示例
- 自定义数据源 (1 秒发送一条数据)
package com.shyl.bean; | |
import org.apache.flink.api.java.tuple.Tuple3; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
public class SourceForWindow implements SourceFunction<Tuple3<String, Integer, String>> { | |
public static Logger LOG = LoggerFactory.getLogger(SourceForWindow.class); | |
private volatile boolean isRunning = true; | |
// 发送元素间隔时间 | |
private long sleepTime; | |
public static final String[] WORDS = new String[]{ | |
"shyl", | |
"shyl", | |
"shyl", | |
"shyl", | |
"shyl", | |
"java", | |
"flink", | |
"flink", | |
"flink", | |
"shyl", | |
"shyl", | |
"hadoop", | |
"hadoop", | |
"spark" | |
}; | |
public SourceForWindow(long sleepTime) { | |
this.sleepTime = sleepTime; | |
} | |
@Override | |
public void run(SourceContext<Tuple3<String, Integer, String>> ctx) throws Exception { | |
int count = 0; | |
while (isRunning) { | |
String word = WORDS[count % WORDS.length]; | |
String time = getHHmmss(System.currentTimeMillis()); | |
Tuple3<String, Integer, String> tuple2 = Tuple3.of(word, count, time); | |
ctx.collect(tuple2); | |
LOG.info("send data :" + tuple2); | |
Thread.sleep(sleepTime); | |
count++; | |
} | |
} | |
@Override | |
public void cancel() { | |
isRunning = false; | |
} | |
public static String getHHmmss(Long time) { | |
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); | |
String str = sdf.format(new Date(time)); | |
return "时间:" + str; | |
} | |
} |
富函数:是Flink的高级操作符,将RichFunction接口成为富函数。所有的操作符上应用的函数都有富函数版本,只需要在各种函数类名前面加上Rich前缀即可。富函数在基本函数提供的操作方法之外额外提供了一系列方法方便开发者丰富自己的业务逻辑。
1.🔴void open():执行基本操作方法前的初始化方法,它在基本的操作方法第一次被调用之前调用,因此适合在方法中进行编写初始化资源等一次性设置工作。该方法可能会转发运行时捕获的异常,当运行时捕获异常时,它将中止任务,并根据指定的重启策略决定是否重试任务。
2.🔴void close():在最后一次调用基本的操作方法之后调用它,换句话说就是在Flink程序结束前被调用,主要用于释放程序中的资源,该方法也可能转发运行时捕获的异常。
3.🔴RuntimeContext getRuntimeContext():获取有关用户定义的函数运行时的上下文信息,例如函数的并行度,函数的子任务索引或者执行函数的任务名称。RuntimeContext还提供对累加器,分布式缓存,计数器,当前配置信息,状态等对象的访问。
4.IterationRuntimeContext getIterationRuntimeContext():获取RuntimeContext的专用版本,该版本具有有关执行函数迭代的其他信息。仅当函数是迭代的一部分时,此IterationRuntimeContext才可用,否则调用此方法将引发异常。
5.void setRuntimeContext(RuntimeContext t):设置函数运行时的上下文,将用户定义的函数应用于操作符的并行实例时由Flink框架自动调用。