大数据_Flink理论篇
前言
Github:https://github.com/HealerJean
一、介绍 Flink
1、Flink 是什么
它是一个开源的、分布式、高性能的流处理框架,既能处理“实时数据流”(比如用户点击、传感器数据),也能处理“批数据”(比如昨天的日志文件)。
| 特性 | 说明 |
|---|---|
| 真正的流处理 | 不像 Spark Streaming 那样“微批”,Flink 是逐条处理,延迟可低至毫秒级 |
| 状态管理 | 自动保存中间结果(比如用户累计点击数),支持大状态(GB/TB 级) |
精确一次(Exactly-once) |
即使故障重启,结果也不会多算或少算 |
| 高吞吐 & 低延迟 | 每秒处理百万级事件,延迟 < 10ms |
2、系统架构

+--------------------------------------------------+
| Client (提交作业) |
+--------------------------------------------------+
↓
+--------------------------------------------------+
| JobManager (主节点) |
| - JobMaster:调度任务 |
| - ResourceManager:管理 Slot 资源 |
| - Dispatcher:接收作业并启动 JobMaster |
+--------------------------------------------------+
↓
+------------------+ +------------------+
| TaskManager 1 | | TaskManager 2 |
| - Task Slot 1 | | - Task Slot 1 |
| (运行 SubTask) | | (运行 SubTask) |
| - Task Slot 2 | | - Task Slot 2 |
+------------------+ +------------------+
↑ ↑
数据流分区 数据流分区
1)Client(客户端)
作用:作业提交入口,不参与运行时计算,
-
你写完代码打包提交的地方(比如
flink run -c MyJob my.jar)✅ -
关键补充:
-
Client负责:- 解析用户代码(
DataStream API / SQL) - 构建
**JobGraph**(逻辑执行图) - 将
JobGraph+JAR包上传给JobManager
- 解析用户代码(
-
Client提交完就退出了!它不常驻集群(除非用Application Mode) -
三种部署模式影响 Client 行为:
模式 Client生命周期适用场景 Session Mode 提交后退出 交互式开发 Per-Job Mode 提交后退出 生产常用 Application Mode 常驻 JM 内部 Kubernetes 推荐
-
2)JobManager (JM) —— 集群“大脑”
核心职责:集群主节点,负责调度、容错、协调,相当于“大脑”,负责协调整个作业
a、JobMaster(每个作业一个)
- 负责单个作业的全生命周期管理
- 具体职责:
- 将
JobGraph转为ExecutionGraph(物理执行图) - 向
ResourceManager申请Slot - 触发和协调
Checkpoint / Savepoint - 处理
TaskManager心跳和失败恢复 - 决定
Operator Chaining策略
- 将
b、ResourceManager(资源管理器)
- 分配
Task Slot - 关键补充:
Slot是逻辑资源单位,不是物理CPU/内存RM不直接管CPU/Memory,而是通过 SlotManager 管理 Slot 池- 在
YARN/K8s上,RM还会动态申请/释放 TaskManager 容器
c、Dispatcher(作业分发器)
- 接收作业,启动
JobMaster - 额外功能:
- 维护
JobGraph缓存(用于Web UI展示) - 支持
Savepoint触发(flink savepoint <jobId>) - 在
HA模式下,配合ZooKeeper做JM故障转移
- 维护
3)TaskManager (TM) —— “工人”节点
工作节点,执行实际计算任务,真正干活的机器,运行具体的计算任务(
SubTask)
a、Task Slot(任务槽位)
TM上的资源隔离单元(类似“虚拟CPU核心”)

-
Slot≠CPU核心!它是是 逻辑 资源单位,不是 物理隔离-
所有
Slot共享同一个TaskManager的JVM进程 -
所有
Slot共享同一个CPU资源池(由 OS 调度) -
所有
Slot共享同一个JVM堆内存(Heap)-
内存如何隔离?虽然默认不隔离,但
Flink支持细粒度内存管理-
# 控制每个 Slot 的堆内存上限 taskmanager.memory.task.heap.size: 1024mb -
# 每个 Slot 的堆外内存(用于网络缓冲、RocksDB 等) taskmanager.memory.task.off-heap.size: 512mb
-
-
-
-
Slotvs并行度- 最大支持并行度 = 所有
TaskManager的Slot总数 - 例如:3 个
TaskManager,每个有 2 个Slot→ 最大并行度 =6
- 最大支持并行度 = 所有
-
为什么需要
Slot?- 防止单个大作业占满整个
TM - 允许多个作业共享同一个
TM(提高资源利用率) - 每个
Slot可运行一个Task
- 防止单个大作业占满整个
b、Operator Chaining(算子链)优化
Flink会将满足条件的连续算子(如map → filter → keyBy)合并成一个Task- 这个
Task在 一个Slot中运行,内部无序列化/网络开销 - 条件:
same parallelism + no shuffle + no repartition
3、Flink 程序的基本结构
Flink 应用由 数据流(Data Streams) 和 算子(Operators) 构成,形成一个 有向无环图(DAG):
[Source] → [Operator1] → [Operator2] → ... → [Sink]
Source:数据来源,如fromElements(),readTextFile()、Kafka、文件、集合等。Transformation:数据转换,包括map(),flatMap(),filter(),keyBy(),reduce()等。Sink:数据输出目的地,例如print(),writeAsText()、如数据库、控制台、文等。
1)执行环境与程序骨架
a、创建执行环境
getExecutionEnvironment():自动适配本地/集群模式。- 可通过
setParallelism(n)设置默认并行度。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
b、触发执行
- 惰性执行:只有调用
execute()时,Flink才会构建JobGraph并提交到JobManager。 - 作业名称用于在
Web UI中识别任务。
env.execute("My Flink Job");
c、示例代码:
- 获取
Flink流处理执行环境 - 创建一个
DataStream(数据流),使用fromElements()方法从一组预定义的 Person 对象构造源流 - 对数据流进行转换:使用 filter() 算子过滤出年龄大于等于 18 岁的 Person 对象
- 添加
Sink:将过滤后的结果打印到标准输出(控制台) - 触发作业执行
public static void main(String[] args) throws Exception {
// 1. 获取 Flink 流处理执行环境
// 这是所有 Flink 程序的入口点,用于构建和提交数据流作业
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 创建一个 DataStream(数据流),使用 fromElements() 方法从一组预定义的 Person 对象构造源流
// 此处模拟了一个小型静态数据集,包含三个人的信息
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35), // 成年人
new Person("Wilma", 35), // 成年人
new Person("Pebbles", 2) // 小孩
);
// 3. 对数据流进行转换:使用 filter() 算子过滤出年龄大于等于 18 岁的 Person 对象
// filter 接收一个 Predicate(布尔函数),返回 true 的元素会被保留在结果流中
DataStream<Person> adults = flintstones.filter(person -> person.age >= 18);
// 4. 添加 Sink:将过滤后的结果打印到标准输出(控制台)
// print() 是一个内置的 sink 算子,常用于调试和演示
// 输出格式为 "<subtask_id> <element>",例如 "1> Fred: age 35"
adults.print();
// 5. 触发作业执行
// 至此,Flink 才会真正构建并提交数据流图(DAG)到本地或集群运行
// execute() 方法会阻塞主线程,直到作业完成(对于有界流)或被取消(对于无界流)
env.execute();
}
4、有界流 vs 无界流
Flink 主要且最擅长处理无界流(实时数据流),但它通过“流批一体”架构,也能高效处理有界流(批处理),且使用同一套 API。
jav
1)这两个流区别是什么
| 类型 | 描述 | 典型场景 | Flink处理方式 |
|---|---|---|---|
| 有界流 | 数据集有限,可完全加载 | 批处理(比如一天的数据) | 视为特殊流处理 |
| 无界流 | 数据持续到达,理论上无限 | 实时监控(比如每秒进来的订单) | 原生支持 |
2)Flink 如何统一处理有界与无界流
- 无界流 → 用
StreamingExecutionEnvironment - 有界流(纯批)→ 用
BatchExecutionEnvironment
| 特性 | 无界流(流模式) | 有界流(批模式) |
|---|---|---|
| 执行模式 | Streaming(默认) |
可选 Batch(Flink 1.12+ 引入) |
| 调度方式 | 流式 pipeline,持续运行 |
优化为批调度(如全量 shuffle 后再计算) |
| 状态管理 | 必须启用 checkpoint | 可关闭状态,提升性能 |
| 窗口触发 | 依赖 watermark + trigger | 数据读完自动触发所有窗口 |
| 资源效率 | 长期运行,注重稳定性 | 短期任务,注重吞吐和完成速度 |
3)为什么无界流容易 OOM,而有界流不容易?
| 场景 | 无界流 + ProcessWindowFunction |
有界流 + ProcessWindowFunction |
|---|---|---|
| 窗口数据生命周期 | 必须在状态后端中持续缓存直到窗口触发 | 数据在 单次 task 执行栈内处理,结束后立即 GC |
| 背压与流量控制 | 若数据突增,状态可能持续增长 | 输入总量已知,调度器可预分配资源 |
| 容错机制 | 依赖 checkpoint 持久化状态 → 内存+磁盘都要存 |
失败重试只需重跑该 task,无需 checkpoint 窗口数据 |
| 大窗口风险 | 高(如 1 小时窗口 + 高 QPS = 数千万条) | 低(即使大窗口,也是“已知总量”,可优化调度) |
a、启用了 BATCH 模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Flink会启用 批调度器(Batch Scheduler)- 不再使用
checkpoint、watermark触发等流式机制 - 而是像
Spark一样:先shuffle数据,再分阶段计算
b、Join 和 GroupBy 都是“分阶段执行”
- 第一阶段:
keyBy(userId)→ 将订单和用户数据按keyshuffle到同一 task manager - 第二阶段:每个
task只处理属于自己key的数据子集 - 第三阶段:窗口聚合时,每个窗口的数据在单次 task 调用中加载 → 计算 → 释放
📌 即使总数据 1 亿条,只要
key分布均匀,每个 task 只处理几万~几十万条,内存完全可控。
c、Flink 批引擎支持磁盘溢写(Spill to Disk)
- 如果某个
key的数据特别大(如“user1”有1000万订单),Flink 会:- 在
reduce阶段自动将中间数据spill到磁盘 - 类似
MapReduce的 “combine→spill→merge” 流程
- 在
- 这是
Streaming模式不具备的能力!
d、没有长期状态缓存
- 在流模式中,
ProcessWindowFunction必须把窗口数据持久化到状态后端,直到watermark触发。 - 但在批模式中,所有数据已知,窗口一次性触发,无需缓存,计算完就释放。
场景:批处理 ETL 场景,数据量很大(比如 1 亿条订单),但不 OOM
- 订单数据:
(orderId, userId, amount) - 用户数据:
(userId, region) - 目标:统计每个区域的总订单金额
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class BoundedJoinExample {
public static void main(String[] args) throws Exception {
// 1. 创建环境,并显式设置为 BATCH 模式(关键!)
StreamExecution环境 env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // ← 启用批优化
// 2. 模拟两个有界数据源(实际可用 readTextFile / fromCollection)
DataStream<Tuple3<String, String, Double>> orders = env.fromCollection(java.util.Arrays.asList(
Tuple3.of("order1", "user1", 100.0),
Tuple3.of("order2", "user2", 200.0),
Tuple3.of("order3", "user1", 150.0),
Tuple3.of("order4", "user3", 300.0)
));
DataStream<Tuple2<String, String>> users = env.fromCollection(java.util.Arrays.asList(
Tuple2.of("user1", "beijing"),
Tuple2.of("user2", "shanghai"),
Tuple2.of("user3", "guangzhou")
));
// 3. 给数据加上“事件时间”(这里用固定时间,因为是有界流)
WatermarkStrategy<Tuple3<String, String, Double>> orderWm = WatermarkStrategy.noWatermarks();
WatermarkStrategy<Tuple2<String, String>> userWm = WatermarkStrategy.noWatermarks();
DataStream<Tuple3<String, String, Double>> ordersWithTs = orders.assignTimestampsAndWatermarks(orderWm);
DataStream<Tuple2<String, String>> usersWithTs = users.assignTimestampsAndWatermarks(userWm);
// 4. Join:按 userId 关联订单和用户
DataStream<Tuple3<String, String, Double>> enriched = ordersWithTs
.keyBy(order -> order.f1) // userId
.intervalJoin(usersWithTs.keyBy(user -> user.f0))
.between(Time.milliseconds(-1), Time.milliseconds(1)) // 因为是静态数据,时间窗口设为0
.process(new JoinFunction<...>() {
@Override
public Tuple3<String, String, Double> join(Tuple3<String, String, Double> order, Tuple2<String, String> user) {
return Tuple3.of(user.f1, order.f1, order.f2); // (region, userId, amount)
}
});
// 5. 按 region 分组,统计总金额(这里用滚动窗口,但因为是有界流,窗口会一次性触发)
enriched
.keyBy(record -> record.f0) // region
.window(TumblingEventTimeWindows.of(Time.days(1))) // 假设所有数据在同一天
.reduce((a, b) -> Tuple3.of(a.f0, "SUM", a.f2 + b.f2)) // 累加金额
.print();
// 6. 执行
env.execute("Bounded Join Example");
}
}
二、DataStream API核心组件
1、支持的数据类型
Flink 支持任何 可序列化 的Java/Scala 类型:
| 类型 | 示例 | 要求说明 |
|---|---|---|
| 基本类型 | String, Integer, boolean[] |
自动支持 |
POJO |
自定义 Person 类 |
需满足 POJO 规范(见下文) |
Tuple |
Tuple2<String, Integer> |
Flink 内置 |
Scala Case Class |
case class Event(id: String, ts: Long) |
Scala 专用 |
2、POJO 类规范(必须全部满足):
- 类是
public且非内部类; - 有
public无参构造函数; - 所有字段要么
public非final,要么有标准getter/setter(符合 JavaBean 命名); - 字段类型也需可序列化。
三、核心算子详解
1、Source 算子(数据输入)
| 方法 | 说明 |
|---|---|
fromElements(...) |
从内存对象创建流(测试用) |
fromCollection(list) |
从 Java Collection 创建 |
readTextFile(path) |
按行读取文本文件 |
socketTextStream(host, port) |
从 Socket 实时读取(调试用) |
addSource(SourceFunction) |
接入 Kafka、Pulsar 等外部系统 |
2、Transformation 算子(数据转换)
1)无状态转换(Stateless)
| 算子 | 功能 | 示例 |
|---|---|---|
map() |
1:1 映射 | stream.map(x -> x * 2) |
flatMap() |
1:N 映射 | 分词、展开嵌套结构 |
filter() |
条件过滤 | stream.filter(x -> x > 0) |
1)flatMap 示例:文本分词
DataStream<String> lines = env.fromElements("hello world", "flink stream");
DataStream<String> words = lines.flatMap((line, out) -> {
for (String word : line.split(" ")) {
out.collect(word); // 可输出多个
}
});
// 输出: hello, world, flink, stream
2)有状态转换(Stateful)
| 算子 | 功能 |
|---|---|
keyBy(keySelector) |
按 key 重分区(类似 SQL GROUP BY),触发 网络 shuffle,是昂贵操作,应谨慎使用 |
window(...) |
在 key 分组上定义窗口 |
reduce(), sum(), max() |
聚合操作 |
keyBy + reduce 示例:每组姓名的最大年龄
people.keyBy(p -> p.name)
.max("age") // 输出滚动最大值
.print();
3、Sink 算子(数据输出)
生产环境应使用
Checkpointed Sink(如KafkaSink)保证Exactly-Once。
| 方法 | 说明 |
|---|---|
print() |
控制台输出(开发调试) |
writeAsText(path) |
写入文本文件(不推荐生产用) |
addSink(SinkFunction) |
自定义输出(如写入 MySQL、Kafka) |
四、并行与分布式执行
Flink不是一个单线程程序,而是一个分布式流处理引擎。它把一个作业拆成多个任务,并让这些任务同时运行在多台机器(或多个CPU核心)上,从而实现高吞吐、低延迟。这个“同时运行多少个任务”的数量,就叫 并行度(Parallelism)。
1、什么是 并行度(Parallelism)?
1)核心定义:
并行度 = 每个算子(
operator)被拆分成多少个“子任务”(subtask)来并行执行。
- 每个
subtask是一个独立的线程(可能在不同机器上)。 Flink作业的总并行能力 = 各算子subtask数量之和(但受限于集群资源)。
2)设置方式:
注意:source 和 sink 也可以单独设置并行度!
| 方式 | 代码 | 作用范围 |
|---|---|---|
| 全局设置 | env.setParallelism(4) |
所有算子默认使用该并行度 |
| 算子级设置 | .map(...).setParallelism(2) |
仅对该算子生效 |
2、理解并行度如何分配数据
答案:Flink 默认使用 轮询(round-robin) 分发策略(对非 keyby 流):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // 全局并行度 = 2
// 创建一个包含 4 个元素的有界流
DataStream<String> source = env.fromElements("A", "B", "C", "D");
// map 算子(未显式设置,并行度=2)
DataStream<Integer> mapped = source.map(s -> {
System.out.println("Thread: " + Thread.currentThread().getName() + " processing: " + s);
return s.length();
});
mapped.print(); // sink 并行度也=2
env.execute();
| 元素 | 分配给哪个 subtask? |
|---|---|
| “A” | source.subtask[0] → map.subtask[0] |
| “B” | source.subtask[1] → map.subtask[1] |
| “C” | source.subtask[0] → map.subtask[0] |
| “D” | source.subtask[1] → map.subtask[1] |
因为
fromElements的source也会按并行度拆成 2 个 subtask!
可能输出(简化版):
1> 1 // subtask ID = 1(即第1个子任务)处理了 "A" 或 "C"
2> 1 // subtask ID = 2 处理了 "B" 或 "D"
1> 1
2> 1
输出前的
1>、2>就是subtask的ID(从 1 开始编号)。
3、数据如何在算子之间传递
当一个算子的
subtask处理完数据后,需要把结果传给下一个算子的subtask。 怎么传?有两种主要方式:
| 模式 | 特点 | 示例 |
|---|---|---|
One-to-One |
保持分区和顺序(如 source → map) |
map 算子之间 |
Rebalancing |
重新分区(如 keyBy、broadcast) |
keyBy 后数据按 key 分发 |
1)One-to-One(直连,保持分区)
-
触发条件
-
上下游算子 并行度相同
-
且 没有触发重分区操作(如
keyBy,shuffle,rebalance)
-
- 特点:
- 分区保持:数据从上游子任务 直接传给下游对应编号的子任务。
- 顺序保持:同一分区内的事件顺序不变
- 零网络开销:如果两个
subtask在同一台机器(TaskManager),数据直接内存传递 - 高性能;性能极高,因为避免了序列化和网络传输。
- 生活例子:
- 4 个快递员(
source)分别负责 A、B、C、D 区。 他们把包裹直接交给同一个区域的分拣员(map)—— A区快递员 → A区分拣员,B区 →B区分拣员…… 没人跨区送包,效率高!
- 4 个快递员(
-
代码示例:这三个算子(
source+ 2个map)会被 自动链接(Chained) 成一个task,只启动 2 个线程!env.setParallelism(2); DataStream<String> ds = env.fromElements("X", "Y"); ds.map(x -> x + "-1") // 算子1 .map(x -> x + "-2") // 算子2 → 与算子1并行度相同,无 keyBy .print();
2)Rebalancing(重新分区,如 keyBy)
-
触发条件
- 使用了
keyBy(key) - 或显式调用
shuffle(),rebalance(),broadcast()等
- 使用了
- 特点:
- 打破原有分区:数据会根据
key重新分配到下游不同子任务。 - 需要网络传输:即使在同一机器,也可能序列化+反序列化
- 保证
key聚合:相同key的数据一定去同一个下游subtask - 性能开销大:是昂贵操作,但必要(如按用户
ID聚合)。
- 打破原有分区:数据会根据
- 生活例子:现在要把所有包裹按“收件人姓名”分类。 无论哪个快递员拿到包裹,都要送到对应姓名的分拣台。 比如:
- “张三”的包裹 → 全部送到 1 号台
- “李四”的包裹 → 全部送到 2 号台 这就需要 跨区域调度,甚至用车拉(网络传输)!
代码示例:
DataStream<Person> people = env.fromElements(
new Person("Alice", 25),
new Person("Bob", 30),
new Person("Alice", 28),
new Person("Charlie", 22)
);
// 假设并行度 = 2
KeyedStream<Person, String> keyed = people.keyBy(p -> p.name);
Flink 会对每个 name 计算 哈希值 % 并行度,决定去哪个 subtask:
| name | hashCode % 2 | 分配到 subtask |
|---|---|---|
| “Alice” | 0 | subtask[0] |
| “Bob” | 1 | subtask[1] |
| “Alice” | 0 | subtask[0] |
| “Charlie” | 1 | subtask[1] |
所有 “
Alice” 都去了subtask[0],后续的max("age")就能在同一个地方计算!
输出示例:
1> Alice: age 28 // subtask[0] 输出 Alice 的最大年龄
2> Bob: age 30 // subtask[1] 输出 Bob 的最大年龄
2> Charlie: age 22 // subtask[1] 输出 Charlie
注意:
1>和2>表示 窗口聚合结果由哪个子任务输出,而不是原始数据来源。
五、时间语义与水印
1、三种时间语义
在
Apache Flink中,时间语义决定了窗口何时触发计算、事件归属哪个窗口。共有三种:
推荐:使用 Event Time:结果可复现,适合金融、审计等场景。
| 特性 | Processing Time |
Ingestion Time |
Event Time(+ Watermark) |
|---|---|---|---|
| 时间来源 | 本地机器时钟 | Source 进入时间 |
数据中的 eventTime |
| 是否可复现 | 否 | 否 | 是 |
| 能否处理乱序 | 不能 | 不能 | 能(靠 Watermark) |
| 延迟容忍 | 无(立即处理) | 无 | 可配置(如 5s) |
| 窗口触发依据 | 系统时间 | Source 时间 |
Watermark 推进 |
| 适用场景 | 监控、告警 | 少量有序流 | 金融、审计、对账、精确统计 |
| 是否推荐 | 仅限低精度场景 | 不推荐 | 强烈推荐 |
问题背景:实时统计每分钟支付成功的订单数
class Order {
String orderId;
long eventTime; // 订单实际发生的时间(毫秒时间戳)
}
数据流可能是这样的(按到达顺序):
| 到达顺序 | eventTime(格式 HH:mm:ss) | 说明 |
|---|---|---|
| 1 | 10:00:05 | 正常到达 |
| 2 | 10:00:07 | 正常到达 |
| 3 | 10:00:02 | 延迟到达(10:01:20)!实际更早发生 |
| 4 | 10:01:10 | 属于下一分钟 |
问题来了:Flink 如何知道 “10:00:00 ~ 10:00:59” 这个窗口的数据已经收齐,可以关闭并输出结果?**
1)Processing Time(处理时间)
- 窗口划分依据:Flink 各算子使用本地机器的系统时间(即事件被处理时的当前时间)来分配窗口
- 例如:当某条订单在系统时间为
10:00:05时被窗口算子处理,它就会被归入[10:00:00, 10:01:00)的窗口。
- 例如:当某条订单在系统时间为
-
举例:
-
如果当前系统时间是
10:00:05,就把这个事件分到[10:00:00, 10:01:00)窗口。 -
窗口在系统时间到达
10:01:00时触发计算。
-
-
缺点:
- 结果不可复现:即使输入数据完全相同,只要处理时机不同,输出结果就可能不同
- 情况一(处理快):三条数据都在系统时间
10:00:59前被窗口算子处理 → 全部分到[10:00, 10:01)窗口 → 输出count = 3。 - 情况二(处理慢): 前两条在
10:00:59前处理,但第三条因GC或网络卡顿,直到10:01:02才被处理 →它被分到[10:01, 10:02)窗口 →[10:00, 10:01)窗口只输出count = 2。
- 情况一(处理快):三条数据都在系统时间
- 无法处理延迟或乱序数据:迟到的
10:00:02可能被分到10:01窗口(如果它在10:01之后才被处理)!
- 结果不可复现:即使输入数据完全相同,只要处理时机不同,输出结果就可能不同
-
适用场景:
-
实时监控大屏(如“当前在线用户数”)
-
告警系统(不要求精确,只要快)
-
对准确性要求低、但对延迟敏感的场景
-
2)Ingestion Time(摄入时间)
- 实现方式:
Source算子在事件刚进入Flink时,自动打上当前系统时间作为时间戳。 -
相当于
Processing Time的“统一版本”:所有算子看到的是同一个时间(Source打的时间),而不是各自机器的时间。 -
缺点:比
Processing Time稍好,但仍无法处理源头延迟 -
订单在业务系统中
10:00生成,但10:02才发到Kafka,Flink会在10:02给它打上时间戳 → 被分到10:02的窗口。 -
丢失了事件的真实发生时间。
-
适用场景:
Ingestion Time很少被使用,因为要么用简单的Processing Time,要么直接上Event Time。-
想要一定稳定性,但又不想处理
Watermark的复杂性 -
数据源基本有序、延迟极小(如内部日志流)
-
3)Event Time(事件时间) ← 推荐!
- 窗口划分依据:用数据自带的
eventTime判断时间。 -
关键机制:
Watermark(水印线) —— 用于告诉系统:“小于等于某个时间的事件,应该都到了”。 -
优点:
-
结果完全由数据决定,与处理速度、网络延迟无关
-
可重放、可复现:无论跑多少次,只要输入数据相同,结果就相同
-
正确处理乱序和延迟事件
-
-
挑战:
-
需要理解并配置
Watermark -
需要容忍一定的延迟(不能无限等下去)
-
2、Watermark:Event Time 的“进度条”
问题:在基于 Event Time 的流处理中,数据可能乱序到达(例如:10:00:02 的事件比 10:00:07 晚到)。那么 Flink 如何知道:“10:00 这一分钟的数据已经收齐,可以关闭窗口并输出结果了?”
答案就是:Watermark(水位线),它是 Flink 判断“窗口是否可以关闭”的依据。是一个单调递增的时间戳,表示:“我确信,不会再有 eventTime ≤ W 的事件来了。
1)Watermark 的核心特性
| 特性 | 说明 |
|---|---|
| 单调递增 | Watermark 时间戳永远不会减少,即使后续事件的 eventTime 更小 |
| 周期性生成 | 默认每 200ms 生成一次(可通过 .withWatermarkInterval(Duration) 调整) |
| 滞后于最大事件时间 | 故意延迟一段缓冲时间(如 5s),以容忍乱序 |
2)允许最大延迟(Bounded Out-of-Orderness)
// 允许最多 5 秒的乱序(即:事件最多延迟 5 秒到达)
WatermarkStrategy<Order> strategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.eventTime);
-
Flink会持续跟踪当前看到的 最大eventTime(maxTs)。 -
每隔一段时间(默认
200ms),生成一个Watermark:Watermark= maxTs−allowedLateness -
例如:
- 当前最大
eventTime=10:01:10 - 允许延迟 =
5秒 - 则
Watermark=10:01:05 - →
Flink认为:所有eventTime ≤ 10:01:05的事件都已到达
- 当前最大
3)触发窗口的条件:
Flink的 事件时间(Event Time)窗口 触发依赖Watermark:
- 窗口
[10:00, 10:05)只有在Watermark≥10:05时才会触发计算并关闭。 - 而
Watermark是由 数据中的事件时间 推进的。 - 如果没有新数据 → 没有新事件时间 →
Watermark停止推进 → 窗口永远不触发
举例说明:你的订单场景(带详细 Watermark 推进)假设事件按以下顺序到达(括号内为 eventTime):
配置:allowedLateness = 5s
| 到达顺序 | eventTime |
Flink 处理逻辑(假设 maxDelay=5s) |
|---|---|---|
| 1 | 10:00:05 | 暂不触发窗口;Watermark 滞后 |
| 2 | 10:00:07 | 同上 |
| 3 | 10:00:02 | 虽然迟到,但仍在 [10:00:00, 10:01:00) 窗口内,且未超 5s 延迟 → 被正确计入 |
| 4 | 10:01:10 | 触发 Watermark = 10:01:05 → 10:00 窗口关闭!输出 count=3 |
-
最终结果:
count = 3(包含10:00:02的延迟事件)无论重放多少次,只要输入数据相同,结果始终一致。 -
当
Flink看到一个eventTime = 10:01:10的事件时,它会发出Watermark=10:01:10 - 5s = 10:01:05。此时,所有eventTime ≤ 10:01:05的窗口都可以安全关闭
3、迟到事件怎么办?
即使设置了
Watermark,仍可能有事件超过容忍延迟才到达(如10:00:01的事件在10:01:10之后才到)。
a、延长窗口生命周期(allowedLateness)
- 窗口在
Watermark触发后不会立即销毁,而是继续接收迟到事件(≤ 窗口结束时间 +10s) - 每收到一个有效迟到事件,会再次触发窗口计算并输出更新结果
- 适用于需要“最终一致性”的场景(如实时报表修正)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10)) // 窗口关闭后,再等 10 秒
.aggregate(...)
b、旁路输出(Side Output)
默认行为,迟到事件直接丢弃(无警告!)
- 超出
allowedLateness的事件被路由到独立流 - 主流只处理“准时+容忍期内迟到”的数据
- 适合审计、监控、异常分析等场景
OutputTag<Order> lateTag = new OutputTag<Order>("late-orders") {};
SingleOutputStreamOperator<Long> result = stream
.window(...)
.sideOutputLateData(lateTag)
.aggregate(...);
// 单独处理迟到数据
result.getSideOutput(lateTag).addSink(...); // 如写入告警日志
3、代码示例:每分钟统计订单数(Event Time + Watermark)
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
public class OrderCountWithEventTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flink 1.12+ 默认使用 Event Time,但显式设置更清晰
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000); // 开启 checkpoint,保障容错
// 模拟订单数据源
DataStream<Order> orders = env.addSource(new OrderSourceFunction());
// 定义 Watermark 策略:允许 5 秒乱序
WatermarkStrategy<Order> strategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, ts) -> order.eventTime)
.withWatermarkInterval(Duration.ofMillis(200)); // 可选:调整生成频率
DataStream<Order> withWatermarks = orders.assignTimestampsAndWatermarks(strategy);
// 定义迟到数据的 Side Output
OutputTag<Order> lateOrdersTag = new OutputTag<>("late-orders"){};
// 每分钟滚动窗口统计
withWatermarks
.keyBy(order -> "global") // 全局统计(不分组)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10)) // 允许额外 10 秒迟到
.sideOutputLateData(lateOrdersTag)
.aggregate(new CountAggregate()) // 自定义聚合函数
.print("On-time Result");
// 输出超时迟到事件
withWatermarks
.getSideOutput(lateOrdersTag)
.print("Late Order");
env.execute("Real-time Order Count per Minute (Event Time)");
}
}
4、FQA
1)延长周期和最大延迟配置一个行吗?
答案:两者作用完全不同,不能互相替代。
Watermark延迟(X)- 我等 最多 X 秒,就认为该分钟的数据基本齐了,先算一次!
- 只调大
Watermark延迟:结果更“完整”,但延迟高、可能卡住、无法更新
Allowed Lateness(Y)- 算完后,我还再等 Y 秒,万一有漏网之鱼,我就更新结果。
Watermark+allowedLateness:快速出初值 + 后续修正,兼顾时效性与准确性
| 特性 | forBoundedOutOfOrderness(X) |
.allowedLateness(Y) |
|---|---|---|
| 控制什么? | 何时触发窗口 | 窗口关闭后还能接收多久迟到数据 |
| 影响什么? | 决定“第一次输出结果的时间点” | 决定“是否允许多次更新结果” |
| 作用对象 | Watermark 生成逻辑 |
窗口生命周期 |
| 影响触发时间 | 是(Watermark = maxTs - X) | 否(窗口在 Watermark ≥ end 时立即触发) |
| 是否支持多次输出 | 否(窗口触发即关闭) | 是(每次有效迟到都重新计算) |
| 处理超 X 的迟到事件 | 不能(除非 X 足够大) | 能(只要在 Y 范围内) |
| 空闲流风险 | 可能卡住(无新事件 → Watermark 不推进) |
同样依赖 Watermark,但可配合 withIdleness() 缓解 |
2)延长周期和最大延迟配置建议
1)参数建议:
X(Watermark延迟)设小一点:- 目标:低延迟、快速出初值
- 比如
95%的数据在 5 秒内到达 → 设X = 5s
Y(allowedLateness)设大一点- 目标:捕获迟到数据,修正结果
- 覆盖极端延迟(如最长可能延迟 15 秒)→ 设
Y = 10~15s
2)两者配合使用:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
// ...
.window(...).allowedLateness(Time.seconds(10))
3)防卡死:全局性兜底场景,对可能空闲的流,加上空闲检测:
Source级空闲,不是 用户(key)级空闲
.withIdleness(Duration.ofSeconds(30)) // 30秒无数据,Watermark 自动推进
六、窗口(Window)计算
1、为什么需要窗口?
无界流无法直接聚合(如“总和”),需划分为有限片段(窗口)进行计算。例如:
- “用户总点击数” → 随时间无限增长,永远算不完。
- “过去一小时的错误率” → 需要限定时间范围。
解决方案:窗口(Window):将无界流切分成有限大小的片段(buckets),在每个片段内进行聚合。
1)窗口原理
数据源 → WatermarkStrategy → WindowAssigner → Trigger → Evictor(可选) → WindowProcessFunction
- 数据进入系统;
- 根据事件时间生成 Watermark,保证时间语义;
- 将数据分配到对应的时间窗口;
- 当触发条件满足时,触发窗口计算;
- (可选)在触发前清理一些数据;
- 对窗口内数据进行最终处理并输出结果。
| 组件 | 作用 | 是否必要 | 主要职责 |
|---|---|---|---|
| 数据源 | 必须 | 提供输入数据 | |
WatermarkStrategy |
必须(若用 Event Time) |
处理乱序,生成水印 | |
WindowAssigner |
必须 | 分配数据到窗口(哪种窗口类型) | |
Trigger |
必须 | 决定何时触发窗口 | |
Evictor |
可选 | 清理窗口中部分数据 | |
WindowProcessFunction |
必须 | 执行窗口逻辑 |
- 如果你使用的是
sum()、count()等内置聚合函数,底层其实也封装了AggregateFunction和ProcessWindowFunction。 - 使用
EventTime时,Watermark是关键;否则用ProcessingTime则无需Watermark
a、WindowAssigner
窗口分配器,
WindowAssigner是一个抽象类,用于为每个输入元素分配一个或多个窗口(使用滑动窗口时,一个输入元素会属于多个窗口)。
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
/**
* 为每一个输入元素分配窗口.
*
* @param element 输入元素
* @param timestamp 元素的时间戳。如果没有通过assignTimestampsAndWatermarks方法分配时间戳,则为Long.MIN_VALUE
* @param context 窗口上下文
* @return 窗口集合
*/
public abstract Collection<W> assignWindows(T element, long timestamp,
WindowAssigner.WindowAssignerContext context);
/**
* 获取窗口的默认触发器.
*
* @param env 流模式环境
* @return 触发器
*/
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
}
b、Trigger
触发器,窗口何时触发计算。用于为每个输入元素分配完窗口后,需要指定窗口的触发逻辑,常见的有通过Timer定时触发和计数达到指定条数触发。
// 触发器枚举
public enum TriggerResult {
// 不做任何处理
CONTINUE(false, false),
// 触发窗口计算,并清空窗口内的数据
FIRE_AND_PURGE(true, true),
// 触发窗口计算,不清空窗口内的数据
FIRE(true, false),
// 清空窗口内的数据
PURGE(false, true);
// 是否触发计算
private final boolean fire;
// 是否清空窗口
private final boolean purge;
}
public abstract class Trigger<T, W extends Window> implements Serializable {
// 当接收到一条输入元素时,判断触发器的结果
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx); throws Exception;
// 到达处理时间Timer注册的时间,会判断触发器的结果
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 到达事件时间Timer注册的时间,会判断触发器的结果
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
}
2)窗口分类
不同窗口类型,主要区别在于
WindowAssigner和Trigger,较为常用的主要有两类:时间窗口和计数窗口。
| 维度 | 时间窗口 | 计数窗口 |
|---|---|---|
| 划分依据 | 基于时间划分窗口 | 基于元素数量划分窗口 |
| 窗口对象 | TimeWindow |
GlobalWindow |
| 是否需要 Watermark | 事件时间窗口需要 | 不需要 |
| 触发机制 | 时间到达(+ Watermark) | 条数达到阈值 |
| 状态管理 | 每个窗口独立状态 | 全局单一窗口状态 |
| 适用性 | 实时流处理主流选择 | 小批量、固定条数场景 |
| 适用场景 | 日志分析、实时监控、按小时/分钟聚合等 | 批处理式统计、固定条数触发计算 |
a、计数窗口
- 计数窗口不区分事件时间 or 处理时间,因为它不依赖时间。
- 由于所有数据在一个窗口,状态会不断增长,需确保及时触发清理。
- 不适合高吞吐长期运行场景(可能
OOM),建议配合evictor或使用reduce/aggregate减少状态。
b、时间窗口
- 每个
TimeWindow有明确的[start, end)区间。 maxTimestamp()返回end - 1,因为 Flink 的窗口是左闭右开区间。
public class TimeWindow extends Window {
private final long start;
private final long end;
public long maxTimestamp() {
return end - 1L; // 窗口内允许的最大事件时间戳
}
}
| 窗口类型 | 是否固定大小 | 是否重叠 | 划分依据 |
|---|---|---|---|
TumblingWindow |
是 | 否 | 固定时间长度 |
SlidingWindow |
是 | 是 | 固定长度 + 滑动步长 |
SessionWindow |
否 | 否 | 事件间隔(gap) |
// 滚动事件时间窗口:每 5 分钟统计一次
stream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("value");
// 滑动事件时间窗口:每 1 分钟统计最近 5 分钟的数据
stream
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.sum("value");
3、计数窗口
1)本质:GlobalWindow + CountTrigger
Flink 没有独立的 CountWindow 类,而是通过以下组合实现:
WindowAssigner:GlobalWindows(所有元素分配到同一个全局窗口)Trigger:CountTrigger(当窗口内元素数量达到阈值时触发计算)
stream.countWindow(100);
// 实际是:window(GlobalWindows.create()).trigger(CountTrigger.of(100))
a、GlobalWindow 特点
- 所有元素都被分配到同一个全局窗口(理论上无限大)。
- 必须自定义
Trigger,否则永远不会触发(因为没有自然结束时间)。
b、CountTrigger 行为
| 方法 | 行为 |
|---|---|
onElement |
每来一个元素,计数器 +1;达到阈值则触发清空并重置 |
onProcessingTime / onEventTime |
不做任何事(跳过) |
c、使用场景:
计数窗口适用于 “按固定条数触发计算” 的场景,尤其适合:
| 场景 | 说明 |
|---|---|
| 批量处理式实时计算 | 每收集 1000 条日志就做一次聚合,而不是等时间到了 |
| 样本采样或批处理模拟 | 每 N 条数据作为一个批次进行模型推理或写入 |
| 低频数据流的及时响应 | 数据来得慢,用时间窗口会很久不触发,改用计数可更快响应 |
| 测试/调试 | 快速验证逻辑,无需等待时间窗口结束 |
d、示例:每 3 条订单就统计一次总金额
keyBy().window()→ 每个 key 独立计数(如每个用户每 3 条触发)windowAll()→ 全局统一计数(所有用户合起来每 3 条触发)
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
public class CountWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟订单流:(orderId, amount)
DataStream<Order> orderStream = env.fromElements(
new Order("1", 10.0),
new Order("2", 20.0),
new Order("3", 30.0), // 触发第1次计算(3条)
new Order("4", 5.0),
new Order("5", 15.0),
new Order("6", 25.0), // 触发第2次计算(又3条)
new Order("7", 40.0)
);
// 使用计数窗口:每3条触发一次
orderStream
.keyBy(order -> "global") // 如果不需要 keyBy,也可用 allWindow(见下方说明)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(3))
.sum("amount")
.print(); // 输出:60.0, 45.0, ...
env.execute("Count Window Example");
}
public static class Order {
public String id;
public double amount;
public Order(String id, double amount) {
this.id = id;
this.amount = amount;
}
}
}
4)为什么状态会无限增长
a、默认行为:Flink 会缓存窗口内所有元素
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))
.process(new ProcessWindowFunction<...>())
Flink 会在状态后端中 完整保存这 100 条原始数据,直到窗口触发计算。
b、问题在于:触发 ≠ 清理
原因:Flink 无法确定你是否还需要这些数据(比如用于后续合并、回溯等),所以保守地保留。
ProcessWindowFunction执行完后,Flink默认不会自动清除这些已处理的数据!- 下一批 100 条到来时,状态中可能还留着上一批的 100 条 → 状态翻倍!
- 长期运行 →
OOM(内存溢出)或Checkpoint失败
5)解决状态无限增长
a、优先使用增量聚合函数(Reduce / Aggregate)
强烈建议:只要能用
reduce或aggregate实现逻辑,就不要用ProcessWindowFunction。
这是 最高效、最安全 的方式,从根本上避免缓存原始数据。
示例:用 reduce() 替代 process()
stream
.keyBy(...)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))
.reduce((value1, value2) -> new Order(value1.amount + value2.amount))
.print();
优点:
- 状态只保存 一个中间结果(如
sum、count、avg),不是 100 条原始数据 - 每来一条就更新状态,触发时直接输出,无需额外清理
- 内存占用恒定(
O(1))
b:使用 Evictor 主动清理已处理数据
如果你必须用
ProcessWindowFunction(比如需要访问窗口元信息、做复杂逻辑),可以配合Evictor在触发前清空数据。
stream
.keyBy(...)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(100)) // 触发前移除最近100条
.process(new ProcessWindowFunction<...>() { ... });
注意:CountEvictor.of(100) 表示“移除最近 100 条”,刚好匹配窗口大小。
工作流程:
- 第 100 条数据到达 → 触发器决定触发
Evictor先执行:从窗口中移除最近的 100 条数据(通常是全部)ProcessWindowFunction处理 剩余数据(此时为空或部分)- 状态被清空
缺点:
- 仍需先缓存
100条数据 → 内存峰值存在 - 如果
Evictor和Trigger不匹配,可能导致逻辑错误
c、最佳实践总结
| 场景 | 推荐方案 |
|---|---|
能用 sum/count/max 等简单聚合 |
➤ 用 .sum()、.reduce()、.aggregate() |
| 需要复杂逻辑,但可增量计算 | ➤ 自定义 AggregateFunction |
必须用 ProcessWindowFunction |
➤ 配合 CountEvictor 清理数据 |
| 高吞吐、长期运行 | ➤ 绝对避免 缓存原始数据 |
// 安全:状态恒定,无增长风险
stream
.keyBy("userId")
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1000))
.reduce(
(order1, order2) -> new Order(order1.userId, order1.amount + order2.amount),
(key, window, reduced, out) -> out.collect(reduced)
)
.print();
2、时间窗口
- 滚动窗口认为:“世界是周期性的”,每 N 秒就该结算一次,不管数据是否连续。
- 滑动窗口认为:“我们需要更细腻的观察”,每隔一小段时间,就回看最近一段历史。
- 会话窗口则说:“别预设节奏,让数据自己说话”——只有当行为中断足够久,才视为一个会话结束
| 类型 | 特点 | 适用场景 | 是否重叠 |
|---|---|---|---|
Tumbling Window(滚动窗口) |
固定大小、不重叠 | 每分钟统计订单量 | 否 |
Sliding Window(滑动窗口) |
固定大小、可重叠 | 每10秒统计最近1分钟活跃用户 | 是 |
Session Window(会话窗口) |
基于事件间隔动态划分 | 用户行为会话分析 | 否(但长度可变) |
1)三种类型
a、Tumbling Window(滚动窗口)
- 固定时间长度(如 1 分钟)
- 窗口之间无缝衔接,无重叠
- 每个事件只属于一个窗口
[00:00–00:01) [00:01–00:02) [00:02–00:03) ...
典型场景:
- 每分钟支付成功订单数
- 每小时日志错误计数
TPS、TP999
b、Sliding Window(滑动窗口)
- 窗口长度(
size) 与 滑动步长(slide) 可不同 - 若
slide < size→ 窗口重叠 - 每个事件可能属于多个窗口
窗口长度=1分钟,滑动步长=10秒:
[00:00–00:01) → 输出 at 00:00:10
[00:00:10–00:01:10) → 输出 at 00:00:20
[00:00:20–00:01:20) → 输出 at 00:00:30
...
典型场景:
- 实时监控:每
10秒更新“过去1分钟CPU使用率” - 风控:每
5秒检查“最近30秒异常登录次数”
c、Session Window(会话窗口)
- 无固定边界,由事件之间的空闲时间(
gap) 决定- 开始时间 = 该会话中第一个事件的时间
- 结束时间 = 该会话中最后一个事件的时间 + gap
- 如果两个事件间隔 >
gap→ 切分为两个会话 - 窗口长度动态变化
事件时间:10:00, 10:02, 10:03, 10:08, 10:15
gap = 5分钟 →
会话1: [10:00 – 10:08] (因为 10:03 到 10:08 间隔=5min,刚好断开)
会话2: [10:15 – ...]
2)窗口函数
窗口只是“划定了范围”,真正要回答的是:如何高效、准确地从这个范围里算出结果?
| 模式 | 计算方式 | 状态存储 | 功能能力 |
|---|---|---|---|
增量聚合(ReduceFunction / AggregateFunction) |
来一条算一条,只存中间结果 | 极小(O(1)) | 只能做可增量聚合的操作(如 sum、count、avg) |
全量处理(ProcessWindowFunction) |
等窗口结束,一次性拿到所有数据 | 很大(O(N)) | 任意逻辑(排序、取 TopK、机器学习) |
| 增量 + 全量(组合使用) | 先增量聚合,再把聚合结果交给 ProcessWindowFunction |
小(O(1)) | 在高效基础上增加上下文(如加窗口信息、调用外部服务) |
a、增量聚合:ReduceFunction 或 AggregateFunction
- 原理:每来一个元素,就和当前聚合状态合并,永远只保留一个中间值。
- 优点:内存占用极低,适合超大窗口或高吞吐流。
- 限制:只能用于满足结合律和交换律的操作(如 sum、min、max、count、average)。
典型场景:计算每分钟总订单金额
stream
.keyBy("userId")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce((order1, order2) -> new Order(order1.amount + order2.amount))
.print();
-- 等价于
.reduce((ReduceFunction<Order>) (order1, order2) ->
new Order(order1.amount + order2.amount)
)
b、全量处理:ProcessWindowFunction
- 原理:窗口关闭时,把所有缓存的元素以
Iterable<T>形式传给你,你可以任意处理。 - 优点:功能最强大,可做排序、过滤、复杂逻辑。
- 缺点:必须把整个窗口的数据存在状态后端,窗口越大越容易
OOM。 - 警告:如果
30秒内有100万条请求,这段代码会尝试把 100 万个Long加载到内存!
典型场景:计算 TP999、输出窗口内所有异常日志
stream
.keyBy("service")
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
.process(new ProcessWindowFunction<LatencyEvent, String, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<LatencyEvent> elements, Collector<String> out) {
List<Long> latencies = new ArrayList<>();
for (LatencyEvent e : elements) {
latencies.add(e.latencyMs);
}
Collections.sort(latencies);
int idx = (int) (0.999 * latencies.size());
long tp999 = latencies.get(idx);
out.collect("Service=" + key + ", TP999=" + tp999 + "ms");
}
});
c、组合模式:增量预聚合 + ProcessWindowFunction 后处理
- 原理:先用
AggregateFunction做高效增量聚合,窗口触发时,把聚合结果(而非原始数据)传给ProcessWindowFunction。 - 优点:既享受增量聚合的低开销,又能访问窗口元信息(如窗口开始/结束时间)、调用外部系统、格式化输出。
- 这是生产中最推荐的“黄金组合”!
典型场景:计算每分钟平均延迟,并附带窗口时间戳写入数据库
- 状态只存
(sum, count),不是百万条原始数据 - 输出包含窗口时间,便于下游对齐
- 可在
enricher中调用HTTP API、写Kafka等
// 1. 定义增量聚合:计算 (sum, count)
AggregateFunction<LatencyEvent, Tuple2<Long, Integer>, Double> avgAgg = new AggregateFunction<...>() {
public Tuple2<Long, Integer> createAccumulator() { return Tuple2.of(0L, 0); }
public Tuple2<Long, Integer> add(LatencyEvent e, Tuple2<Long, Integer> acc) {
return Tuple2.of(acc.f0 + e.latencyMs, acc.f1 + 1);
}
public Double getResult(Tuple2<Long, Integer> acc) {
return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
}
public Tuple2<Long, Integer> merge(...) { ... }
};
// 2. 定义后处理:加上窗口信息
ProcessWindowFunction<Double, MetricRecord, String, TimeWindow> enricher = new ProcessWindowFunction<...>() {
public void process(String key, Context ctx, Iterable<Double> averages, Collector<MetricRecord> out) {
Double avg = averages.iterator().next(); // 只有一个值
TimeWindow w = ctx.window();
out.collect(new MetricRecord(key, avg, w.getStart(), w.getEnd()));
}
};
// 3. 组合使用
stream
.keyBy("service")
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.aggregate(avgAgg, enricher); // ← 关键:两个参数!
d、选择窗口函数处理方式的决策树
- 你的聚合操作是否可增量?(
sum/count/avg/min/max→ 是;TP99/topN→ 否)- 是 → 优先用
ReduceFunction/AggregateFunction - 否 → 进入第 2 步
- 是 → 优先用
- 窗口内数据量是否可控?(< 10万条?)
- 是 → 用
ProcessWindowFunction - 否 → 改用近似算法(T
-Digest,HyperLogLog)封装成AggregateFunction
- 是 → 用
- 是否需要窗口元信息或复杂输出?
- 是 → 用 组合模式(
aggregate(AggFunc, ProcessFunc))
- 是 → 用 组合模式(
3)窗口类型选择建议
a、滚动窗口
核心特点:需要按固定时间单位做统计、汇总、计费、告警的场景
- 电商大促实时看板
- 需求:每分钟展示成交订单数和 GMV(成交总额)
- 实现:
window(TumblingEventTimeWindows.of(Time.minutes(1))) - 为什么合适?大促节奏以分钟为单位对齐运营动作,不需要跨分钟关联。
API网关监控 TPS- 需求:每秒统计 API 调用量,用于限流或容量评估
- 实现:
window(TumblingProcessingTimeWindows.of(Time.seconds(1))).count() - 为什么合适?
TPS本身就是“每秒”的定义,天然匹配滚动窗口。
- 日志平台错误率日报
- 需求:每天凌晨生成前一天的系统错误日志占比报告
- 实现:
window(TumblingEventTimeWindows.of(Time.days(1))) - 为什么合适?对账、计费、合规类任务要求严格按自然日划分,不能重叠或遗漏。
b、滑动窗口
核心特点:窗口重叠、高频更新、平滑趋势
- 风控系统中的短时异常检测
- 需求:用户在最近
5分钟内登录失败超过 3 次,就触发二次验证 - 实现:滑动窗口(
5分钟窗口,每 30 秒检查一次) - 为什么合适?攻击可能发生在任意时刻,必须高频扫描最近行为,不能等到整点才判断。
- 需求:用户在最近
IoT设备心跳健康度- 需求:每 1 分钟检查过去 10 分钟内设备是否上报过心跳,否则标记为离线
- 实现:
window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))) - 为什么合适?设备可能在任意时间断连,需持续监控最近 10 分钟状态,而非只看整点区间。
c、会话窗口
核心特点:由数据驱动、窗口长度不固定、基于静默期分割,分析用户/设备的一次完整“活动周期”或“交互会话”**
- 用户行为分析(
App使用会话)- 需求:统计用户单次使用 App 的时长、页面浏览数、点击次数
- 实现:
window(EventTimeSessionWindows.withGap(Time.minutes(10))) - 为什么合适?用户可能打开
App后操作 3 分钟,退出;1 小时后再回来——这是两次独立会话,不能合并。
- 游戏对局行为聚合
- 需求:分析玩家在一局
MOBA游戏中的技能释放频率、死亡次数 - 实现:以“开始匹配”为起点,“结算界面”为终点,中间事件间隔通常 < 5 分钟 → gap=5min
- 为什么合适?一局游戏是天然的行为单元,gap 能自动区分不同对局。
- 需求:分析玩家在一局
- 智能硬件在线会话统计
- 需求:计算扫地机器人每次清扫任务的运行时长和电量消耗
- 实现:设备启动后持续上报状态,任务结束停止上报 → gap=2 分钟可分割任务
- 为什么合适?任务时长不确定(20~60 分钟),无法用固定窗口;会话窗口自动适配实际行为。
4)案例:每分钟统计每个用户的点击次数
window()必须在keyBy()之后调用!因为窗口是按key独立维护的(每个用户有自己的窗口)。
stream
.keyBy(...) // 1. 按 key 分组(必须!)
.window(...) // 2. 定义窗口类型
.allowedLateness(...) // 3. (可选)允许迟到数据
.reduce/process/fold/apply // 4. 聚合函数
a、定义事件类
public static class ClickEvent {
public String userId;
public long eventTime; // 毫秒时间戳
public ClickEvent(String userId, long eventTime) {
this.userId = userId;
this.eventTime = eventTime;
}
@Override
public String toString() {
return userId + " clicked at " + eventTime;
}
}
b、生成带 Watermark 的流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
// 模拟乱序点击流
DataStream<ClickEvent> clicks = env.fromCollection(Arrays.asList(
new ClickEvent("user1", 1700000000000L), // 2023-11-14 12:26:40
new ClickEvent("user1", 1700000000005L),
new ClickEvent("user2", 1700000000010L),
new ClickEvent("user1", 1700000000002L) // 迟到事件(早于上一条)
));
// 分配时间戳 + 生成 Watermark(允许5秒乱序)
WatermarkStrategy<ClickEvent> strategy = WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.eventTime);
DataStream<ClickEvent> withWm = clicks.assignTimestampsAndWatermarks(strategy);
c、定义滚动窗口并聚合
// 每1分钟窗口,统计每个用户的点击次数
DataStream<String> result = withWm
.keyBy(event -> event.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new CountAggregate(), new FormatResult());
result.print();
env.execute("User Click Count per Minute");
d、实现聚合函数
// 累加器:只存计数
public static class CountAggregate implements AggregateFunction<ClickEvent, Long, Long> {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(ClickEvent event, Long acc) { return acc + 1; }
@Override
public Long getResult(Long acc) { return acc; }
@Override
public Long merge(Long a, Long b) { return a + b; }
}
// 输出格式化
public static class FormatResult implements WindowFunction<Long, String, String, TimeWindow> {
@Override
public void apply(String userId, TimeWindow window, Iterable<Long> counts, Collector<String> out) {
long count = counts.iterator().next();
String start = new Timestamp(window.getStart()).toString();
String end = new Timestamp(window.getEnd()).toString();
out.collect(userId + " clicked " + count + " times in [" + start + ", " + end + ")");
}
}
e、处理迟到事件
问题:即使有 Watermark,仍可能有事件在窗口关闭后才到达(如网络极端延迟)。
1)allowedLateness(duration)
- 窗口关闭后,继续等待指定时间(如 10 秒)
- 期间到达的迟到事件仍参与计算,并触发窗口再次输出
- 适合少量迟到且需更新结果的场景
2)sideOutputLateData(tag)
- 将超时迟到事件输出到旁路流(side output)
- 主流不受影响,旁路流可单独处理(如写入告警日志)
OutputTag<ClickEvent> lateTag = new OutputTag<ClickEvent>("late-clicks") {};
SingleOutputStreamOperator<String> mainStream = withWm
.keyBy(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10)) // 允许额外等10秒
.sideOutputLateData(lateTag) // 超过10秒的走旁路
.aggregate(new CountAggregate(), new FormatResult());
// 获取迟到流
DataStream<ClickEvent> lateStream = mainStream.getSideOutput(lateTag);
lateStream.print("LATE"); // 输出前缀 "LATE> ..."
mainStream.print("MAIN");
执行流程:
- Watermark 到达 12:01:00 → 触发 [12:00, 12:01) 窗口计算(第一次输出)
- 12:01:05 到达一个迟到事件(eventTime=12:00:50)→ 重新计算并输出(第二次)
- 12:01:15 到达另一个迟到事件(eventTime=12:00:40)→ 因超过 allowedLateness,进入
lateStream
七、状态管理与容错
1、为什么需要状态
map、filter是无状态的:输出只依赖当前输入。- 但很多场景需要记住历史:
- “用户累计点击数”
- “过去5分钟最大温度”
- “去重:该
ID是否已处理过?”
- 解决方案:有状态计算,
Flink允许算子在内存/磁盘中保存中间数据,并在故障时自动恢复
2、状态类型与原理
Flink 提供三种原生状态接口(均支持 Checkpoint 自动持久化):
| 状态类型 | 存储结构 | 适用场景 | 性能特点 |
|---|---|---|---|
ValueState |
单值 | 最新值存储 | 高性能,低内存 |
ListState |
列表 | 事件历史记录 | 中等性能,中等内存 |
MapState |
Map |
嵌套映射 | 高性能,RocksDB优化 |
ReducingState |
单值(聚合) | 增量聚合 | 高性能,低内存 |
AggregatingState |
单值(聚合) | 复杂聚合 | 高性能,需自定义聚合 |
3、Rich Functions:访问状态的入口
普通函数(如
MapFunction)无法访问状态。
必须使用 Rich Function(如 RichMapFunction),它提供:
open():初始化状态getRuntimeContext().getState(...):获取状态句柄close():清理资源
案例:统计每个单词出现次数(带状态)
public static class WordCountFunction extends RichMapFunction<String, Tuple2<String, Long>> {
private ValueState<Long> countState;
@Override
public void open(Configuration parameters) {
// 定义状态描述符:名称 + 类型
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("word-count", Long.class);
countState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple2<String, Long> map(String word) throws Exception {
Long current = countState.value();
if (current == null) current = 0L;
current += 1;
countState.update(current); // 更新状态
return Tuple2.of(word, current);
}
}
// 使用
stream.keyBy(word -> word)
.map(new WordCountFunction())
.print();
4、状态 TTL:自动清理过期状态
问题:
- 状态会无限增长(如百万用户,每人一个计数器)
- 长期不用的状态浪费内存
解决方案:状态 TTL:自动为状态设置过期时间,到期后自动删除或隐藏。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(1)) // 过期时间 = 1天
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 何时更新TTL?
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 过期后是否可见?
.cleanupInRocksDBCompactFilter(1000) // RocksDB后台清理(可选)
.build();
参数详解:
| 配置项 | 选项 | 说明 |
|---|---|---|
UpdateType |
OnCreateAndWrite |
创建或写入时重置TTL(推荐) |
OnReadAndWrite |
读取也会延长TTL(慎用!) |
|
StateVisibility |
NeverReturnExpired |
过期后返回 null(安全) |
ReturnExpiredIfNotCleanedUp |
可能返回过期值(性能高) |
案例:带 TTL 的用户活跃状态
- 2小时内无新事件的用户,状态自动过期,不再占用内存。
public static class ActiveUserFunction extends RichFlatMapFunction<String, String> {
private ValueState<Boolean> activeState;
@Override
public void open(Configuration conf) {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(2))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("active-flag", Boolean.class);
descriptor.enableTimeToLive(ttlConfig);
activeState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(String userId, Collector<String> out) {
activeState.update(true); // 标记用户活跃,并重置TTL
out.collect(userId + " is active now");
}
}
5、状态与容错:Checkpoint 机制
-
Flink定期将状态快照(Snapshot)写入持久化存储(如 HDFS、S3) -
故障时从最近
Checkpoint恢复整个作业状态 -
保证
Exactly-Once语义 开启Checkpoint:env.enableCheckpointing(5000); // 每5秒一次
6、窗口和状态
窗口是“短期记忆”,状态是“长期记忆”。
- 窗口 = 固定时间/数量范围内的临时聚合
- 状态 = 跨越任意时间、自定义生命周期的持久记忆
1)核心区别:生命周期与控制粒度
总结对比
- 简单聚合 → 优先用 窗口
- 复杂逻辑(如自定义触发条件)→ 用
ProcessFunction+ 状态
| 功能 | 窗口(Window) |
状态(State) |
|---|---|---|
| 何时创建 | 数据进入窗口时自动创建 | 你手动在 open() 中定义 |
| 生命周期 | 窗口结束 + allowedLateness 超时后自动销毁 |
永不销毁(除非你设 TTL 或作业停止) |
| 你能控制吗? | 不能 — Flink 全权管理 |
能 — 你决定存什么、何时更新、何时清空 |
| 跨窗口共享? | 不能 — 每个窗口独立 | 能 — 状态贯穿整个 key 的生命周期 |
| 目的 | 限定聚合范围 | 记住历史信息 |
| API 层级 | 高层(声明式) | 底层(过程式) |
| 典型场景 | 每分钟统计 | 累计计数、去重 |
| 容错 | 通过 Checkpoint 自动恢复 |
通过 Checkpoint 自动恢复 |
2)为什么窗口不够用
a、场景 1:累计点击数(从用户注册到现在)
-
需求:每个用户从首次使用
App到现在的总点击次数。 -
能用窗口吗?不能!因为:
- 窗口是“分段”的(如每分钟),无法跨窗口累加。
- 即使你用超大窗口(如 10 年),也无法处理“未来新数据”。
-
正确做法:用
ValueState<Long>// 每次点击都读取旧值 +1,再写回 Long total = state.value() == null ? 0L : state.value(); state.update(total + 1);状态会一直保留,第 1 天、第 100 天的数据都能累加!
b、场景 2:去重(全局唯一 ID)
-
需求:过滤掉已经处理过的订单 ID(防止重复支付)。
-
能用窗口吗?不能!因为:
- 重复订单可能相隔几天、几个月。
- 窗口只能记住“最近 1 小时”的 ID,过期就忘了。
-
正确做法:用
MapState<String, Boolean>或ValueState+ 布隆过滤器if (seenState.value(orderId) == null) { seenState.update(orderId, true); // 标记为已见 output.collect(order); } // 否则丢弃(重复)只要状态没过期(或设 TTL=30天),就能防重!
c、场景 3:自定义触发逻辑(非固定时间)
-
需求:当某个传感器温度连续 5 次超过 100°C 才告警。
-
能用窗口吗?很难!因为:
- 不是按时间,而是按“事件次数”触发。
- 滑动窗口无法精确控制“连续 5 次”。
-
正确做法:用
ListState<Double>记录最近 5 个值List<Double> history = new ArrayList<>(listState.get()); history.add(currentTemp); if (history.size() > 5) history.remove(0); listState.update(history); if (history.stream().allMatch(t -> t > 100)) { alert("Overheat!"); }完全由你控制逻辑,不受窗口限制!
d、场景 4:会话超时但需保留历史画像
- 需求:用户会话结束后,仍要保留其“偏好标签”用于下次推荐。
- 窗口能做到吗?不能!Session Window 结束后,内部状态立即销毁。
State能做到吗?能!你可以:- 在会话窗口中计算标签 → 写入全局
MapState - 下次用户回来,直接读取历史标签
- 在会话窗口中计算标签 → 写入全局
3)什么时候用窗口?什么时候用 State
| 需求特征 | 推荐方案 |
|---|---|
| 固定时间/数量范围内的聚合 (如“每分钟错误数”) | 窗口(Tumbling/Sliding) |
| 需要跨窗口、长期累积 (如“用户总消费金额”) | Keyed State(状态属于哪个用户/设备/订单) |
| 去重、缓存、历史记录 | State(配合 TTL) |
| 复杂事件序列检测 (如“登录失败3次后成功”) | ProcessFunction + State |
| 简单统计 + 高性能 | 窗口(底层也用 State,但封装好了) |
八、事件驱动的应用
事件驱动应用 = 系统行为由“外部事件”触发,而非定时轮询或批处理。
Flink的ProcessFunction正是构建高性能事件驱动系统的基石。**
- 实时响应:事件一到,立刻处理
- 状态感知:能记住历史(如用户是否登录过)
- 时间敏感:能处理“延迟事件”或“超时未发生”
- 高吞吐低延迟:适合流式场景
1、ProcessFunction 是什么?—— 核心能力拆解
ProcessFunction 是 Flink 提供的一个底层、灵活、有状态的流处理函数,它让你:
- 逐条处理事件 →
processElement() - 在未来的某个时间点自动执行逻辑 →
onTimer()+TimerService - 记住历史信息 → 访问
Keyed State - 区分事件时间
vs处理时间 → 支持两种Timer
它实现了
RichFunction接口,因此拥有:
open()/close()生命周期getRuntimeContext()→ 获取状态、并行度等
stream
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
1)核心方法详解
a、processElement
- 调用时机:每来一条输入事件,就调用一次
- 参数说明:
value:当前事件ctx:上下文对象(可获取时间戳、key、TimerService)out:输出结果(可多次调用)
@Override
public void processElement(LoginEvent event, Context ctx, Collector<String> out) {
long eventTime = ctx.timestamp(); // 事件时间戳
String userId = ctx.getCurrentKey(); // 当前 key(需 keyBy 后才有)
TimerService timer = ctx.timerService(); // 获取定时器服务
// 注册一个 10 分钟后的事件时间定时器
timer.registerEventTimeTimer(eventTime + 10 * 60 * 1000);
}
b、 onTimer
- 调用时机:当注册的 Timer 到期时自动触发
- 用途:
- 超时检测(如“15分钟未支付”)
- 延迟计算(如“等待所有数据到齐再输出”)
- 状态清理(如“会话结束清空缓存”)
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
String userId = ctx.getCurrentKey();
out.collect("⏰ Timeout for user: " + userId);
// 可在此清理状态
myState.clear();
}
2)两种时间语义的 Timer
Flink 支持两种时间模型,
ProcessFunction都能处理:
推荐:优先使用事件时间 Timer,保证结果可重现(Exactly-Once)。
| Timer 类型 | 注册方法 | 触发依据 | 适用场景 |
|---|---|---|---|
| 事件时间(Event Time) | registerEventTimeTimer(ts) |
Watermark ≥ ts |
乱序流、精确窗口、业务时间 |
| 处理时间(Processing Time) | registerProcessingTimeTimer(ts) |
系统时钟 ≥ ts |
监控告警、心跳检测、简单超时 |
1)对比示例:
// 事件时间 Timer:基于业务时间(可靠,但可能延迟)
ctx.timerService().registerEventTimeTimer(event.eventTime + 30 * 60 * 1000);
// 处理时间 Timer:基于机器时间(立即,但不精确)
ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 30_000);
3)为什么说它是“事件驱动应用的基础
因为
ProcessFunction实现了 “事件 → 状态 → 时间 → 动作” 的完整闭环:

4)为什么 ProcessFunction 如此重要
ProcessFunction= Flink 的“操作系统内核”高级 API(如窗口、CEP)都是基于它构建的。
| 维度 | 说明 |
|---|---|
| 灵活性 | 可实现任意逻辑,不受窗口限制 |
| 精确性 | 支持事件时间,保证结果正确 |
| 状态管理 | 与 Flink State 深度集成,容错无忧 |
| 时间控制 | Timer 机制实现超时、延迟、调度 |
| 扩展性 | 支持多流、广播、侧输出等高级模式 |
5)ProcessFunction 的家族成员
Flink 提供多种
ProcessFunction变体,适应不同场景:
| 类型 | 说明 | 使用前提 |
|---|---|---|
ProcessFunction<IN, OUT> |
最基础版本 | 无需 keyBy |
KeyedProcessFunction<K, IN, OUT> |
最常用!支持 Keyed State 和 key 上下文 | 必须 keyBy() |
CoProcessFunction<IN1, IN2, OUT> |
处理两个输入流(如主流 + 维表流) | 用于 connect() |
BroadcastProcessFunction<IN, BCAST, OUT> |
处理广播流 + 普通流 | 用于动态规则更新 |
KeyedBroadcastProcessFunction |
广播流 + Keyed State | 复杂规则引擎 |
6)典型事件驱动场景:
| 场景 | 如何用 ProcessFunction 实现 |
|---|---|
| 用户会话分析 | 事件到来更新活跃时间,注册 30 分钟后 Timer,超时则结束会话 |
| 订单超时取消 | 下单时注册 Timer,支付则取消 Timer,超时则自动取消订单 |
| 异常行为检测 | 记录失败次数,连续 3 次失败后锁定账户 |
| 延迟数据补偿 | 等待 5 分钟,若无新数据则输出默认值 |
7)完整示例:用户活跃会话检测(事件时间 + State + Timer)
需求:用户30分钟无活动,视为会话结束,输出会话时长。
public class SessionTimeoutFunction
extends KeyedProcessFunction<String, UserEvent, String> {
// 状态:记录会话开始时间
private ValueState<Long> sessionStartTime;
@Override
public void open(Configuration parameters) {
sessionStartTime = getRuntimeContext()
.getState(new ValueStateDescriptor<>("start-time", Long.class));
}
@Override
public void processElement(UserEvent event, Context ctx, Collector<String> out) throws Exception {
long currentTime = event.timestamp;
// 如果是新会话(状态为空 或 距离上次 > 30分钟)
Long start = sessionStartTime.value();
if (start == null || currentTime - start > 30 * 60 * 1000) {
// 开启新会话
sessionStartTime.update(currentTime);
}
// 每次事件都更新:注册一个 30 分钟后的 Timer
ctx.timerService().registerEventTimeTimer(currentTime + 30 * 60 * 1000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Long start = sessionStartTime.value();
if (start != null) {
long duration = timestamp - start;
out.collect(ctx.getCurrentKey() + " session ended, duration: " + duration / 1000 + "s");
sessionStartTime.clear(); // 清理会话
}
}
}
2、State、Window 、ProcessFunction核心三件套
Window内部就是用ProcessFunction+State实现的!
State和Time是地基(所有有状态计算的基础)ProcessFunction是框架(直接使用地基构建任意逻辑)Window是预制房(在ProcessFunction上封装的常见模式)
┌───────────────────────┐
│ 高层 API(易用) │ ← Window(每分钟统计)
├───────────────────────┤
│ 中层抽象 │ ← CEP(匹配事件序列)
├───────────────────────┤
│ 底层核心(灵活) │ ← ProcessFunction(完全控制)
├───────────────────────┤
│ 基础能力 │ ← State(记忆) + Time(时间)
└───────────────────────┘
1)他们的关系
| 关系 | 说明 |
|---|---|
| Window 依赖 State | 每个窗口的累加器(如 count)都存在 ValueState 中 |
| Window 依赖 Timer | Watermark 触发窗口计算,本质是 onTimer() 调用 |
| ProcessFunction 可以实现 Window | 你可以用 ProcessFunction 手动模拟 Tumbling Window |
| State 不能直接做聚合 | 它只是存储,需要你写逻辑(而 Window 已封装好) |
| Window 不能做任意逻辑 | 它只支持 reduce/aggregate/apply,无法实现状态机 |
2)Window 其实是 ProcessFunction 的封装
// 伪代码:WindowOperator 的核心逻辑
class WindowOperator extends KeyedProcessFunction<...> {
private MapState<TimeWindow, Accumulator> windowStates; // 窗口状态
@Override
public void processElement(Event e, Context ctx, Collector out) {
// 1. 将事件分配给多个窗口(Sliding)
for (TimeWindow w : assignWindows(e)) {
// 2. 更新该窗口的状态
Accumulator acc = windowStates.get(w);
if (acc == null) acc = createAcc();
acc = add(e, acc);
windowStates.put(w, acc);
// 3. 注册窗口结束时间的 Timer
ctx.timerService().registerEventTimeTimer(w.getEnd());
}
}
@Override
public void onTimer(long ts, OnTimerContext ctx, Collector out) {
// 4. Timer 触发:输出窗口结果
TimeWindow w = findWindowByEnd(ts);
Accumulator acc = windowStates.get(w);
out.collect(emit(acc));
windowStates.remove(w); // 清理状态
}
}
3)什么时候用哪个?—— 决策树

| 需求 | 推荐 | 原因 |
|---|---|---|
| 每5分钟统计 PV | Window | 标准场景,高性能 |
| 用户累计积分 | ProcessFunction + ValueState | 需要长期记忆 |
| 订单15分钟未支付告警 | ProcessFunction + Timer | 需要超时触发 |
| 去重(ID 全局唯一) | ProcessFunction + State + TTL | 窗口无法跨天去重 |
4、FQA
1)aggregate() 和 process()
其实
aggregate()也是用ProcessFunction实现的
aggregate()→WindowedStream的聚合方法(高层 API)process()→KeyedStream/ConnectedStreams的底层处理函数(低层 API)
| 维度 | window().aggregate(...) |
.process(new ProcessFunction()) |
|---|---|---|
| 所属流类型 | WindowedStream<T>(窗口流) |
KeyedStream<T, K> 或 ConnectedStreams |
| API 层级 | 高层(声明式) | 底层(过程式) |
| 主要用途 | 对窗口内数据做增量聚合(如 sum/count/avg) | 完全自定义逻辑(状态 + Timer + 多输出) |
| 是否自动触发 | 是(窗口结束时自动调用) | 否(每条事件都调用 processElement) |
| 能否访问 Timer | 不能 | 能(通过 Context.timerService()) |
| 能否访问原始事件序列 | 只能访问聚合结果 | 能(逐条处理) |
| 性能 | ⚡ 更高(增量聚合,不缓存全量数据) | 灵活但可能更低(需自己管理状态) |
a、典型使用场景对比
场景 1:每5分钟统计用户点击次数
推荐用 aggregate()
- 优点:简洁、高效、自动清理
- 缺点:只能输出聚合结果,无法做额外逻辑(如告警)
stream
.keyBy(event -> event.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregate()) // 只需实现 create/add/getResult
.print();
场景 2:用户 30 分钟无活动则告警(会话超时)
必须用 process()
- 原因:需要跨窗口记忆、注册
Timer、动态判断 - 窗口做不到:因为“会话”边界由数据决定,不是固定时间
stream
.keyBy(event -> event.userId)
.process(new SessionTimeoutFunction()) // 自定义状态 + Timer
.print();
场景3:先聚合降噪,再用 ProcessFunction 做智能决策 —— 这是常见架构!
// 先用 window + aggregate 做高效聚合
DataStream<ClickStats> stats = clicks
.keyBy(user)
.window(...)
.aggregate(new ClickAgg());
// 再对聚合结果做复杂处理
stats
.keyBy(stats -> stats.userId)
.process(new AlertOnSpikesFunction()) // 比如突增告警
.print();
b、如何选择?—— 决策指南
- 窗口场景 → 优先考虑
aggregate()/reduce() - 非窗口 or 复杂逻辑 → 用
process()
| 你的需求 | 用哪个? | 理由 |
|---|---|---|
| 固定窗口内的 sum / count / avg | aggregate() |
高层模块,简洁高效 |
| 只想快速出结果,逻辑简单 | aggregate() |
少写代码,少出 |
| 需要访问每条原始事件 | process() |
aggregate() 只见聚合结果 |
需要 Timer(超时/延迟) |
process() |
aggregate() 无法注册 Timer |
需要 Side Output(多路输出) |
process() |
aggregate() 只能单输出 |
| 需要跨窗口状态(如累计值) | process() |
窗口状态不跨窗口共享 |


