基于Flink 1.17版本的学习笔记,包括部署、原理、算子、开发代码等等。

集群部署

集群角色

image-20250305154040135

  • 客户端(Client):代码由客户端获取并转换,之后提交给JobManager。
  • JobManager就是Flink集群里的“管事人”, 对作业进行中央调度管理;而它获取到要执行的作用后,会进一步处理转换,然后分发任务给众多的TaskManager。
  • TaskManager是真正“干活的人”,数据的护理操作都是它们来做的。

Flink支持多种不同的部署场景,还可以和不同的资源管理平台方便的集成。

集群规划

节点服务器 hadoop3 hadoop2 hadoop1
角色 JobManager TaskManager Task Manager Task Manager

修改配置文件

$ vi flink-conf.yaml

jobmanager.rpc.address: pi3
jobmanager.bind-host: 0.0.0.0

taskmanager.bind-host: 0.0.0.0
### 更为各台主机的主机名
taskmanager.host: pi3

rest.address: pi3
rest.bind-address: 0.0.0.0

$ vi workers
pi3
pi2
pi4

$ vi masters
pi3

分发文件到各主机

$ scp -r /opt/flink-1.17.1 pi2:/opt/flink-1.17.1
$ scp -r /opt/flink-1.17.1 pi4:/opt/flink-1.17.1

启动集群

$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pi3.
Starting taskexecutor daemon on host pi3.
Starting taskexecutor daemon on host pi2.
Starting taskexecutor daemon on host pi4.
问题1:Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.

解决方法:

/opt/flink-1.17.1/bin$ vi taskmanager.sh
# 注释如下内容
if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi

scp taskmanager.sh pi2:/opt/flink-1.17.1/bin
scp taskmanager.sh pi3:/opt/flink-1.17.1/bin
scp taskmanager.sh pi4:/opt/flink-1.17.1/bin

手工打包工程

修改pom.xml, 在build.plugins中添加如下内容:

<!-- Flink官方打包插件 -->  
<plugin>  
    <groupId>org.apache.maven.plugins</groupId>  
    <artifactId>maven-shade-plugin</artifactId>  
    <version>3.2.4</version>  
    <executions>        <execution>            <phase>package</phase>  
            <goals>                <goal>shade</goal>  
            </goals>            <configuration>                <artifactSet>                    <excludes>                        <exclude>com.google.code.findbugs:jsr305</exclude>  
                        <exclude>org.slf4j:*</exclude>  
                        <exclude>log4j:*</exclude>  
                    </excludes>                </artifactSet>                <filters>                    <filter>                        <!-- Do not copy the signatures in the META-INF folder.  
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->                        <artifact>*:*</artifact>  
                        <excludes>                            <exclude>META-INF/*.SF</exclude>  
                            <exclude>META-INF/*.DSA</exclude>  
                            <exclude>META-INF/*.RSA</exclude>  
                        </excludes>                    </filter>                </filters>                <transformers>                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">  
                        <mainClass>my.programs.main.clazz</mainClass>  
                    </transformer>                </transformers>            </configuration>        </execution>    </executions></plugin>
                    

命令行运行代码

bin/flink run -m pi3:8081 -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar

部署模式

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode) 需要资源管理平台才可支持。
  • 应用模式(Application Mode) 通过standalone-job.sh start –job-classname xxx.xxx.xxx启动, 并且上传的Jar必须在lib目录下。 三种模式主要区别的集群的生命周期以及资源的分配方式上。

运行模式

  • Standalone 模式, Flink自己管理资源,不依赖外部资源管理平台。没有自动扩展或重新分配资源的保证, 必须手动处理。默认使用会话模式, 可支持应用模式,不支持单作业模式。
  • YARN模式, 最为常用的模式。
  • K8S模式
Standalone模式
### 启动
$ mv ~/learning-flink-1.0-SNAPSHOT.jar /opt/flink-1.17.1/lib
$ bin/standalone-job.sh start --job-classname com.stanleylog.WordCountStreamunboundedDemo
$ bin/taskmanager.sh start  ### taskmanager需要手工启动

### 停止
$ bin/taskmanager.sh stop
$ bin/standalone-job.sh stop
Yarn模式
$ vi /etc/profile

# Hadoop Env
export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

# Flink Env
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

会话模式(Session)
# 启动
$ bin/yarn-session.sh -d -nm flink-test

# 停止
$ echo "stop" | ./bin/yarn-session.sh -id application_xxx
# 或者
$ yarn application -kill application_xxx

高版本的Flink中,无需去更改Flink配置文件,它会自己进行动态覆盖。

命令行运行代码
$ bin/flink run -d -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar
单作业模式(Pre-Job)
### 启动
$ bin/flink run -d -t yarn-pre-job -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar

### 关闭
# YARN UI界面 
# 或者
$ bin/flink list -t yarn-pre-job -Dyarn.application.id=application_xxx #查看JOB_ID
$ bin/flink cancel -t yarn-pre-job -Dyarn.application.id=application_xxx <JOB_ID>
应用模式(Application)重点
### 启动
$ bin/flink run-application -d -t yarn-application -nm test -c com.stanleylog.WordCountStreamunboundedDemo ~/learning-flink-1.0-SNAPSHOT.jar

### 关闭
# YARN UI界面 
# 或者
$ bin/flink list -t yarn-application -Dyarn.application.id=application_xxx #查看JOB_ID
$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_xxx <JOB_ID>

使用HDFS文件的模式 —推荐生产环境使用

### 上传Flink类库
$ hadoop fs -mkdir /flink-dist
$ hadoop fs -put /opt/flink-1.17.1/lib/ /flink-dist
$ hadoop fs -put /opt/flink-1.17.1/plugins/ /flink-dist

### 上传Flink程序
$ hadoop fs -mkdir /flink-jars
$ hadoop fs -put ~/learning-flink-1.0-SNAPSHOT.jar /flink-jars

### 启动
$ bin/flink run-application -d -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop1:9000/flink-dist" -c com.stanleylog.WordCountStreamunboundedDemo hdfs://hadoop1:9000/flink-jars/learning-flink-1.0-SNAPSHOT.jar
历史服务器
$ hadoop fs -mkdir -p /logs/flink-job

### 修改flink配置
$ vi conf/flink-conf.yaml
jobmanager.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
historyserver.web.address: hadoop2
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop1:9000/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000

### 启动
bin/historyserver.sh start

### 停止
bin/historyserver.sh stop

可以通过 http://hadoop2:8082 访问

运行时架构

![[Pasted image 20231117110403.png]]

重要组件:

  • JobMaster
  • 资源管理器(ResourceManager)
  • 分发器(Dispatcher)
  • 任务管理器(TaskManager)

核心概念

并行度

Flink的并行度代表最大并行度数量,可以通过.setParallelism()进行设置。 设置方式, 执行优先级如下:

  • 代码中通过setParallelism设置并行度
  • env整体设置并行度
  • 命令行提交代码时通过-p参数进行设置并行度
  • WebUI 提交页面设置并行度
  • Flink配置文件中的默认并行度

算子链

  1. 一对一(One to One)
  2. 重分区(Redistributing)

关闭算子链

  1. 算子链可以通过env.disableOperatorChaining()进行全局禁用,禁用之后所有的算子将不会被合并。
  2. 某个算子不参与链化:算子A.disableChaining(),算子A不会与前面和后面的算子进行合并。
  3. 从某个算子开启新链条:算子A.startNewChain(), 算子A不参与前面的合并,从A开始正常链化。

任务槽(Slot)

主要按照内存划分资源,每个slot专门给具体的task使用,不会进行资源的调配。可以通过修改配置参数更改默认的TaskManager分配的Slot数量。

taskmanager.numberOfTaskSlots: 1

CPU资源不会进行资源隔离的,所以建议使用所在主机的CPU核心数划分此参数。 在同一作业中,不同任务节点的并行任务可以放在同一slot上执行。 可以通过.slotSharingGroup设置算子的不用共享组,默认共享组为default.

任务槽和并行度的关系

任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过taskmanager.numberOfTaskSlots进行配置; 而并行度是动态的概念,也就是TaskManager运行程序时实际使用的并发能力,可通过参数paralleism.default配置。 slot数量必须>= job并行度

作业提交流程

Standalone会话模式作业提交流程

image-20250305154637570

  1. 逻辑流图(Stream Graph)
  2. 作业流图(Job Graph)
  3. 执行图(Execution Graph) 最为重要
  4. 物理图(Physical Graph)

Yarn应用模式作业提交流程

image-20250305165129856

DataStream API

五大类API:

  • Execution Environment(获取执行环境)
  • Source(读取数据源)
  • Transformation(转换操作)
  • Sink(输出)
  • Execution(触发执行)

执行环境(Execution Environment)

创建执行环境

自动获取环境

它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
创建本地环境

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
创建远程环境

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
          .createRemoteEnvironment(
           "host",                   // JobManager主机名
           1234,                     // JobManager进程端口号
           "path/to/jarFile.jar"  // 提交给JobManager的JAR包
);

执行模式(Execution Mode)

流执行模式(Streaming)

这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式(Batch)

专门用于批处理的执行模式。

自动模式(AutoMatic)

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 批执行模式的使用。主要有两种方式:

  • 通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

  • 通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。 实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

触发程序执行

写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。

env.execute();

源算子(Source)

从集合中读取数据

List<Integer> data = Arrays.asList(1, 22, 3);
DataStreamSource<Integer> ds = env.fromCollection(data);

从文件中读取数据

POM.xml中添加

<dependency>  
    <groupId>org.apache.flink</groupId>  
    <artifactId>flink-connector-files</artifactId>  
    <version>${flink.version}</version>  
    <scope>provided</scope>  
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.setParallelism(1);  
  
FileSource<String> fileSource = FileSource.forRecordStreamFormat(  
        new TextLineInputFormat(),  
        new Path("input/word.txt")  
).build();  
  
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();  
  
  
env.execute();

从Kafka中读取数据

<dependency>  
    <groupId>org.apache.flink</groupId>  
    <artifactId>flink-connector-kafka</artifactId>  
    <version>${flink.version}</version>  
    <scope>provided</scope>  
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.setParallelism(1);  
  
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()  
        .setBootstrapServers("hadoop3:9092")  
        .setGroupId("stanley")  
        .setTopics("topic_1")  
        .setValueOnlyDeserializer(new SimpleStringSchema())  
        .setStartingOffsets(OffsetsInitializer.latest())  
        .build();  
  
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource").print();  
  
env.execute();

从数据生成器中读取数据

<dependency>  
    <groupId>org.apache.flink</groupId>  
    <artifactId>flink-connector-datagen</artifactId>  
    <version>${flink.version}</version>  
    <scope>provided</scope>  
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.setParallelism(1);  
  
DataGeneratorSource<String> datageneratorSource = new DataGeneratorSource<>(  
        new GeneratorFunction<Long, String>() {  
            @Override  
            public String map(Long value) throws Exception {  
                return "Number: " + value;  
            }  
        },  
        Long.MAX_VALUE,  
        RateLimiterStrategy.perSecond(3),  
        Types.STRING  
);  
  
env.fromSource(datageneratorSource,WatermarkStrategy.noWatermarks(), "data-generator").print();  
  
env.execute();

转换算子(Transformation)

基本转换算子

映射(map)

image-20250305165215532 用于数据流中的数据进行转换,形成新的数据流。简单的说,就是一个“一一映射”,消费一个元素就产生一个元素。

SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunction<WaterSensor, String>() {  
    @Override  
    public String map(WaterSensor value) throws Exception {  
        return value.getId();  
    }  
});
过滤(Filter)

对数据执行一个过滤,通过布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。 image-20250305165236051

SingleOutputStreamOperator<WaterSensor> filter = sensorDS.filter(new FilterFunction<WaterSensor>() {  
    @Override  
    public boolean filter(WaterSensor value) throws Exception {  
        return "s1".equals(value.getId());  
    }  
});
扁平映射(flatMap)

image-20250305165254715 将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素,简单的说就是”一进多出“。flatmap可以认为是“扁平化”flatten和“映射”map两步操作的结合。就是按某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {  
    @Override  
    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {  
        if ("s1".equals(value.getId())) {  
            out.collect(value.getVc() + "");  
        } else if ("s2".equals(value.getId())) {  
            out.collect(value.getVc() + "");  
            out.collect(value.getTs() + "");  
        }  
    }  
});

聚合算子

按键分区(keyBy)

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流逻辑上划分成不同的分区(partition)。这里说的分区,其实就是并行处理的子任务。 基于不同的key,流中的数据将被分到不同的分区中去;这样一来,所有具有相同的key的数据,都被发往同一个分区。 image-20250305165315960

/**  
 * 1. 返回的是一个KeyedStream 键控流 
 * 2. keyby不是转换算子,只是对数据进行重分区, 也不能设置并行度 
 * 3. keyby 分组和分区的关系: 
 *  1). keyby是对数据分组,保证相同的key的数据 在同一个分区 
 *  2). 分区:一个子任务,可以理解为一个分区. 一个分区(子任务)可以存在多个分组(key) 
 **/
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {  
    @Override  
    public String getKey(WaterSensor value) throws Exception {  
        return value.getId();  
    }  
});  
  
sensorKS.print();

简单聚合(sum/min/max/minBy/maxBy)
/**  
* 简单聚合算子         
* 1。 keyby之后才能调用         
**/  
// 传位置索引的,不适用pojo类型,适用于tuple类型。  
//        SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum(2);  
//        SingleOutputStreamOperator<WaterSensor> result= sensorKS.sum("vc");  
//        SingleOutputStreamOperator<WaterSensor> result= sensorKS.min("vc");  
//        SingleOutputStreamOperator<WaterSensor> result= sensorKS.max("vc");  
/**  
 * maxBy: 会将返回比较字段的最大值,非比较字段也保留最大值记录的值         
 * max值返回比较字段的最大值,而非比较字段保留第一次值         
 *         
 * min/minBy也是如此规则         
 **/        
SingleOutputStreamOperator<WaterSensor> result= sensorKS.maxBy("vc");
归约聚合(reduce)
SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {  
    @Override  
    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {  
        System.out.println("value1: " + value1);  
        System.out.println("value2: " + value2);  
        return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());  
    }  
});  
  
reduce.print();

用户自定义函数(UDF)

用户自定义函数分为:函数类、匿名函数、富函数类。

函数类(Function Classes)
public class FilterFunctionImpl implements FilterFunction<WaterSensor> {  
  
    public String id;  
  
    public FilterFunctionImpl(String id) {  
        this.id = id;  
    }  
  
    @Override  
    public boolean filter(WaterSensor value) throws Exception {  
        return this.id.equals(value.getId());  
    }  
}
匿名类
SingleOutputStreamOperator<String> flatMap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {  
    @Override  
    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {  
        if ("s1".equals(value.getId())) {  
            out.collect(value.getVc() + "");  
        } else if ("s2".equals(value.getId())) {  
            out.collect(value.getVc() + "");  
            out.collect(value.getTs() + "");  
        }  
    }  
});
富函数类(RichFunction Classes)
SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        super.open(parameters);  
  
        RuntimeContext runtimeContext = getRuntimeContext();  
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();  
        String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();  
  
        System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Open...");  
    }  
  
    @Override  
    public void close() throws Exception {  
        super.close();  
  
        RuntimeContext runtimeContext = getRuntimeContext();  
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();  
        String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();  
  
        System.out.println(indexOfThisSubtask + ": " + taskNameWithSubtasks + "调用Close...");  
    }  
  
    @Override  
    public Integer map(Integer value) throws Exception {  
        return value + 1;  
    }  
});

物理分区算子(Physical Partitioning)

常见的物理分区算子包括:随机分配(Random)、轮训分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)

global : 全部发往第一个子任务中
keyby: 按指定key去发送, 相同可以发往同一个子任务
one-to-one: forward分区器
总结:flink提供7种分区器+1中自定义

随机分配(shuffle)

image-20250305165346940

stream.shuffle()
轮询分区(round-robin)

image-20250305165406779

stream.rebalance()
重缩放分区(rescale)

image-20250305171006662

stream.rescale()
广播(broadcast)
stream.broadcast()
全局分区(global)
stream.global()
自定义分区(Custom)
public class MyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return Integer.parseInt(key) % numPartitions;
    }
}

DataStream<String> myDS = socketDS
	           .partitionCustom( new MyPartitioner(), value -> value);

分流

image-20250305171028002

使用Filter实现分流效果

但是存在缺点,因为数据都会被所有的filter所处理。

DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);  
/**  
 * 使用filter实现分流效果 * 缺点:相同的数据会被处理多次 
 **/
socketDS.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");  
  
socketDS.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");
侧输出流

解决了Filter重复处理的问题

OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));  
OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));  
  
SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() {  
    @Override  
    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {  
  
        String id = value.getId();  
  
        if ("s1".equals(id)) { //放到侧输出流s1中  
            ctx.output(s1Tag, value);  
        } else if ("s2".equals(id)) { //放到侧输出流s2中  
            ctx.output(s2Tag, value);  
        } else {  
            out.collect(value);  
        }  
    }  
});

合流

联合(Union)

union的流之间必须是相同类型 image-20250305171048186

DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);  
DataStreamSource<Integer> source2 = env.fromElements(4, 5, 6);  
DataStreamSource<String> source3 = env.fromElements("7", "8", "9");  
  
source1.union(source2).union(source3.map(value -> Integer.parseInt(value))).print();
连接(Connect)

连接后得到的不是DataStream,连接的流是形式上放在同一个流中, 事实上内部各自保留各自的数据形式不变, 彼此相互独立。一次只能连接两条流。 image-20250305171108382

DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);  
DataStreamSource<String> source2 = env.fromElements("a", "b", "c");  
  
ConnectedStreams<Integer, String> connect = source1.connect(source2);  
SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {  
    @Override  
    public String map1(Integer value) throws Exception {  
        return String.valueOf(value);  
    }  
  
    @Override  
    public String map2(String value) throws Exception {  
        return value;  
    }  
});

输出算子(Sink)

Flink1.2开始使用stream.sinkTo(…) image-20250305171129988 image-20250305171148038

FileSink


FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("output"), new SimpleStringEncoder<>("UTF-8"))  
        // 输出文件的一些配置:文件前缀、后缀...  
        .withOutputFileConfig(OutputFileConfig.builder()  
                .withPartPrefix("filesink")  
                .withPartSuffix(".log")  
                .build()  
        )  
        // 文件分桶  
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))  
        // 文件滚动策略  
        .withRollingPolicy(  
                DefaultRollingPolicy.builder()  
                        .withRolloverInterval(Duration.ofSeconds(10))  
                        .withMaxPartSize(new MemorySize(1024 * 1024))  
                        .build()  
        )  
        .build();

KafkaSink

// 必须开启checkpoint, 否则无法写入kafka  
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);  
  
  
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);  
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()  
        // 指定kafka的地址和端口  
        .setBootstrapServers("hadoop3:9092")  
        // 指定序列化器, 指定topic名称、具体的序列化  
        .setRecordSerializer(  
                KafkaRecordSerializationSchema.<String>builder()  
                        .setTopic("ws")  
                        .setValueSerializationSchema(new SimpleStringSchema())  
                        .build())  
        // 写到kafka的一致性级别:精准一次、至少一次  
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  
        // 如果是精准一次 必须要设置事务前缀  
        .setTransactionalIdPrefix("stanley-")  
        // 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟  
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))  
        .build();  
  
socketDS.sinkTo(kafkaSink);

KafkaSinkWithKey


// 必须开启checkpoint, 否则无法写入kafka  
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);  
  
  
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);  
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()  
        // 指定kafka的地址和端口  
        .setBootstrapServers("hadoop3:9092")  
        // 指定序列化器, 指定topic名称、具体的序列化  
        .setRecordSerializer(  
                new KafkaRecordSerializationSchema<String>() {  
                    @Nullable  
                    @Override                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {  
  
                        String[] datas = element.split(",");  
                        byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);  
                        byte[] value = element.getBytes(StandardCharsets.UTF_8);  
  
                        return new ProducerRecord<>("ws", key, value);  
                    }  
                }  
        )  
        // 写到kafka的一致性级别:精准一次、至少一次  
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  
        // 如果是精准一次 必须要设置事务前缀  
        .setTransactionalIdPrefix("stanley-")  
        // 如果是精准一次 必须设置事务超时时间 , 要大于checkpoint时间, 小于max 15分钟  
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(10 * 60 * 1000))  
        .build();  
  
socketDS.sinkTo(kafkaSink);

MySQLSink

/**  
 * 1. 只能使用老式的addsink写法 
 * 2. JdbcSink的四个参数 
 *  1) 执行的SQL 
 *  2) 预编译SQL 
 *  3) 执行选项, 攒批,重试 
 *  4) 连接选项, url, 用户名,密码 
 **/
 SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(  
        "insert into ws values(?, ?, ?)",  
        new JdbcStatementBuilder<WaterSensor>() {  
            @Override  
            public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {  
                preparedStatement.setString(1, waterSensor.getId());  
                preparedStatement.setLong(2, waterSensor.getTs());  
                preparedStatement.setInt(3, waterSensor.getVc());  
            }  
        },  
        JdbcExecutionOptions.builder()  
                .withMaxRetries(3)  
                .withBatchSize(100)  
                .withBatchIntervalMs(3000)  
                .build(),  
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()  
                .withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")  
                .withUsername("root")  
                .withPassword("12345")  
                .withConnectionCheckTimeoutSeconds(60)  
                .build()  
  
);  
  
sensorDS.addSink(jdbcSink)

自定义Sink

建议优先使用官方提供的Sink,除非没有途径才考虑自定义Sink。

stream.addSink(new MySinkFunction<String>());

Flink中的时间和窗口

窗口

所谓窗口就是划定的一段时间范围,对范围内的数据进行处理,就是所谓的窗口计算。 Flink是一种流式计算引擎,主要用来处理无界数据流,数据源源不断,无穷无尽。想要更加方便高效的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”。 image-20250305171218556

窗口的分类

按驱动类型划分
  • 时间窗口(Time Window)
  • 计数窗口(Count Window)
按窗口分配数据的规则分类
  • 滚动窗口(Tumbling Window)

image-20250305171246738

滚动窗口有固定的大小, 是一种对数据进行“均匀切片”的划分方式,窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口。
  • 滑动窗口(Siding Window)

image-20250305171309348

滑动窗口的大小也是固定的,但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。
定义滑动窗口的参数有两个,除去窗口大小(Window Size)之外,还有一个滑动步长(Window slide), 它其实代表窗口计算的频率。
滑动窗口会出现“重叠”,数据也可能会被同时分配到多个窗口中。
  • 会话窗口(Session Window)

image-20250305171331510

是基于会话来对数据进行分组的,会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的。
  • 全局窗口(Global Window)

image-20250305171349144

这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会作为触发计算的

窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。 窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

时间窗口

  • 滚动时间窗口(TumblingProcessingTimeWindows/TumblingEventTimeWindows)
sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); 
// 滚动时间窗口, 窗口长度为10s
  • 滑动时间窗口(SlidingProcessingTimeWindows/SlidingEventTimeWindows)
sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))); 
// 滑动时间窗口, 窗口长度为10s, 滑动步长2s
  • 会话时间窗口(ProcessingTimeSessionWindows/EventTimeSessionWindows)
sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); 
// 会话窗口, 间隔5s

计数窗口

  • 滚动窗口(countWindow)
sensorKS.countWindow(5); 
// 滚动窗口,窗口长度为5个元素
  • 滑动窗口(countWindow)
sensorKS.countWindow(5, 2); 
// 滑动窗口, 窗口长度为5个元素,窗口步长为2个元素
  • 全局窗口(window)
sensorKS.window(GlobalWindows.create()); 
// 全局窗口,计数窗口的底层就是使用的这个, 需要自定义触发器,很少使用

窗口函数

image-20250305171619199

增量聚合函数
归约函数(ReduceFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());  
  
// 窗口分配器  
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));  

//窗口函数  
/**  
 * 窗口的reduce 
 * 1. 相同key的第一条数据来的时候,不会调用reduce方法 
 * 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出 
 * 3. 在窗口触发的时候才会输出整个窗口的最终计算结果 */
   
   SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(new ReduceFunction<WaterSensor>() {  
    @Override  
    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {  
        System.out.println("调用reduce方法, value1=" + value1 + ", value2=" + value2);  
        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());  
    }  
});
增量聚合函数(AggregateFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());  
  
// 窗口分配器  
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));  
  
//窗口函数  
/**  
 * 窗口的Aggregate 
 * 三个参数:第一个类型输入数来据的类型,第二个类型累加器的类型,存储中间计算结果的类型, 第三个类型输出数据的类型 
 * 1. 属于本窗口的第一条数据来, 创建窗口,创建累加器 
 * 2. 增量聚合:来一条计算一条,调研一次add方法 
 * 3. 窗口输出时调研一次getresult方法 
 * 4. 输入、中间累加器、输出 类型可以不一样,非常灵活 */
SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {  
    @Override  
    public Integer createAccumulator() {  
        System.out.println("创建累加器");  
        return 0;  
    }  
  
    // 聚合逻辑  
    @Override  
    public Integer add(WaterSensor value, Integer accumulator) {  
        System.out.println("调研add方法, value=" + value);  
        return accumulator + value.getVc();  
    }  
  
    // 获取最终结果,窗口触发输出  
    @Override  
    public String getResult(Integer accumulator) {  
        System.out.println("调研getResult方法");  
        return accumulator.toString();  
    }  
  
    @Override  
    public Integer merge(Integer a, Integer b) {  
        System.out.println("调研merge方法");  
        return null;    }  
});
全窗口函数(ProcessWindowFunction/ApplyFunction)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());  
  
        // 窗口分配器  
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));  
  
//        sensorWS.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {  
//            /**  
//             * 老写法  
//             * @param s 分组key  
//             * @param window 窗口对象  
//             * @param input 存的数据  
//             * @param out 采集器  
//             * @throws Exception  
//             */  
//            @Override  
//            public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {  
//  
//            }  
//        })  
  
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {  
            /**  
             * 全窗口函数计算逻辑(新写法):窗口触发时才调用一次, 统一计算窗口所有数据             * @param s 分组key  
             * @param context 上下文  
             * @param elements 存的数据  
             * @param out 采集器  
             * @throws Exception  
             */            
             @Override  
            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {  
  
                long start = context.window().getStart();  
                long end = context.window().getEnd();  
  
                String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
                String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
                long count = elements.spliterator().estimateSize();  
                out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
            }  
        });
增量聚合窗口结合(最为全面)
// WindowAggregateAndProcessDemo
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());  
  
// 窗口分配器  
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));  
  
//窗口函数  
SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(), new MyProc());  
  
result.print();  
  
env.execute();

public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {  
    @Override  
    public Integer createAccumulator() {  
        System.out.println("创建累加器");  
        return 0;  
    }  
  
    @Override  
    public Integer add(WaterSensor value, Integer accumulator) {  
        System.out.println("调研add方法, value=" + value);  
        return accumulator + value.getVc();  
    }  
  
    @Override  
    public String getResult(Integer accumulator) {  
        System.out.println("调研getResult方法");  
        return accumulator.toString();  
    }  
  
    @Override  
    public Integer merge(Integer a, Integer b) {  
        System.out.println("调研merge方法");  
        return null;    }  
  
}  
  
public static class MyProc extends ProcessWindowFunction<String, String, String, TimeWindow> {  
  
    @Override  
    public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {  
  
        long start = context.window().getStart();  
        long end = context.window().getEnd();  
  
        String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
        String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
        long count = elements.spliterator().estimateSize();  
        out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
    }  
}


// WindowReduceAndProcessDemo
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(value -> value.getId());  
  
// 窗口分配器  
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));  
  
//窗口函数  
/**  
 * 窗口的reduce 
 * 1. 相同key的第一条数据来的时候,不会调用reduce方法 
 * 2. 增量聚合:来一条数据后, 就会计算一次,但不会输出 
 * 3. 在窗口触发的时候才会输出整个窗口的最终计算结*/
SingleOutputStreamOperator<String> reduce = sensorWS.reduce(new MyReduce(), new MyProc());  
  
reduce.print();  
  
env.execute();


public static class MyReduce implements ReduceFunction<WaterSensor> {  
    @Override  
    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {  
        System.out.println("调研reduce方法, value1=" + value1 + ", value2=" + value2);  
        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());  
    }  
}  
  
public static class MyProc extends ProcessWindowFunction<WaterSensor, String, String, TimeWindow> {  
  
    @Override  
    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {  
  
        long start = context.window().getStart();  
        long end = context.window().getEnd();  
  
        String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
        String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
        long count = elements.spliterator().estimateSize();  
        out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
    }  
}
其他API
  • 触发器 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
       .window(...)
       .trigger(new MyTrigger())
  • 移除器 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...)
       .window(...)
       .evictor(new MyEvictor())

时间语义

image-20250305171702406

  • 事件时间
  • 处理时间 image-20250305171719793

水位线

image-20250305171736540 在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

设置事件时间语义及水位线
  • 单调上升水位线策略(有序流)
// 1. 定义watermark策略  
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy  
        // 升序watermark, 没有等待时间  
        .<WaterSensor>forMonotonousTimestamps()  
        // 指定时间分配器,从数据中提取  
        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {  
            @Override  
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {  
                // 返回的时间戳要为毫秒  
                System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);  
                return element.getTs() * 1000L;  
            }  
        });  
// 2. 指定watermark策略  
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);  

sensorDSwithWatermark.keyBy(value -> value.getId())  
        // 指定使用事件事件语义窗口  
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))  
        .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {  

            @Override  
            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {  
      
                long start = context.window().getStart();  
                long end = context.window().getEnd();  
      
                String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
                String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
      
                long count = elements.spliterator().estimateSize();  
                out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
      
            }  
        }).print();

  • 乱序水位线策略
// 1. 定义watermark策略  
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy  
        // 乱序watermark, 等待3s  
        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  
        // 指定时间分配器,从数据中提取  
        .withTimestampAssigner((element, recordTimestamp) -> {  
            // 返回的时间戳要为毫秒  
            System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);  
            return element.getTs() * 1000L;  
        });  
// 2. 指定watermark策略  
SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);  
  
sensorDSwithWatermark.keyBy(value -> value.getId())  
        // 指定使用事件事件语义窗口  
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))  
        .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {  
  
            @Override  
            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {  
  
                long start = context.window().getStart();  
                long end = context.window().getEnd();  
  
                String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
                String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
                long count = elements.spliterator().estimateSize();  
                out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
            }  
        }).print();
  • 自定义水位线
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy  
//                .forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {  
//                    @Override  
//                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {  
//                        return new MyPeriodWatermarkGenerator<>(3000L);  
//                    }  
//                })  
                // 指定自定义的生成器, 周期性生成//                .<WaterSensor>forGenerator(context -> new MyPeriodWatermarkGenerator<>(3000L))  
                // 指定自定义的生成器,断点式生成                .<WaterSensor>forGenerator(context -> new MyPuntuatedWatermarkGenerator<>(3000L))  
                .withTimestampAssigner((element, recordTimestamp) -> {  
                    System.out.println("数据=" + element + ", recordTs=" + recordTimestamp);  
                    return element.getTs() * 1000L;  
                });
public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {  

    // 用来保存乱序等待时间  
    private long delayTs;  
    // 用来保存当前最大的事件时间  
    private long maxTs;  
      
    public MyPeriodWatermarkGenerator(long delayTs) {  
        this.delayTs = delayTs;  
        this.maxTs = Long.MIN_VALUE + this.delayTs + 1;  
    }  
      
    /**  
     * 每条数据来都会被调用一次,用来提取最大事件事件,保存下来     * @param event  
     * @param eventTimestamp 提取到的数据的 事件时间  
     * @param output  
     */  
    @Override  
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {  
        maxTs = Math.max(maxTs, eventTimestamp);  
        System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs);  
    }  
      
    /**  
     * 周期性调用:发射 watermark     * @param output  
     */  
    @Override  
    public void onPeriodicEmit(WatermarkOutput output) {  
        output.emitWatermark(new Watermark(maxTs - delayTs - 1));  
        System.out.println("调用了onPeriodicEmit方法,生成watermark=" + (maxTs - delayTs - 1));  
    }  
}

public class MyPuntuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {  

    // 用来保存乱序等待时间  
    private long delayTs;  
    // 用来保存当前最大的事件时间  
    private long maxTs;  
      
    public MyPuntuatedWatermarkGenerator(long delayTs) {  
        this.delayTs = delayTs;  
        this.maxTs = Long.MIN_VALUE + this.delayTs + 1;  
    }  
      
    /**  
     * 每条数据来都会被调用一次,用来提取最大事件事件,保存下来, 并发射watermark     * @param event  
     * @param eventTimestamp 提取到的数据的 事件时间  
     * @param output  
     */  
    @Override  
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {  
        maxTs = Math.max(maxTs, eventTimestamp);  
        output.emitWatermark(new Watermark(maxTs - delayTs - 1));  
        System.out.println("调用了onEvent方法,获取目前为止的最大时间戳=" + maxTs + ", watermark = " + (maxTs - delayTs - 1));  
    }  
      
    /**  
     * 周期性调用:不需要     * @param output  
     */  
    @Override  
    public void onPeriodicEmit(WatermarkOutput output) {  
      
    }  
}

  • 源算子中设置
env.fromSource(  
        kafkaSource,  
        // 使用源算子中的水位线策略, 乱序,提取字符串中关键字  
        WatermarkStrategy  
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))  
                .withTimestampAssigner((element, recordTimestamp) -> {  
                    return Long.valueOf(element.split(",")[1]);  
                }),  
        "kafkasource"  
).print();
水位线的传递

image-20250305171842090 在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。 水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

延迟数据处理
  • 问题1:水位线在多并行度中传递取最小

    通过设置延迟等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.setParallelism(2);  
  
// 自定义分区器,数据%分区数,只输入奇数都知会去往map的一个子任务  
SingleOutputStreamOperator<Integer> socketDS = env  
        .socketTextStream("localhost", 7777)  
        .partitionCustom(new MyPartitioner(), r -> r) //以自身数据为key,通过奇偶性判定分区  
        .map(r -> Integer.parseInt(r))  
        .assignTimestampsAndWatermarks(WatermarkStrategy  
                .<Integer>forMonotonousTimestamps()  
                .withTimestampAssigner((r,rs) -> r * 1000L)  
                .withIdleness(Duration.ofSeconds(5)) //设置空闲等待时间,防止某个分区一直没有数据造成整体水位无法提升的问题  
        );  
  
// 分成两组,奇数一组,偶数一组, 开10s的事件时间滚动窗口  
socketDS.keyBy( r -> r % 2)  
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))  
        .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {  
            @Override  
            public void process(Integer s, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {  
                long start = context.window().getStart();  
                long end = context.window().getEnd();  
  
                String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
                String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
                long count = elements.spliterator().estimateSize();  
                out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
            }  
        })  
        .print();
  • 问题2:乱序中的迟到数据
    • 解决方法1:设置推迟水位推进
	WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3) 
  • 解决方法2:设置窗口延迟关闭(触发计算后来的迟到数据会再次触发计算)
	.window(TumblingEventTimeWindows.of(Time.seconds(10)))  
	.allowedLateness(Time.seconds(2)) // 推迟2s关窗
  • 解决方法3:使用侧输出流
OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));  
SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(value -> value.getId())  
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))  
        .allowedLateness(Time.seconds(2)) // 推迟2s关窗  
        .sideOutputLateData(lateTag) // 关窗后的迟到数据,放到侧输出流  
        .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {  
  
            @Override  
            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {  
  
                long start = context.window().getStart();  
                long end = context.window().getEnd();  
  
                String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");  
                String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");  
  
                long count = elements.spliterator().estimateSize();  
                out.collect("key=" + s + "的窗口【" + windowStart + ", " + windowEnd + "]包含" + count + "条数据 ---> " + elements);  
  
            }  
        });  
process.print();  
  
  
// 从主流获取侧输出流,打印  
process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

基于时间的合流–双向联结(join)

窗口连接(window join)

DataStream<String> join = ds1.join(ds2)  
        .where(r1 -> r1.f0)  
        .equalTo(r2 -> r2.f0)  
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))  
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {  
  
            /**  
             * 关联上的数据,调用join方法             * @param first ds1的数据  
             * @param second ds2的数据  
             * @return  
             * @throws Exception  
             */            @Override  
            public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {  
                return first + "<--->" + second;  
            }  
        });  
  
join.print();

间隔联结(interval join)

只能支持事件时间 image-20250305172028549

KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r -> r.f0);  
KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r -> r.f0);

ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2))

处理函数(Process)

  • ProcessFunction

    最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

  • KeyedProcessFunction

    对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

  • ProcessWindowFunction

    开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

  • ProcessAllWindowFunction

    同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

  • CoProcessFunction

    合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

  • ProcessJoinFunction

    间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

  • BroadcastProcessFunction

    广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

  • KeyedBroadcastProcessFunction

    按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。