基于Flink 1.17版本的学习笔记,包括部署、原理、算子、开发代码等等。
集群部署
集群角色
- 客户端(Client):代码由客户端获取并转换,之后提交给JobManager。
- JobManager就是Flink集群里的“管事人”, 对作业进行中央调度管理;而它获取到要执行的作用后,会进一步处理转换,然后分发任务给众多的TaskManager。
- TaskManager是真正“干活的人”,数据的护理操作都是它们来做的。
Flink支持多种不同的部署场景,还可以和不同的资源管理平台方便的集成。
集群规划
节点服务器 | hadoop3 | hadoop2 | hadoop1 |
---|---|---|---|
角色 | JobManager TaskManager | Task Manager | Task Manager |
修改配置文件
$ vi flink-conf.yaml
jobmanager.rpc.address: pi3
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
### 更为各台主机的主机名
taskmanager.host: pi3
rest.address: pi3
rest.bind-address: 0.0.0.0
$ vi workers
pi3
pi2
pi4
$ vi masters
pi3
分发文件到各主机
$ scp -r /opt/flink-1.17.1 pi2:/opt/flink-1.17.1
$ scp -r /opt/flink-1.17.1 pi4:/opt/flink-1.17.1
启动集群
$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pi3.
Starting taskexecutor daemon on host pi3.
Starting taskexecutor daemon on host pi2.
Starting taskexecutor daemon on host pi4.
问题1:Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
解决方法:
/opt/flink-1.17.1/bin$ vi taskmanager.sh
# 注释如下内容
if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
scp taskmanager.sh pi2:/opt/flink-1.17.1/bin
scp taskmanager.sh pi3:/opt/flink-1.17.1/bin
scp taskmanager.sh pi4:/opt/flink-1.17.1/bin
手工打包工程
修改pom.xml, 在build.plugins中添加如下内容:
<!-- Flink官方打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions> <execution> <phase>package</phase>
<goals> <goal>shade</goal>
</goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact>
<excludes> <exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer> </transformers> </configuration> </execution> </executions></plugin>
命令行运行代码
bin/flink run -m pi3:8081 -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
部署模式
- 会话模式(Session Mode)
- 单作业模式(Per-Job Mode) 需要资源管理平台才可支持。
- 应用模式(Application Mode) 通过standalone-job.sh start –job-classname xxx.xxx.xxx启动, 并且上传的Jar必须在lib目录下。 三种模式主要区别的集群的生命周期以及资源的分配方式上。
运行模式
- Standalone 模式, Flink自己管理资源,不依赖外部资源管理平台。没有自动扩展或重新分配资源的保证, 必须手动处理。默认使用会话模式, 可支持应用模式,不支持单作业模式。
- YARN模式, 最为常用的模式。
- K8S模式
Standalone模式
### 启动
$ mv ~/learning-flink-1.0-SNAPSHOT.jar /opt/flink-1.17.1/lib
$ bin/standalone-job.sh start --job-classname com.stanleylog.WordCountStreamunboundedDemo
$ bin/taskmanager.sh start ### taskmanager需要手工启动
### 停止
$ bin/taskmanager.sh stop
$ bin/standalone-job.sh stop
Yarn模式
$ vi /etc/profile
# Hadoop Env
export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
# Flink Env
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
会话模式(Session)
# 启动
$ bin/yarn-session.sh -d -nm flink-test
# 停止
$ echo "stop" | ./bin/yarn-session.sh -id application_xxx
# 或者
$ yarn application -kill application_xxx
高版本的Flink中,无需去更改Flink配置文件,它会自己进行动态覆盖。
命令行运行代码
$ bin/flink run -d -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
单作业模式(Pre-Job)
### 启动
$ bin/flink run -d -t yarn-pre-job -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
### 关闭
# YARN UI界面
# 或者
$ bin/flink list -t yarn-pre-job -Dyarn.application.id=application_xxx #查看JOB_ID
$ bin/flink cancel -t yarn-pre-job -Dyarn.application.id=application_xxx <JOB_ID>
应用模式(Application)重点
### 启动
$ bin/flink run-application -d -t yarn-application -nm test -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
### 关闭
# YARN UI界面
# 或者
$ bin/flink list -t yarn-application -Dyarn.application.id=application_xxx #查看JOB_ID
$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_xxx <JOB_ID>
使用HDFS文件的模式 —推荐生产环境使用
### 上传Flink类库
$ hadoop fs -mkdir /flink-dist
$ hadoop fs -put /opt/flink-1.17.1/lib/ /flink-dist
$ hadoop fs -put /opt/flink-1.17.1/plugins/ /flink-dist
### 上传Flink程序
$ hadoop fs -mkdir /flink-jars
$ hadoop fs -put ~/learning-flink-1.0-SNAPSHOT.jar /flink-jars
### 启动
$ bin/flink run-application -d -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop1:9000/flink-dist" -c com.stanleylog.WordCountStreamunboundedDemo hdfs://hadoop1:9000/flink-jars/learning-flink-1.0-SNAPSHOT.jar
历史服务器
$ hadoop fs -mkdir -p /logs/flink-job
### 修改flink配置
$ vi conf/flink-conf.yaml
jobmanager.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
historyserver.web.address: hadoop2
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
### 启动
bin/historyserver.sh start
### 停止
bin/historyserver.sh stop
可以通过 http://hadoop2:8082 访问
运行时架构
![[Pasted image 20231117110403.png]]
重要组件:
- JobMaster
- 资源管理器(ResourceManager)
- 分发器(Dispatcher)
- 任务管理器(TaskManager)
核心概念
并行度
Flink的并行度代表最大并行度数量,可以通过.setParallelism()进行设置。 设置方式, 执行优先级如下:
- 代码中通过setParallelism设置并行度
- env整体设置并行度
- 命令行提交代码时通过-p参数进行设置并行度
- WebUI 提交页面设置并行度
- Flink配置文件中的默认并行度
算子链
- 一对一(One to One)
- 重分区(Redistributing)
关闭算子链
- 算子链可以通过env.disableOperatorChaining()进行全局禁用,禁用之后所有的算子将不会被合并。
- 某个算子不参与链化:算子A.disableChaining(),算子A不会与前面和后面的算子进行合并。
- 从某个算子开启新链条:算子A.startNewChain(), 算子A不参与前面的合并,从A开始正常链化。
任务槽(Slot)
主要按照内存划分资源,每个slot专门给具体的task使用,不会进行资源的调配。可以通过修改配置参数更改默认的TaskManager分配的Slot数量。
taskmanager.numberOfTaskSlots: 1
CPU资源不会进行资源隔离的,所以建议使用所在主机的CPU核心数划分此参数。 在同一作业中,不同任务节点的并行任务可以放在同一slot上执行。 可以通过.slotSharingGroup设置算子的不用共享组,默认共享组为default.
任务槽和并行度的关系
任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过taskmanager.numberOfTaskSlots进行配置; 而并行度是动态的概念,也就是TaskManager运行程序时实际使用的并发能力,可通过参数paralleism.default配置。 slot数量必须>= job并行度
作业提交流程
Standalone会话模式作业提交流程
- 逻辑流图(Stream Graph)
- 作业流图(Job Graph)
- 执行图(Execution Graph) 最为重要
- 物理图(Physical Graph)
Yarn应用模式作业提交流程
DataStream API
五大类API:
- Execution Environment(获取执行环境)
- Source(读取数据源)
- Transformation(转换操作)
- Sink(输出)
- Execution(触发执行)
执行环境(Execution Environment)
创建执行环境
自动获取环境
它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
创建本地环境
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
创建远程环境
这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager主机名
1234, // JobManager进程端口号
"path/to/jarFile.jar" // 提交给JobManager的JAR包
);
执行模式(Execution Mode)
流执行模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。
批执行模式(Batch)
专门用于批处理的执行模式。
自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 批执行模式的使用。主要有两种方式:
- 通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。
- 通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。 实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。
触发程序执行
写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
env.execute();
源算子(Source)
从集合中读取数据
List<Integer> data = Arrays.asList(1, 22, 3);
DataStreamSource<Integer> ds = env.fromCollection(data);
从文件中读取数据
POM.xml中添加
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
FileSource<String> fileSource = FileSource.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("input/word.txt")
).build();
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();
env.execute();
从Kafka中读取数据
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop3:9092")
.setGroupId("stanley")
.setTopics("topic_1")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").print();
env.execute();
从数据生成器中读取数据
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<String> datageneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number: " + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(3),
Types.STRING
);
env.fromSource(datageneratorSource,WatermarkStrategy.noWatermarks(), "data-generator").print();
env.execute();
转换算子(Transformation)
基本转换算子
映射(map)
用于数据流中的数据进行转换,形成新的数据流。简单的说,就是一个“一一映射”,消费一个元素就产生一个元素。
SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunction<WaterSensor, String>() {
@Override
public String map(WaterSensor value) throws Exception {
return value.getId();
}
});
过滤(Filter)
对数据执行一个过滤,通过布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
SingleOutputStreamOperator<WaterSensor> filter = sensorDS.filter(new FilterFunction<WaterSensor>() {
@Override
public boolean filter(WaterSensor value) throws Exception {
return "s1".equals(value.getId());
}
});
扁平映射(flatMap)
将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素,简单的说就是”一进多出“。flatmap可以认为是“扁平化”flatten和“映射”map两步操作的结合。就是按某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
if ("s1".equals(value.getId())) {
out.collect(value.getVc() + "");
} else if ("s2".equals(value.getId())) {
out.collect(value.getVc() + "");
out.collect(value.getTs() + "");
}
}
});
聚合算子
按键分区(keyBy)
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流逻辑上划分成不同的分区(partition)。这里说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分到不同的分区中去;这样一来,所有具有相同的key的数据,都被发往同一个分区。
/**
* 1. 返回的是一个KeyedStream 键控流
* 2. keyby不是转换算子,只是对数据进行重分区, 也不能设置并行度
* 3. keyby 分组和分区的关系:
* 1). keyby是对数据分组,保证相同的key的数据 在同一个分区
* 2). 分区:一个子任务,可以理解为一个分区. 一个分区(子任务)可以存在多个分组(key)
**/
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor value) throws Exception {
return value.getId();
}
});
sensorKS.print();
简单聚合(sum/min/max/minBy/maxBy)
/**
* 简单聚合算子
* 1。 keyby之后才能调用
**/
// 传位置索引的,不适用pojo类型,适用于tuple类型。
// SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum(2);
// SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum("vc");
// SingleOutputStreamOperator<WaterSensor> result= sensorKS.min("vc");
// SingleOutputStreamOperator<WaterSensor> result= sensorKS.max("vc");
/**
* maxBy: 会将返回比较字段的最大值,非比较字段也保留最大值记录的值
* max值返回比较字段的最大值,而非比较字段保留第一次值
*
* min/minBy也是如此规则
**/
SingleOutputStreamOperator<WaterSensor> result= sensorKS.maxBy("vc");
归约聚合(reduce)
SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("value1: " + value1);
System.out.println("value2: " + value2);
return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
}
});
reduce.print();
用户自定义函数(UDF)
用户自定义函数分为:函数类、匿名函数、富函数类。
函数类(Function Classes)
public class FilterFunctionImpl implements FilterFunction<WaterSensor> {
public String id;
public FilterFunctionImpl(String id) {
this.id = id;
}
@Override
public boolean filter(WaterSensor value) throws Exception {
return this.id.equals(value.getId());
}
}
匿名类
SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
if ("s1".equals(value.getId())) {
out.collect(value.getVc() + "");
} else if ("s2".equals(value.getId())) {
out.collect(value.getVc() + "");
out.collect(value.getTs() + "");
}
}
});
富函数类(RichFunction Classes)
SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext runtimeContext = getRuntimeContext();
int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Open...");
}
@Override
public void close() throws Exception {
super.close();
RuntimeContext runtimeContext = getRuntimeContext();
int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Close...");
}
@Override
public Integer map(Integer value) throws Exception {
return value + 1;
}
});
物理分区算子(Physical Partitioning)
常见的物理分区算子包括:随机分配(Random)、轮训分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)
global : 全部发往第一个子任务中
keyby: 按指定key去发送, 相同可以发往同一个子任务
one-to-one: forward分区器
总结:flink提供7种分区器+1中自定义
随机分配(shuffle)
stream.shuffle()
轮询分区(round-robin)
stream.rebalance()
重缩放分区(rescale)
stream.rescale()
广播(broadcast)
stream.broadcast()
全局分区(global)
stream.global()
自定义分区(Custom)
public class MyPartitioner implements Partitioner<String> {
@Override
public int partition(String key, int numPartitions) {
return Integer.parseInt(key) % numPartitions;
}
}
DataStream<String> myDS = socketDS
.partitionCustom( new MyPartitioner(), value -> value);
分流
使用Filter实现分流效果
但是存在缺点,因为数据都会被所有的filter所处理。
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
/**
* 使用filter实现分流效果 * 缺点:相同的数据会被处理多次
**/
socketDS.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");
socketDS.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");
侧输出流
解决了Filter重复处理的问题
OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
String id = value.getId();
if ("s1".equals(id)) { //放到侧输出流s1中
ctx.output(s1Tag, value);
} else if ("s2".equals(id)) { //放到侧输出流s2中
ctx.output(s2Tag, value);
} else {
out.collect(value);
}
}
});
合流
联合(Union)
union的流之间必须是相同类型
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> source2 = env.fromElements(4, 5, 6);
DataStreamSource<String> source3 = env.fromElements("7", "8", "9");
source1.union(source2).union(source3.map(value -> Integer.parseInt(value))).print();
连接(Connect)
连接后得到的不是DataStream,连接的流是形式上放在同一个流中, 事实上内部各自保留各自的数据形式不变, 彼此相互独立。一次只能连接两条流。
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<String> source2 = env.fromElements("a", "b", "c");
ConnectedStreams<Integer, String> connect = source1.connect(source2);
SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return String.valueOf(value);
}
@Override
public String map2(String value) throws Exception {
return value;
}
});
输出算子(Sink)
Flink1.2开始使用stream.sinkTo(…)
FileSink
FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("output"), new SimpleStringEncoder<>("UTF-8"))
// 输出文件的一些配置:文件前缀、后缀...
.withOutputFileConfig(OutputFileConfig.builder()
.withPartPrefix("filesink")
.withPartSuffix(".log")
.build()
)
// 文件分桶
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(10))
.withMaxPartSize(new MemorySize(1024 * 1024))
.build()
)
.build();
KafkaSink
// 必须开启checkpoint, 否则无法写入kafka
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定kafka的地址和端口
.setBootstrapServers("hadoop3:9092")
// 指定序列化器, 指定topic名称、具体的序列化
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
// 写到kafka的一致性级别:精准一次、至少一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次 必须要设置事务前缀
.setTransactionalIdPrefix("stanley-")
// 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))
.build();
socketDS.sinkTo(kafkaSink);
KafkaSinkWithKey
// 必须开启checkpoint, 否则无法写入kafka
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定kafka的地址和端口
.setBootstrapServers("hadoop3:9092")
// 指定序列化器, 指定topic名称、具体的序列化
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("ws", key, value);
}
}
)
// 写到kafka的一致性级别:精准一次、至少一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次 必须要设置事务前缀
.setTransactionalIdPrefix("stanley-")
// 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))
.build();
socketDS.sinkTo(kafkaSink);
MySQLSink
/**
* 1. 只能使用老式的addsink写法
* 2. JdbcSink的四个参数
* 1) 执行的SQL
* 2) 预编译SQL
* 3) 执行选项, 攒批,重试
* 4) 连接选项, url, 用户名,密码
**/
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
"insert into ws values(?, ?, ?)",
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3)
.withBatchSize(100)
.withBatchIntervalMs(3000)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("12345")
.withConnectionCheckTimeoutSeconds(60)
.build()
);
sensorDS.addSink(jdbcSink)
自定义Sink
建议优先使用官方提供的Sink,除非没有途径才考虑自定义Sink。
stream.addSink(new MySinkFunction<String>());
Flink中的时间和窗口
窗口
所谓窗口就是划定的一段时间范围,对范围内的数据进行处理,就是所谓的窗口计算。
Flink是一种流式计算引擎,主要用来处理无界数据流,数据源源不断,无穷无尽。想要更加方便高效的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”。
窗口的分类
按驱动类型划分
- 时间窗口(Time Window)
- 计数窗口(Count Window)
按窗口分配数据的规则分类
- 滚动窗口(Tumbling Window)
滚动窗口有固定的大小, 是一种对数据进行“均匀切片”的划分方式,窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口。
- 滑动窗口(Siding Window)
滑动窗口的大小也是固定的,但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。
定义滑动窗口的参数有两个,除去窗口大小(Window Size)之外,还有一个滑动步长(Window slide), 它其实代表窗口计算的频率。
滑动窗口会出现“重叠”,数据也可能会被同时分配到多个窗口中。
- 会话窗口(Session Window)
是基于会话来对数据进行分组的,会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的。
- 全局窗口(Global Window)
这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会作为触发计算的
窗口分配器
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。 窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
时间窗口
- 滚动时间窗口(TumblingProcessingTimeWindows/TumblingEventTimeWindows)
sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// 滚动时间窗口, 窗口长度为10s
- 滑动时间窗口(SlidingProcessingTimeWindows/SlidingEventTimeWindows)
sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
// 滑动时间窗口, 窗口长度为10s, 滑动步长2s
- 会话时间窗口(ProcessingTimeSessionWindows/EventTimeSessionWindows)
sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
// 会话窗口, 间隔5s
计数窗口
- 滚动窗口(countWindow)
sensorKS.countWindow(5);
// 滚动窗口,窗口长度为5个元素
- 滑动窗口(countWindow)
sensorKS.countWindow(5, 2);
// 滑动窗口, 窗口长度为5个元素,窗口步长为2个元素
- 全局窗口(window)
sensorKS.window(GlobalWindows.create());
// 全局窗口,计数窗口的底层就是使用的这个, 需要自定义触发器,很少使用
窗口函数
增量聚合函数
归约函数(ReduceFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//窗口函数
/**
* 窗口的reduce
* 1. 相同key的第一条数据来的时候,不会调用reduce方法
* 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出
* 3. 在窗口触发的时候才会输出整个窗口的最终计算结果 */
SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调用reduce方法, value1=" + value1 + ", value2=" + value2);
return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
}
});
增量聚合函数(AggregateFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//窗口函数
/**
* 窗口的Aggregate
* 三个参数:第一个类型输入数来据的类型,第二个类型累加器的类型,存储中间计算结果的类型, 第三个类型输出数据的类型
* 1. 属于本窗口的第一条数据来, 创建窗口,创建累加器
* 2. 增量聚合:来一条计算一条,调研一次add方法
* 3. 窗口输出时调研一次getresult方法
* 4. 输入、中间累加器、输出 类型可以不一样,非常灵活 */
SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
// 聚合逻辑
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调研add方法, value=" + value);
return accumulator + value.getVc();
}
// 获取最终结果,窗口触发输出
@Override
public String getResult(Integer accumulator) {
System.out.println("调研getResult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
System.out.println("调研merge方法");
return null; }
});
全窗口函数(ProcessWindowFunction/ApplyFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// sensorWS.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
// /**
// * 老写法
// * @param s 分组key
// * @param window 窗口对象
// * @param input 存的数据
// * @param out 采集器
// * @throws Exception
// */
// @Override
// public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
//
// }
// })
SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
/**
* 全窗口函数计算逻辑(新写法):窗口触发时才调用一次, 统一计算窗口所有数据 * @param s 分组key
* @param context 上下文
* @param elements 存的数据
* @param out 采集器
* @throws Exception
*/
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
});
增量聚合窗口结合(最为全面)
// WindowAggregateAndProcessDemo
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//窗口函数
SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(), new MyProc());
result.print();
env.execute();
public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调研add方法, value=" + value);
return accumulator + value.getVc();
}
@Override
public String getResult(Integer accumulator) {
System.out.println("调研getResult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
System.out.println("调研merge方法");
return null; }
}
public static class MyProc extends ProcessWindowFunction<String, String, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
}
// WindowReduceAndProcessDemo
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//窗口函数
/**
* 窗口的reduce
* 1. 相同key的第一条数据来的时候,不会调用reduce方法
* 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出
* 3. 在窗口触发的时候才会输出整个窗口的最终计算结*/
SingleOutputStreamOperator<String> reduce = sensorWS.reduce(new MyReduce(), new MyProc());
reduce.print();
env.execute();
public static class MyReduce implements ReduceFunction<WaterSensor> {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调研reduce方法, value1=" + value1 + ", value2=" + value2);
return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
}
}
public static class MyProc extends ProcessWindowFunction<WaterSensor, String, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
}
其他API
- 触发器 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
- 移除器 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
时间语义
- 事件时间
- 处理时间
水位线
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
设置事件时间语义及水位线
- 单调上升水位线策略(有序流)
// 1. 定义watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 升序watermark, 没有等待时间
.<WaterSensor>forMonotonousTimestamps()
// 指定时间分配器,从数据中提取
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
// 返回的时间戳要为毫秒
System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
}
});
// 2. 指定watermark策略
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
sensorDSwithWatermark.keyBy(value -> value.getId())
// 指定使用事件事件语义窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
}).print();
- 乱序水位线策略
// 1. 定义watermark策略
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// 乱序watermark, 等待3s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 指定时间分配器,从数据中提取
.withTimestampAssigner((element, recordTimestamp) -> {
// 返回的时间戳要为毫秒
System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
});
// 2. 指定watermark策略
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
sensorDSwithWatermark.keyBy(value -> value.getId())
// 指定使用事件事件语义窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
}).print();
- 自定义水位线
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
// .forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
// @Override
// public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
// return new MyPeriodWatermarkGenerator<>(3000L);
// }
// })
// 指定自定义的生成器, 周期性生成// .<WaterSensor>forGenerator(context -> new MyPeriodWatermarkGenerator<>(3000L))
// 指定自定义的生成器,断点式生成 .<WaterSensor>forGenerator(context -> new MyPuntuatedWatermarkGenerator<>(3000L))
.withTimestampAssigner((element, recordTimestamp) -> {
System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
});
public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {
// 用来保存乱序等待时间
private long delayTs;
// 用来保存当前最大的事件时间
private long maxTs;
public MyPeriodWatermarkGenerator(long delayTs) {
this.delayTs = delayTs;
this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
}
/**
* 每条数据来都会被调用一次,用来提取最大事件事件,保存下来 * @param event
* @param eventTimestamp 提取到的数据的 事件时间
* @param output
*/
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTs = Math.max(maxTs, eventTimestamp);
System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs);
}
/**
* 周期性调用:发射 watermark * @param output
*/
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTs - delayTs - 1));
System.out.println("调用了onPeriodicEmit方法,生成watermark=" + (maxTs - delayTs - 1));
}
}
public class MyPuntuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
// 用来保存乱序等待时间
private long delayTs;
// 用来保存当前最大的事件时间
private long maxTs;
public MyPuntuatedWatermarkGenerator(long delayTs) {
this.delayTs = delayTs;
this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
}
/**
* 每条数据来都会被调用一次,用来提取最大事件事件,保存下来, 并发射watermark * @param event
* @param eventTimestamp 提取到的数据的 事件时间
* @param output
*/
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTs = Math.max(maxTs, eventTimestamp);
output.emitWatermark(new Watermark(maxTs - delayTs - 1));
System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs + ", watermark = " + (maxTs - delayTs - 1));
}
/**
* 周期性调用:不需要 * @param output
*/
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
}
- 源算子中设置
env.fromSource(
kafkaSource,
// 使用源算子中的水位线策略, 乱序,提取字符串中关键字
WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> {
return Long.valueOf(element.split(",")[1]);
}),
"kafkasource"
).print();
水位线的传递
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
延迟数据处理
-
问题1:水位线在多并行度中传递取最小
通过设置延迟等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 自定义分区器,数据%分区数,只输入奇数都知会去往map的一个子任务
SingleOutputStreamOperator<Integer> socketDS = env
.socketTextStream("localhost", 7777)
.partitionCustom(new MyPartitioner(), r -> r) //以自身数据为key,通过奇偶性判定分区
.map(r -> Integer.parseInt(r))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Integer>forMonotonousTimestamps()
.withTimestampAssigner((r,rs) -> r * 1000L)
.withIdleness(Duration.ofSeconds(5)) //设置空闲等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题
);
// 分成两组,奇数一组,偶数一组, 开10s的事件时间滚动窗口
socketDS.keyBy( r -> r % 2)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
@Override
public void process(Integer s, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
})
.print();
- 问题2:乱序中的迟到数据
- 解决方法1:设置推迟水位推进
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)
- 解决方法2:设置窗口延迟关闭(触发计算后来的迟到数据会再次触发计算)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // 推迟2s关窗
- 解决方法3:使用侧输出流
OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(value -> value.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // 推迟2s关窗
.sideOutputLateData(lateTag) // 关窗后的迟到数据,放到侧输出流
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
}
});
process.print();
// 从主流获取侧输出流,打印
process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
基于时间的合流–双向联结(join)
窗口连接(window join)
DataStream<String> join = ds1.join(ds2)
.where(r1 -> r1.f0)
.equalTo(r2 -> r2.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
* 关联上的数据,调用join方法 * @param first ds1的数据
* @param second ds2的数据
* @return
* @throws Exception
*/ @Override
public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
return first + "<--->" + second;
}
});
join.print();
间隔联结(interval join)
只能支持事件时间
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r -> r.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r -> r.f0);
ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2))
处理函数(Process)
-
ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
-
KeyedProcessFunction
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
-
ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
-
ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
-
CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
-
ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
-
BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
-
KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。