基于Flink 1.17版本的学习笔记,包括部署、原理、算子、开发代码等等。

集群部署

集群角色

image-20250305154040135

  • 客户端(Client):代码由客户端获取并转换,之后提交给JobManager。
  • JobManager就是Flink集群里的“管事人”, 对作业进行中央调度管理;而它获取到要执行的作用后,会进一步处理转换,然后分发任务给众多的TaskManager。
  • TaskManager是真正“干活的人”,数据的护理操作都是它们来做的。

Flink支持多种不同的部署场景,还可以和不同的资源管理平台方便的集成。

集群规划

节点服务器 hadoop3 hadoop2 hadoop1
角色 JobManager TaskManager Task Manager Task Manager

修改配置文件

  1. $ vi flink-conf.yaml
  2. jobmanager.rpc.address: pi3
  3. jobmanager.bind-host: 0.0.0.0
  4. taskmanager.bind-host: 0.0.0.0
  5. ### 更为各台主机的主机名
  6. taskmanager.host: pi3
  7. rest.address: pi3
  8. rest.bind-address: 0.0.0.0
  9. $ vi workers
  10. pi3
  11. pi2
  12. pi4
  13. $ vi masters
  14. pi3

分发文件到各主机

  1. $ scp -r /opt/flink-1.17.1 pi2:/opt/flink-1.17.1
  2. $ scp -r /opt/flink-1.17.1 pi4:/opt/flink-1.17.1

启动集群

  1. $ bin/start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host pi3.
  4. Starting taskexecutor daemon on host pi3.
  5. Starting taskexecutor daemon on host pi2.
  6. Starting taskexecutor daemon on host pi4.
问题1:Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.

解决方法:

  1. /opt/flink-1.17.1/bin$ vi taskmanager.sh
  2. # 注释如下内容
  3. if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
  4. export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
  5. fi
  6. scp taskmanager.sh pi2:/opt/flink-1.17.1/bin
  7. scp taskmanager.sh pi3:/opt/flink-1.17.1/bin
  8. scp taskmanager.sh pi4:/opt/flink-1.17.1/bin

手工打包工程

  1. 修改pom.xml, 在build.plugins中添加如下内容:
  2. <!-- Flink官方打包插件 -->
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-shade-plugin</artifactId>
  6. <version>3.2.4</version>
  7. <executions> <execution> <phase>package</phase>
  8. <goals> <goal>shade</goal>
  9. </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude>
  10. <exclude>org.slf4j:*</exclude>
  11. <exclude>log4j:*</exclude>
  12. </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder.
  13. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact>
  14. <excludes> <exclude>META-INF/*.SF</exclude>
  15. <exclude>META-INF/*.DSA</exclude>
  16. <exclude>META-INF/*.RSA</exclude>
  17. </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  18. <mainClass>my.programs.main.clazz</mainClass>
  19. </transformer> </transformers> </configuration> </execution> </executions></plugin>

命令行运行代码

  1. bin/flink run -m pi3:8081 -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar

部署模式

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode) 需要资源管理平台才可支持。
  • 应用模式(Application Mode) 通过standalone-job.sh start –job-classname xxx.xxx.xxx启动, 并且上传的Jar必须在lib目录下。 三种模式主要区别的集群的生命周期以及资源的分配方式上。

运行模式

  • Standalone 模式, Flink自己管理资源,不依赖外部资源管理平台。没有自动扩展或重新分配资源的保证, 必须手动处理。默认使用会话模式, 可支持应用模式,不支持单作业模式。
  • YARN模式, 最为常用的模式。
  • K8S模式
Standalone模式
  1. ### 启动
  2. $ mv ~/learning-flink-1.0-SNAPSHOT.jar /opt/flink-1.17.1/lib
  3. $ bin/standalone-job.sh start --job-classname com.stanleylog.WordCountStreamunboundedDemo
  4. $ bin/taskmanager.sh start ### taskmanager需要手工启动
  5. ### 停止
  6. $ bin/taskmanager.sh stop
  7. $ bin/standalone-job.sh stop
Yarn模式
  1. $ vi /etc/profile
  2. # Hadoop Env
  3. export HADOOP_HOME=/opt/hadoop-2.7.7
  4. export PATH=$PATH:$HADOOP_HOME/bin
  5. export PATH=$PATH:$HADOOP_HOME/sbin
  6. # Flink Env
  7. export HADOOP_CLASSPATH=`hadoop classpath`
  8. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
会话模式(Session)
  1. # 启动
  2. $ bin/yarn-session.sh -d -nm flink-test
  3. # 停止
  4. $ echo "stop" | ./bin/yarn-session.sh -id application_xxx
  5. # 或者
  6. $ yarn application -kill application_xxx

高版本的Flink中,无需去更改Flink配置文件,它会自己进行动态覆盖。

命令行运行代码
  1. $ bin/flink run -d -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
单作业模式(Pre-Job)
  1. ### 启动
  2. $ bin/flink run -d -t yarn-pre-job -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
  3. ### 关闭
  4. # YARN UI界面
  5. # 或者
  6. $ bin/flink list -t yarn-pre-job -Dyarn.application.id=application_xxx #查看JOB_ID
  7. $ bin/flink cancel -t yarn-pre-job -Dyarn.application.id=application_xxx <JOB_ID>
应用模式(Application)重点
  1. ### 启动
  2. $ bin/flink run-application -d -t yarn-application -nm test -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
  3. ### 关闭
  4. # YARN UI界面
  5. # 或者
  6. $ bin/flink list -t yarn-application -Dyarn.application.id=application_xxx #查看JOB_ID
  7. $ bin/flink cancel -t yarn-application -Dyarn.application.id=application_xxx <JOB_ID>

使用HDFS文件的模式 —推荐生产环境使用

  1. ### 上传Flink类库
  2. $ hadoop fs -mkdir /flink-dist
  3. $ hadoop fs -put /opt/flink-1.17.1/lib/ /flink-dist
  4. $ hadoop fs -put /opt/flink-1.17.1/plugins/ /flink-dist
  5. ### 上传Flink程序
  6. $ hadoop fs -mkdir /flink-jars
  7. $ hadoop fs -put ~/learning-flink-1.0-SNAPSHOT.jar /flink-jars
  8. ### 启动
  9. $ bin/flink run-application -d -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop1:9000/flink-dist" -c com.stanleylog.WordCountStreamunboundedDemo hdfs://hadoop1:9000/flink-jars/learning-flink-1.0-SNAPSHOT.jar
历史服务器
  1. $ hadoop fs -mkdir -p /logs/flink-job
  2. ### 修改flink配置
  3. $ vi conf/flink-conf.yaml
  4. jobmanager.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
  5. historyserver.web.address: hadoop2
  6. historyserver.web.port: 8082
  7. historyserver.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
  8. historyserver.archive.fs.refresh-interval: 5000
  9. ### 启动
  10. bin/historyserver.sh start
  11. ### 停止
  12. bin/historyserver.sh stop

可以通过 http://hadoop2:8082 访问

运行时架构

![[Pasted image 20231117110403.png]]

重要组件:

  • JobMaster
  • 资源管理器(ResourceManager)
  • 分发器(Dispatcher)
  • 任务管理器(TaskManager)

核心概念

并行度

Flink的并行度代表最大并行度数量,可以通过.setParallelism()进行设置。 设置方式, 执行优先级如下:

  • 代码中通过setParallelism设置并行度
  • env整体设置并行度
  • 命令行提交代码时通过-p参数进行设置并行度
  • WebUI 提交页面设置并行度
  • Flink配置文件中的默认并行度

算子链

  1. 一对一(One to One)
  2. 重分区(Redistributing)

关闭算子链

  1. 算子链可以通过env.disableOperatorChaining()进行全局禁用,禁用之后所有的算子将不会被合并。
  2. 某个算子不参与链化:算子A.disableChaining(),算子A不会与前面和后面的算子进行合并。
  3. 从某个算子开启新链条:算子A.startNewChain(), 算子A不参与前面的合并,从A开始正常链化。

任务槽(Slot)

主要按照内存划分资源,每个slot专门给具体的task使用,不会进行资源的调配。可以通过修改配置参数更改默认的TaskManager分配的Slot数量。

  1. taskmanager.numberOfTaskSlots: 1

CPU资源不会进行资源隔离的,所以建议使用所在主机的CPU核心数划分此参数。 在同一作业中,不同任务节点的并行任务可以放在同一slot上执行。 可以通过.slotSharingGroup设置算子的不用共享组,默认共享组为default.

任务槽和并行度的关系

任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过taskmanager.numberOfTaskSlots进行配置; 而并行度是动态的概念,也就是TaskManager运行程序时实际使用的并发能力,可通过参数paralleism.default配置。 slot数量必须>= job并行度

作业提交流程

Standalone会话模式作业提交流程

image-20250305154637570

  1. 逻辑流图(Stream Graph)
  2. 作业流图(Job Graph)
  3. 执行图(Execution Graph) 最为重要
  4. 物理图(Physical Graph)

Yarn应用模式作业提交流程

image-20250305165129856

DataStream API

五大类API:

  • Execution Environment(获取执行环境)
  • Source(读取数据源)
  • Transformation(转换操作)
  • Sink(输出)
  • Execution(触发执行)

执行环境(Execution Environment)

创建执行环境

自动获取环境

它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
创建本地环境

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

  1. StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
创建远程环境

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

  1. StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
  2. .createRemoteEnvironment(
  3. "host", // JobManager主机名
  4. 1234, // JobManager进程端口号
  5. "path/to/jarFile.jar" // 提交给JobManager的JAR包
  6. );

执行模式(Execution Mode)

流执行模式(Streaming)

这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式(Batch)

专门用于批处理的执行模式。

自动模式(AutoMatic)

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 批执行模式的使用。主要有两种方式:

  • 通过命令行配置
  1. bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

  • 通过代码配置
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。 实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

触发程序执行

写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。

  1. env.execute();

源算子(Source)

从集合中读取数据

  1. List<Integer> data = Arrays.asList(1, 22, 3);
  2. DataStreamSource<Integer> ds = env.fromCollection(data);

从文件中读取数据

POM.xml中添加

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-files</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. FileSource<String> fileSource = FileSource.forRecordStreamFormat(
  4. new TextLineInputFormat(),
  5. new Path("input/word.txt")
  6. ).build();
  7. env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();
  8. env.execute();

从Kafka中读取数据

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
  4. .setBootstrapServers("hadoop3:9092")
  5. .setGroupId("stanley")
  6. .setTopics("topic_1")
  7. .setValueOnlyDeserializer(new SimpleStringSchema())
  8. .setStartingOffsets(OffsetsInitializer.latest())
  9. .build();
  10. env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").print();
  11. env.execute();

从数据生成器中读取数据

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-datagen</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. DataGeneratorSource<String> datageneratorSource = new DataGeneratorSource<>(
  4. new GeneratorFunction<Long, String>() {
  5. @Override
  6. public String map(Long value) throws Exception {
  7. return "Number: " + value;
  8. }
  9. },
  10. Long.MAX_VALUE,
  11. RateLimiterStrategy.perSecond(3),
  12. Types.STRING
  13. );
  14. env.fromSource(datageneratorSource,WatermarkStrategy.noWatermarks(), "data-generator").print();
  15. env.execute();

转换算子(Transformation)

基本转换算子

映射(map)

image-20250305165215532 用于数据流中的数据进行转换,形成新的数据流。简单的说,就是一个“一一映射”,消费一个元素就产生一个元素。

  1. SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunction<WaterSensor, String>() {
  2. @Override
  3. public String map(WaterSensor value) throws Exception {
  4. return value.getId();
  5. }
  6. });
过滤(Filter)

对数据执行一个过滤,通过布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。 image-20250305165236051

  1. SingleOutputStreamOperator<WaterSensor> filter = sensorDS.filter(new FilterFunction<WaterSensor>() {
  2. @Override
  3. public boolean filter(WaterSensor value) throws Exception {
  4. return "s1".equals(value.getId());
  5. }
  6. });
扁平映射(flatMap)

image-20250305165254715 将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素,简单的说就是”一进多出“。flatmap可以认为是“扁平化”flatten和“映射”map两步操作的结合。就是按某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

  1. SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {
  2. @Override
  3. public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
  4. if ("s1".equals(value.getId())) {
  5. out.collect(value.getVc() + "");
  6. } else if ("s2".equals(value.getId())) {
  7. out.collect(value.getVc() + "");
  8. out.collect(value.getTs() + "");
  9. }
  10. }
  11. });

聚合算子

按键分区(keyBy)

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流逻辑上划分成不同的分区(partition)。这里说的分区,其实就是并行处理的子任务。 基于不同的key,流中的数据将被分到不同的分区中去;这样一来,所有具有相同的key的数据,都被发往同一个分区。 image-20250305165315960

  1. /**
  2. * 1. 返回的是一个KeyedStream 键控流
  3. * 2. keyby不是转换算子,只是对数据进行重分区, 也不能设置并行度
  4. * 3. keyby 分组和分区的关系:
  5. * 1). keyby是对数据分组,保证相同的key的数据 在同一个分区
  6. * 2). 分区:一个子任务,可以理解为一个分区. 一个分区(子任务)可以存在多个分组(key)
  7. **/
  8. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
  9. @Override
  10. public String getKey(WaterSensor value) throws Exception {
  11. return value.getId();
  12. }
  13. });
  14. sensorKS.print();
简单聚合(sum/min/max/minBy/maxBy)
  1. /**
  2. * 简单聚合算子
  3. * 1。 keyby之后才能调用
  4. **/
  5. // 传位置索引的,不适用pojo类型,适用于tuple类型。
  6. // SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum(2);
  7. // SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum("vc");
  8. // SingleOutputStreamOperator<WaterSensor> result= sensorKS.min("vc");
  9. // SingleOutputStreamOperator<WaterSensor> result= sensorKS.max("vc");
  10. /**
  11. * maxBy: 会将返回比较字段的最大值,非比较字段也保留最大值记录的值
  12. * max值返回比较字段的最大值,而非比较字段保留第一次值
  13. *
  14. * min/minBy也是如此规则
  15. **/
  16. SingleOutputStreamOperator<WaterSensor> result= sensorKS.maxBy("vc");
归约聚合(reduce)
  1. SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {
  2. @Override
  3. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
  4. System.out.println("value1: " + value1);
  5. System.out.println("value2: " + value2);
  6. return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
  7. }
  8. });
  9. reduce.print();

用户自定义函数(UDF)

用户自定义函数分为:函数类、匿名函数、富函数类。

函数类(Function Classes)
  1. public class FilterFunctionImpl implements FilterFunction<WaterSensor> {
  2. public String id;
  3. public FilterFunctionImpl(String id) {
  4. this.id = id;
  5. }
  6. @Override
  7. public boolean filter(WaterSensor value) throws Exception {
  8. return this.id.equals(value.getId());
  9. }
  10. }
匿名类
  1. SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {
  2. @Override
  3. public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
  4. if ("s1".equals(value.getId())) {
  5. out.collect(value.getVc() + "");
  6. } else if ("s2".equals(value.getId())) {
  7. out.collect(value.getVc() + "");
  8. out.collect(value.getTs() + "");
  9. }
  10. }
  11. });
富函数类(RichFunction Classes)
  1. SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {
  2. @Override
  3. public void open(Configuration parameters) throws Exception {
  4. super.open(parameters);
  5. RuntimeContext runtimeContext = getRuntimeContext();
  6. int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
  7. String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
  8. System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Open...");
  9. }
  10. @Override
  11. public void close() throws Exception {
  12. super.close();
  13. RuntimeContext runtimeContext = getRuntimeContext();
  14. int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
  15. String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
  16. System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Close...");
  17. }
  18. @Override
  19. public Integer map(Integer value) throws Exception {
  20. return value + 1;
  21. }
  22. });

物理分区算子(Physical Partitioning)

常见的物理分区算子包括:随机分配(Random)、轮训分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)

global : 全部发往第一个子任务中
keyby: 按指定key去发送, 相同可以发往同一个子任务
one-to-one: forward分区器
总结:flink提供7种分区器+1中自定义

随机分配(shuffle)

image-20250305165346940

  1. stream.shuffle()
轮询分区(round-robin)

image-20250305165406779

  1. stream.rebalance()
重缩放分区(rescale)

image-20250305171006662

  1. stream.rescale()
广播(broadcast)
  1. stream.broadcast()
全局分区(global)
  1. stream.global()
自定义分区(Custom)
  1. public class MyPartitioner implements Partitioner<String> {
  2. @Override
  3. public int partition(String key, int numPartitions) {
  4. return Integer.parseInt(key) % numPartitions;
  5. }
  6. }
  7. DataStream<String> myDS = socketDS
  8. .partitionCustom( new MyPartitioner(), value -> value);

分流

image-20250305171028002

使用Filter实现分流效果

但是存在缺点,因为数据都会被所有的filter所处理。

  1. DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
  2. /**
  3. * 使用filter实现分流效果 * 缺点:相同的数据会被处理多次
  4. **/
  5. socketDS.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");
  6. socketDS.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");
侧输出流

解决了Filter重复处理的问题

  1. OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
  2. OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
  3. SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() {
  4. @Override
  5. public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
  6. String id = value.getId();
  7. if ("s1".equals(id)) { //放到侧输出流s1中
  8. ctx.output(s1Tag, value);
  9. } else if ("s2".equals(id)) { //放到侧输出流s2中
  10. ctx.output(s2Tag, value);
  11. } else {
  12. out.collect(value);
  13. }
  14. }
  15. });

合流

联合(Union)

union的流之间必须是相同类型 image-20250305171048186

  1. DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
  2. DataStreamSource<Integer> source2 = env.fromElements(4, 5, 6);
  3. DataStreamSource<String> source3 = env.fromElements("7", "8", "9");
  4. source1.union(source2).union(source3.map(value -> Integer.parseInt(value))).print();
连接(Connect)

连接后得到的不是DataStream,连接的流是形式上放在同一个流中, 事实上内部各自保留各自的数据形式不变, 彼此相互独立。一次只能连接两条流。 image-20250305171108382

  1. DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
  2. DataStreamSource<String> source2 = env.fromElements("a", "b", "c");
  3. ConnectedStreams<Integer, String> connect = source1.connect(source2);
  4. SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
  5. @Override
  6. public String map1(Integer value) throws Exception {
  7. return String.valueOf(value);
  8. }
  9. @Override
  10. public String map2(String value) throws Exception {
  11. return value;
  12. }
  13. });

输出算子(Sink)

Flink1.2开始使用stream.sinkTo(…) image-20250305171129988 image-20250305171148038

FileSink

  1. FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("output"), new SimpleStringEncoder<>("UTF-8"))
  2. // 输出文件的一些配置:文件前缀、后缀...
  3. .withOutputFileConfig(OutputFileConfig.builder()
  4. .withPartPrefix("filesink")
  5. .withPartSuffix(".log")
  6. .build()
  7. )
  8. // 文件分桶
  9. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
  10. // 文件滚动策略
  11. .withRollingPolicy(
  12. DefaultRollingPolicy.builder()
  13. .withRolloverInterval(Duration.ofSeconds(10))
  14. .withMaxPartSize(new MemorySize(1024 * 1024))
  15. .build()
  16. )
  17. .build();

KafkaSink

  1. // 必须开启checkpoint, 否则无法写入kafka
  2. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  3. DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
  4. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  5. // 指定kafka的地址和端口
  6. .setBootstrapServers("hadoop3:9092")
  7. // 指定序列化器, 指定topic名称、具体的序列化
  8. .setRecordSerializer(
  9. KafkaRecordSerializationSchema.<String>builder()
  10. .setTopic("ws")
  11. .setValueSerializationSchema(new SimpleStringSchema())
  12. .build())
  13. // 写到kafka的一致性级别:精准一次、至少一次
  14. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  15. // 如果是精准一次 必须要设置事务前缀
  16. .setTransactionalIdPrefix("stanley-")
  17. // 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟
  18. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))
  19. .build();
  20. socketDS.sinkTo(kafkaSink);

KafkaSinkWithKey

  1. // 必须开启checkpoint, 否则无法写入kafka
  2. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  3. DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
  4. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  5. // 指定kafka的地址和端口
  6. .setBootstrapServers("hadoop3:9092")
  7. // 指定序列化器, 指定topic名称、具体的序列化
  8. .setRecordSerializer(
  9. new KafkaRecordSerializationSchema<String>() {
  10. @Nullable
  11. @Override public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
  12. String[] datas = element.split(",");
  13. byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
  14. byte[] value = element.getBytes(StandardCharsets.UTF_8);
  15. return new ProducerRecord<>("ws", key, value);
  16. }
  17. }
  18. )
  19. // 写到kafka的一致性级别:精准一次、至少一次
  20. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  21. // 如果是精准一次 必须要设置事务前缀
  22. .setTransactionalIdPrefix("stanley-")
  23. // 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟
  24. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))
  25. .build();
  26. socketDS.sinkTo(kafkaSink);

MySQLSink

  1. /**
  2. * 1. 只能使用老式的addsink写法
  3. * 2. JdbcSink的四个参数
  4. * 1) 执行的SQL
  5. * 2) 预编译SQL
  6. * 3) 执行选项, 攒批,重试
  7. * 4) 连接选项, url, 用户名,密码
  8. **/
  9. SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
  10. "insert into ws values(?, ?, ?)",
  11. new JdbcStatementBuilder<WaterSensor>() {
  12. @Override
  13. public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
  14. preparedStatement.setString(1, waterSensor.getId());
  15. preparedStatement.setLong(2, waterSensor.getTs());
  16. preparedStatement.setInt(3, waterSensor.getVc());
  17. }
  18. },
  19. JdbcExecutionOptions.builder()
  20. .withMaxRetries(3)
  21. .withBatchSize(100)
  22. .withBatchIntervalMs(3000)
  23. .build(),
  24. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  25. .withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
  26. .withUsername("root")
  27. .withPassword("12345")
  28. .withConnectionCheckTimeoutSeconds(60)
  29. .build()
  30. );
  31. sensorDS.addSink(jdbcSink)

自定义Sink

建议优先使用官方提供的Sink,除非没有途径才考虑自定义Sink。

  1. stream.addSink(new MySinkFunction<String>());

Flink中的时间和窗口

窗口

所谓窗口就是划定的一段时间范围,对范围内的数据进行处理,就是所谓的窗口计算。 Flink是一种流式计算引擎,主要用来处理无界数据流,数据源源不断,无穷无尽。想要更加方便高效的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”。 image-20250305171218556

窗口的分类

按驱动类型划分
  • 时间窗口(Time Window)
  • 计数窗口(Count Window)
按窗口分配数据的规则分类
  • 滚动窗口(Tumbling Window)

image-20250305171246738

  1. 滚动窗口有固定的大小, 是一种对数据进行“均匀切片”的划分方式,窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口。
  • 滑动窗口(Siding Window)

image-20250305171309348

  1. 滑动窗口的大小也是固定的,但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。
  2. 定义滑动窗口的参数有两个,除去窗口大小(Window Size)之外,还有一个滑动步长(Window slide), 它其实代表窗口计算的频率。
  3. 滑动窗口会出现“重叠”,数据也可能会被同时分配到多个窗口中。
  • 会话窗口(Session Window)

image-20250305171331510

  1. 是基于会话来对数据进行分组的,会话窗口只能基于时间来定义。
  2. 会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的。
  • 全局窗口(Global Window)

image-20250305171349144

  1. 这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会作为触发计算的

窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。 窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

时间窗口

  • 滚动时间窗口(TumblingProcessingTimeWindows/TumblingEventTimeWindows)
  1. sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  2. // 滚动时间窗口, 窗口长度为10s
  • 滑动时间窗口(SlidingProcessingTimeWindows/SlidingEventTimeWindows)
  1. sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
  2. // 滑动时间窗口, 窗口长度为10s, 滑动步长2s
  • 会话时间窗口(ProcessingTimeSessionWindows/EventTimeSessionWindows)
  1. sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
  2. // 会话窗口, 间隔5s

计数窗口

  • 滚动窗口(countWindow)
  1. sensorKS.countWindow(5);
  2. // 滚动窗口,窗口长度为5个元素
  • 滑动窗口(countWindow)
  1. sensorKS.countWindow(5, 2);
  2. // 滑动窗口, 窗口长度为5个元素,窗口步长为2个元素
  • 全局窗口(window)
  1. sensorKS.window(GlobalWindows.create());
  2. // 全局窗口,计数窗口的底层就是使用的这个, 需要自定义触发器,很少使用

窗口函数

image-20250305171619199

增量聚合函数
归约函数(ReduceFunction)
  1. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
  2. // 窗口分配器
  3. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
  4. //窗口函数
  5. /**
  6. * 窗口的reduce
  7. * 1. 相同key的第一条数据来的时候,不会调用reduce方法
  8. * 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出
  9. * 3. 在窗口触发的时候才会输出整个窗口的最终计算结果 */
  10. SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(new ReduceFunction<WaterSensor>() {
  11. @Override
  12. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
  13. System.out.println("调用reduce方法, value1=" + value1 + ", value2=" + value2);
  14. return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
  15. }
  16. });
增量聚合函数(AggregateFunction)
  1. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
  2. // 窗口分配器
  3. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
  4. //窗口函数
  5. /**
  6. * 窗口的Aggregate
  7. * 三个参数:第一个类型输入数来据的类型,第二个类型累加器的类型,存储中间计算结果的类型, 第三个类型输出数据的类型
  8. * 1. 属于本窗口的第一条数据来, 创建窗口,创建累加器
  9. * 2. 增量聚合:来一条计算一条,调研一次add方法
  10. * 3. 窗口输出时调研一次getresult方法
  11. * 4. 输入、中间累加器、输出 类型可以不一样,非常灵活 */
  12. SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
  13. @Override
  14. public Integer createAccumulator() {
  15. System.out.println("创建累加器");
  16. return 0;
  17. }
  18. // 聚合逻辑
  19. @Override
  20. public Integer add(WaterSensor value, Integer accumulator) {
  21. System.out.println("调研add方法, value=" + value);
  22. return accumulator + value.getVc();
  23. }
  24. // 获取最终结果,窗口触发输出
  25. @Override
  26. public String getResult(Integer accumulator) {
  27. System.out.println("调研getResult方法");
  28. return accumulator.toString();
  29. }
  30. @Override
  31. public Integer merge(Integer a, Integer b) {
  32. System.out.println("调研merge方法");
  33. return null; }
  34. });
全窗口函数(ProcessWindowFunction/ApplyFunction)
  1. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
  2. // 窗口分配器
  3. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  4. // sensorWS.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
  5. // /**
  6. // * 老写法
  7. // * @param s 分组key
  8. // * @param window 窗口对象
  9. // * @param input 存的数据
  10. // * @param out 采集器
  11. // * @throws Exception
  12. // */
  13. // @Override
  14. // public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
  15. //
  16. // }
  17. // })
  18. SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  19. /**
  20. * 全窗口函数计算逻辑(新写法):窗口触发时才调用一次, 统一计算窗口所有数据 * @param s 分组key
  21. * @param context 上下文
  22. * @param elements 存的数据
  23. * @param out 采集器
  24. * @throws Exception
  25. */
  26. @Override
  27. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  28. long start = context.window().getStart();
  29. long end = context.window().getEnd();
  30. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  31. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  32. long count = elements.spliterator().estimateSize();
  33. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  34. }
  35. });
增量聚合窗口结合(最为全面)
  1. // WindowAggregateAndProcessDemo
  2. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
  3. // 窗口分配器
  4. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
  5. //窗口函数
  6. SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(), new MyProc());
  7. result.print();
  8. env.execute();
  9. public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {
  10. @Override
  11. public Integer createAccumulator() {
  12. System.out.println("创建累加器");
  13. return 0;
  14. }
  15. @Override
  16. public Integer add(WaterSensor value, Integer accumulator) {
  17. System.out.println("调研add方法, value=" + value);
  18. return accumulator + value.getVc();
  19. }
  20. @Override
  21. public String getResult(Integer accumulator) {
  22. System.out.println("调研getResult方法");
  23. return accumulator.toString();
  24. }
  25. @Override
  26. public Integer merge(Integer a, Integer b) {
  27. System.out.println("调研merge方法");
  28. return null; }
  29. }
  30. public static class MyProc extends ProcessWindowFunction<String, String, String, TimeWindow> {
  31. @Override
  32. public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
  33. long start = context.window().getStart();
  34. long end = context.window().getEnd();
  35. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  36. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  37. long count = elements.spliterator().estimateSize();
  38. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  39. }
  40. }
  41. // WindowReduceAndProcessDemo
  42. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());
  43. // 窗口分配器
  44. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
  45. //窗口函数
  46. /**
  47. * 窗口的reduce
  48. * 1. 相同key的第一条数据来的时候,不会调用reduce方法
  49. * 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出
  50. * 3. 在窗口触发的时候才会输出整个窗口的最终计算结*/
  51. SingleOutputStreamOperator<String> reduce = sensorWS.reduce(new MyReduce(), new MyProc());
  52. reduce.print();
  53. env.execute();
  54. public static class MyReduce implements ReduceFunction<WaterSensor> {
  55. @Override
  56. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
  57. System.out.println("调研reduce方法, value1=" + value1 + ", value2=" + value2);
  58. return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
  59. }
  60. }
  61. public static class MyProc extends ProcessWindowFunction<WaterSensor, String, String, TimeWindow> {
  62. @Override
  63. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  64. long start = context.window().getStart();
  65. long end = context.window().getEnd();
  66. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  67. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  68. long count = elements.spliterator().estimateSize();
  69. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  70. }
  71. }
其他API
  • 触发器 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
  1. stream.keyBy(...)
  2. .window(...)
  3. .trigger(new MyTrigger())
  • 移除器 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
  1. stream.keyBy(...)
  2. .window(...)
  3. .evictor(new MyEvictor())

时间语义

image-20250305171702406

  • 事件时间
  • 处理时间 image-20250305171719793

水位线

image-20250305171736540 在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

设置事件时间语义及水位线
  • 单调上升水位线策略(有序流)
  1. // 1. 定义watermark策略
  2. WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  3. // 升序watermark, 没有等待时间
  4. .<WaterSensor>forMonotonousTimestamps()
  5. // 指定时间分配器,从数据中提取
  6. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  7. @Override
  8. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  9. // 返回的时间戳要为毫秒
  10. System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
  11. return element.getTs() * 1000L;
  12. }
  13. });
  14. // 2. 指定watermark策略
  15. SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
  16. sensorDSwithWatermark.keyBy(value -> value.getId())
  17. // 指定使用事件事件语义窗口
  18. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  19. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  20. @Override
  21. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  22. long start = context.window().getStart();
  23. long end = context.window().getEnd();
  24. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  25. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  26. long count = elements.spliterator().estimateSize();
  27. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  28. }
  29. }).print();
  • 乱序水位线策略
  1. // 1. 定义watermark策略
  2. WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  3. // 乱序watermark, 等待3s
  4. .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  5. // 指定时间分配器,从数据中提取
  6. .withTimestampAssigner((element, recordTimestamp) -> {
  7. // 返回的时间戳要为毫秒
  8. System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
  9. return element.getTs() * 1000L;
  10. });
  11. // 2. 指定watermark策略
  12. SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
  13. sensorDSwithWatermark.keyBy(value -> value.getId())
  14. // 指定使用事件事件语义窗口
  15. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  16. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  17. @Override
  18. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  19. long start = context.window().getStart();
  20. long end = context.window().getEnd();
  21. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  22. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  23. long count = elements.spliterator().estimateSize();
  24. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  25. }
  26. }).print();
  • 自定义水位线
  1. WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  2. // .forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
  3. // @Override
  4. // public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
  5. // return new MyPeriodWatermarkGenerator<>(3000L);
  6. // }
  7. // })
  8. // 指定自定义的生成器, 周期性生成// .<WaterSensor>forGenerator(context -> new MyPeriodWatermarkGenerator<>(3000L))
  9. // 指定自定义的生成器,断点式生成 .<WaterSensor>forGenerator(context -> new MyPuntuatedWatermarkGenerator<>(3000L))
  10. .withTimestampAssigner((element, recordTimestamp) -> {
  11. System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);
  12. return element.getTs() * 1000L;
  13. });
  14. public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {
  15. // 用来保存乱序等待时间
  16. private long delayTs;
  17. // 用来保存当前最大的事件时间
  18. private long maxTs;
  19. public MyPeriodWatermarkGenerator(long delayTs) {
  20. this.delayTs = delayTs;
  21. this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
  22. }
  23. /**
  24. * 每条数据来都会被调用一次,用来提取最大事件事件,保存下来 * @param event
  25. * @param eventTimestamp 提取到的数据的 事件时间
  26. * @param output
  27. */
  28. @Override
  29. public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
  30. maxTs = Math.max(maxTs, eventTimestamp);
  31. System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs);
  32. }
  33. /**
  34. * 周期性调用:发射 watermark * @param output
  35. */
  36. @Override
  37. public void onPeriodicEmit(WatermarkOutput output) {
  38. output.emitWatermark(new Watermark(maxTs - delayTs - 1));
  39. System.out.println("调用了onPeriodicEmit方法,生成watermark=" + (maxTs - delayTs - 1));
  40. }
  41. }
  42. public class MyPuntuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
  43. // 用来保存乱序等待时间
  44. private long delayTs;
  45. // 用来保存当前最大的事件时间
  46. private long maxTs;
  47. public MyPuntuatedWatermarkGenerator(long delayTs) {
  48. this.delayTs = delayTs;
  49. this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
  50. }
  51. /**
  52. * 每条数据来都会被调用一次,用来提取最大事件事件,保存下来, 并发射watermark * @param event
  53. * @param eventTimestamp 提取到的数据的 事件时间
  54. * @param output
  55. */
  56. @Override
  57. public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
  58. maxTs = Math.max(maxTs, eventTimestamp);
  59. output.emitWatermark(new Watermark(maxTs - delayTs - 1));
  60. System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs + ", watermark = " + (maxTs - delayTs - 1));
  61. }
  62. /**
  63. * 周期性调用:不需要 * @param output
  64. */
  65. @Override
  66. public void onPeriodicEmit(WatermarkOutput output) {
  67. }
  68. }
  • 源算子中设置
  1. env.fromSource(
  2. kafkaSource,
  3. // 使用源算子中的水位线策略, 乱序,提取字符串中关键字
  4. WatermarkStrategy
  5. .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  6. .withTimestampAssigner((element, recordTimestamp) -> {
  7. return Long.valueOf(element.split(",")[1]);
  8. }),
  9. "kafkasource"
  10. ).print();
水位线的传递

image-20250305171842090 在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。 水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

延迟数据处理
  • 问题1:水位线在多并行度中传递取最小

    通过设置延迟等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(2);
  3. // 自定义分区器,数据%分区数,只输入奇数都知会去往map的一个子任务
  4. SingleOutputStreamOperator<Integer> socketDS = env
  5. .socketTextStream("localhost", 7777)
  6. .partitionCustom(new MyPartitioner(), r -> r) //以自身数据为key,通过奇偶性判定分区
  7. .map(r -> Integer.parseInt(r))
  8. .assignTimestampsAndWatermarks(WatermarkStrategy
  9. .<Integer>forMonotonousTimestamps()
  10. .withTimestampAssigner((r,rs) -> r * 1000L)
  11. .withIdleness(Duration.ofSeconds(5)) //设置空闲等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题
  12. );
  13. // 分成两组,奇数一组,偶数一组, 开10s的事件时间滚动窗口
  14. socketDS.keyBy( r -> r % 2)
  15. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  16. .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
  17. @Override
  18. public void process(Integer s, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
  19. long start = context.window().getStart();
  20. long end = context.window().getEnd();
  21. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  22. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  23. long count = elements.spliterator().estimateSize();
  24. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  25. }
  26. })
  27. .print();
  • 问题2:乱序中的迟到数据
    • 解决方法1:设置推迟水位推进
  1. WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)
  • 解决方法2:设置窗口延迟关闭(触发计算后来的迟到数据会再次触发计算)
  1. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  2. .allowedLateness(Time.seconds(2)) // 推迟2s关窗
  • 解决方法3:使用侧输出流
  1. OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
  2. SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(value -> value.getId())
  3. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  4. .allowedLateness(Time.seconds(2)) // 推迟2s关窗
  5. .sideOutputLateData(lateTag) // 关窗后的迟到数据,放到侧输出流
  6. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  7. @Override
  8. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  9. long start = context.window().getStart();
  10. long end = context.window().getEnd();
  11. String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  12. String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  13. long count = elements.spliterator().estimateSize();
  14. out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);
  15. }
  16. });
  17. process.print();
  18. // 从主流获取侧输出流,打印
  19. process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

基于时间的合流–双向联结(join)

窗口连接(window join)

  1. DataStream<String> join = ds1.join(ds2)
  2. .where(r1 -> r1.f0)
  3. .equalTo(r2 -> r2.f0)
  4. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  5. .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
  6. /**
  7. * 关联上的数据,调用join方法 * @param first ds1的数据
  8. * @param second ds2的数据
  9. * @return
  10. * @throws Exception
  11. */ @Override
  12. public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
  13. return first + "<--->" + second;
  14. }
  15. });
  16. join.print();

间隔联结(interval join)

只能支持事件时间 image-20250305172028549

  1. KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r -> r.f0);
  2. KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r -> r.f0);
  3. ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2))

处理函数(Process)

  • ProcessFunction

    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

  • KeyedProcessFunction

    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

  • ProcessWindowFunction

    开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

  • ProcessAllWindowFunction

    同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

  • CoProcessFunction

    合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

  • ProcessJoinFunction

    间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

  • BroadcastProcessFunction

    广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

  • KeyedBroadcastProcessFunction

    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。