前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、介绍 Flink

它是一个开源的、分布式、高性能的流处理框架,既能处理“实时数据流”(比如用户点击、传感器数据),也能处理“批数据”(比如昨天的日志文件)。

特性 说明
真正的流处理 不像 Spark Streaming 那样“微批”,Flink 是逐条处理,延迟可低至毫秒级
状态管理 自动保存中间结果(比如用户累计点击数),支持大状态(GB/TB 级)
精确一次(Exactly-once 即使故障重启,结果也不会多算或少算
高吞吐 & 低延迟 每秒处理百万级事件,延迟 < 10ms

2、系统架构

image-20251205164844068

+--------------------------------------------------+
|                Client (提交作业)                 |
+--------------------------------------------------+
                      ↓
+--------------------------------------------------+
|               JobManager (主节点)                |
|  - JobMaster:调度任务                           |
|  - ResourceManager:管理 Slot 资源              |
|  - Dispatcher:接收作业并启动 JobMaster         |
+--------------------------------------------------+
                      ↓
+------------------+      +------------------+
| TaskManager 1    |      | TaskManager 2    |
| - Task Slot 1    |      | - Task Slot 1    |
|   (运行 SubTask) |      |   (运行 SubTask) |
| - Task Slot 2    |      | - Task Slot 2    |
+------------------+      +------------------+
        ↑                         ↑
     数据流分区                数据流分区

1)Client(客户端)

作用:作业提交入口,不参与运行时计算

  • 你写完代码打包提交的地方(比如 flink run -c MyJob my.jar)✅

  • 关键补充

    • Client 负责:

      • 解析用户代码(DataStream API / SQL
      • 构建 **JobGraph**(逻辑执行图)
      • JobGraph + JAR 包上传给 JobManager
    • Client 提交完就退出了!它不常驻集群(除非用 Application Mode

    • 三种部署模式影响 Client 行为

      模式 Client 生命周期 适用场景
      Session Mode 提交后退出 交互式开发
      Per-Job Mode 提交后退出 生产常用
      Application Mode 常驻 JM 内部 Kubernetes 推荐

2)JobManager (JM) —— 集群“大脑”

核心职责:集群主节点,负责调度、容错、协调,相当于“大脑”,负责协调整个作业

a、JobMaster(每个作业一个)

  • 负责单个作业的全生命周期管理
  • 具体职责
    • JobGraph 转为 ExecutionGraph(物理执行图)
    • ResourceManager 申请 Slot
    • 触发和协调 Checkpoint / Savepoint
    • 处理 TaskManager 心跳和失败恢复
    • 决定 Operator Chaining 策略

b、ResourceManager(资源管理器)

  • 分配 Task Slot
  • 关键补充
    • Slot 是逻辑资源单位,不是物理 CPU/内存
    • RM 不直接管 CPU/Memory,而是通过 SlotManager 管理 Slot 池
    • YARN/K8s 上,RM 还会动态申请/释放 TaskManager 容器

c、Dispatcher(作业分发器)

  • 接收作业,启动 JobMaster
  • 额外功能
    • 维护 JobGraph 缓存(用于 Web UI 展示)
    • 支持 Savepoint 触发flink savepoint <jobId>
    • HA 模式下,配合 ZooKeeperJM 故障转移

3)TaskManager (TM) —— “工人”节点

工作节点,执行实际计算任务,真正干活的机器,运行具体的计算任务(SubTask

a、Task Slot(任务槽位)

TM 上的资源隔离单元(类似“虚拟 CPU 核心”)

image-20251205171940999

  • SlotCPU 核心!它是是 逻辑 资源单位,不是 物理隔离

    • 所有 Slot 共享同一个 TaskManager JVM 进程

    • 所有 Slot 共享同一个 CPU 资源池(由 OS 调度)

    • 所有 Slot 共享同一个 JVM 堆内存(Heap)

      • 内存如何隔离?虽然默认不隔离,但 Flink 支持细粒度内存管理

        • # 控制每个 Slot 的堆内存上限
          taskmanager.memory.task.heap.size: 1024mb
          
        • # 每个 Slot 的堆外内存(用于网络缓冲、RocksDB 等)
          taskmanager.memory.task.off-heap.size: 512mb
          
  • Slot vs 并行度

    • 最大支持并行度 = 所有 TaskManagerSlot 总数
    • 例如:3 个 TaskManager,每个有 2 个 Slot → 最大并行度 = 6
  • 为什么需要 Slot

    • 防止单个大作业占满整个 TM
    • 允许多个作业共享同一个 TM(提高资源利用率)
    • 每个 Slot 可运行一个 Task

b、Operator Chaining(算子链)优化

  • Flink 会将满足条件的连续算子(如 map → filter → keyBy合并成一个 Task
  • 这个 Task一个 Slot 中运行内部无序列化/网络开销
  • 条件:same parallelism + no shuffle + no repartition

Flink 应用由 数据流(Data Streams)算子(Operators) 构成,形成一个 有向无环图(DAG)

[Source] → [Operator1] → [Operator2] → ... → [Sink]
  • Source:数据来源,如fromElements(), readTextFile()Kafka、文件、集合等。
  • Transformation:数据转换,包括map(), flatMap(), filter(), keyBy(), reduce()等。
  • Sink:数据输出目的地,例如print(), writeAsText()、如数据库、控制台、文等。

1)执行环境与程序骨架

a、创建执行环境

  • getExecutionEnvironment():自动适配本地/集群模式。
  • 可通过 setParallelism(n) 设置默认并行度。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

b、触发执行

  • 惰性执行:只有调用 execute() 时,Flink 才会构建 JobGraph 并提交到 JobManager
  • 作业名称用于在 Web UI 中识别任务。
env.execute("My Flink Job");

c、示例代码:

  1. 获取 Flink 流处理执行环境
  2. 创建一个 DataStream(数据流),使用 fromElements() 方法从一组预定义的 Person 对象构造源流
  3. 对数据流进行转换:使用 filter() 算子过滤出年龄大于等于 18 岁的 Person 对象
  4. 添加 Sink:将过滤后的结果打印到标准输出(控制台)
  5. 触发作业执行
public static void main(String[] args) throws Exception {
    // 1. 获取 Flink 流处理执行环境
    // 这是所有 Flink 程序的入口点,用于构建和提交数据流作业
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. 创建一个 DataStream(数据流),使用 fromElements() 方法从一组预定义的 Person 对象构造源流
    // 此处模拟了一个小型静态数据集,包含三个人的信息
    DataStream<Person> flintstones = env.fromElements(
            new Person("Fred", 35),     // 成年人
            new Person("Wilma", 35),    // 成年人
            new Person("Pebbles", 2)    // 小孩
    );

    // 3. 对数据流进行转换:使用 filter() 算子过滤出年龄大于等于 18 岁的 Person 对象
    // filter 接收一个 Predicate(布尔函数),返回 true 的元素会被保留在结果流中
    DataStream<Person> adults = flintstones.filter(person -> person.age >= 18);

    // 4. 添加 Sink:将过滤后的结果打印到标准输出(控制台)
    // print() 是一个内置的 sink 算子,常用于调试和演示
    // 输出格式为 "<subtask_id> <element>",例如 "1> Fred: age 35"
    adults.print();

    // 5. 触发作业执行
    // 至此,Flink 才会真正构建并提交数据流图(DAG)到本地或集群运行
    // execute() 方法会阻塞主线程,直到作业完成(对于有界流)或被取消(对于无界流)
    env.execute();
}

4、有界流 vs 无界流

Flink 主要且最擅长处理无界流(实时数据流),但它通过“流批一体”架构,也能高效处理有界流(批处理),且使用同一套 API。

jav

1)这两个流区别是什么

类型 描述 典型场景 Flink处理方式
有界流 数据集有限,可完全加载 批处理(比如一天的数据) 视为特殊流处理
无界流 数据持续到达,理论上无限 实时监控(比如每秒进来的订单) 原生支持
  • 无界流 → 用 StreamingExecutionEnvironment
  • 有界流(纯批)→ 用 BatchExecutionEnvironment
特性 无界流(流模式) 有界流(批模式)
执行模式 Streaming(默认) 可选 Batch(Flink 1.12+ 引入)
调度方式 流式 pipeline,持续运行 优化为批调度(如全量 shuffle 后再计算)
状态管理 必须启用 checkpoint 可关闭状态,提升性能
窗口触发 依赖 watermark + trigger 数据读完自动触发所有窗口
资源效率 长期运行,注重稳定性 短期任务,注重吞吐和完成速度

3)为什么无界流容易 OOM,而有界流不容易?

场景 无界流 + ProcessWindowFunction 有界流 + ProcessWindowFunction
窗口数据生命周期 必须在状态后端中持续缓存直到窗口触发 数据在 单次 task 执行栈内处理,结束后立即 GC
背压与流量控制 若数据突增,状态可能持续增长 输入总量已知,调度器可预分配资源
容错机制 依赖 checkpoint 持久化状态 → 内存+磁盘都要存 失败重试只需重跑该 task,无需 checkpoint 窗口数据
大窗口风险 高(如 1 小时窗口 + 高 QPS = 数千万条) 低(即使大窗口,也是“已知总量”,可优化调度)

a、启用了 BATCH 模式

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  • Flink 会启用 批调度器(Batch Scheduler)
  • 不再使用 checkpointwatermark 触发等流式机制
  • 而是像 Spark 一样:shuffle 数据,再分阶段计算

b、JoinGroupBy 都是“分阶段执行”

  • 第一阶段:keyBy(userId) → 将订单和用户数据按 key shuffle 到同一 task manager
  • 第二阶段:每个 task 只处理属于自己 key 的数据子集
  • 第三阶段:窗口聚合时,每个窗口的数据在单次 task 调用中加载 → 计算 → 释放

📌 即使总数据 1 亿条,只要 key 分布均匀,每个 task 只处理几万~几十万条,内存完全可控。

  • 如果某个 key 的数据特别大(如“user1”有 1000 万订单),Flink 会:
    • reduce 阶段自动将中间数据 spill 到磁盘
    • 类似 MapReduce 的 “combinespillmerge” 流程
  • 这是 Streaming 模式不具备的能力

d、没有长期状态缓存

  • 在流模式中,ProcessWindowFunction 必须把窗口数据持久化到状态后端,直到 watermark 触发。
  • 但在批模式中,所有数据已知,窗口一次性触发,无需缓存,计算完就释放。

场景:批处理 ETL 场景,数据量很大(比如 1 亿条订单),但不 OOM

  • 订单数据(orderId, userId, amount)
  • 用户数据(userId, region)
  • 目标:统计每个区域的总订单金额
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BoundedJoinExample {

    public static void main(String[] args) throws Exception {
        // 1. 创建环境,并显式设置为 BATCH 模式(关键!)
        StreamExecution环境 env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // ← 启用批优化

        // 2. 模拟两个有界数据源(实际可用 readTextFile / fromCollection)
        DataStream<Tuple3<String, String, Double>> orders = env.fromCollection(java.util.Arrays.asList(
            Tuple3.of("order1", "user1", 100.0),
            Tuple3.of("order2", "user2", 200.0),
            Tuple3.of("order3", "user1", 150.0),
            Tuple3.of("order4", "user3", 300.0)
        ));

        DataStream<Tuple2<String, String>> users = env.fromCollection(java.util.Arrays.asList(
            Tuple2.of("user1", "beijing"),
            Tuple2.of("user2", "shanghai"),
            Tuple2.of("user3", "guangzhou")
        ));

        // 3. 给数据加上“事件时间”(这里用固定时间,因为是有界流)
        WatermarkStrategy<Tuple3<String, String, Double>> orderWm = WatermarkStrategy.noWatermarks();
        WatermarkStrategy<Tuple2<String, String>> userWm = WatermarkStrategy.noWatermarks();

        DataStream<Tuple3<String, String, Double>> ordersWithTs = orders.assignTimestampsAndWatermarks(orderWm);
        DataStream<Tuple2<String, String>> usersWithTs = users.assignTimestampsAndWatermarks(userWm);

        // 4. Join:按 userId 关联订单和用户
        DataStream<Tuple3<String, String, Double>> enriched = ordersWithTs
            .keyBy(order -> order.f1) // userId
            .intervalJoin(usersWithTs.keyBy(user -> user.f0))
            .between(Time.milliseconds(-1), Time.milliseconds(1)) // 因为是静态数据,时间窗口设为0
            .process(new JoinFunction<...>() {
                @Override
                public Tuple3<String, String, Double> join(Tuple3<String, String, Double> order, Tuple2<String, String> user) {
                    return Tuple3.of(user.f1, order.f1, order.f2); // (region, userId, amount)
                }
            });

        // 5. 按 region 分组,统计总金额(这里用滚动窗口,但因为是有界流,窗口会一次性触发)
        enriched
            .keyBy(record -> record.f0) // region
            .window(TumblingEventTimeWindows.of(Time.days(1))) // 假设所有数据在同一天
            .reduce((a, b) -> Tuple3.of(a.f0, "SUM", a.f2 + b.f2)) // 累加金额
            .print();

        // 6. 执行
        env.execute("Bounded Join Example");
    }
}

二、DataStream API核心组件

1、支持的数据类型

Flink 支持任何 可序列化Java/Scala 类型:

类型 示例 要求说明
基本类型 String, Integer, boolean[] 自动支持
POJO 自定义 Person 需满足 POJO 规范(见下文)
Tuple Tuple2<String, Integer> Flink 内置
Scala Case Class case class Event(id: String, ts: Long) Scala 专用

2、POJO 类规范(必须全部满足):

  • 类是 public 且非内部类;
  • public 无参构造函数;
  • 所有字段要么 publicfinal,要么有标准 getter/setter(符合 JavaBean 命名);
  • 字段类型也需可序列化。

三、核心算子详解

1、Source 算子(数据输入)

方法 说明
fromElements(...) 从内存对象创建流(测试用)
fromCollection(list) Java Collection 创建
readTextFile(path) 按行读取文本文件
socketTextStream(host, port) Socket 实时读取(调试用)
addSource(SourceFunction) 接入 KafkaPulsar 等外部系统

2、Transformation 算子(数据转换)

1)无状态转换(Stateless

算子 功能 示例
map() 1:1 映射 stream.map(x -> x * 2)
flatMap() 1:N 映射 分词、展开嵌套结构
filter() 条件过滤 stream.filter(x -> x > 0)

1)flatMap 示例:文本分词

DataStream<String> lines = env.fromElements("hello world", "flink stream");
DataStream<String> words = lines.flatMap((line, out) -> {
    for (String word : line.split(" ")) {
        out.collect(word); // 可输出多个
    }
});
// 输出: hello, world, flink, stream

2)有状态转换(Stateful

算子 功能
keyBy(keySelector) key 重分区(类似 SQL GROUP BY),触发 网络 shuffle,是昂贵操作,应谨慎使用
window(...) key 分组上定义窗口
reduce(), sum(), max() 聚合操作

keyBy + reduce 示例:每组姓名的最大年龄

people.keyBy(p -> p.name)
       .max("age") // 输出滚动最大值
       .print();

3、Sink 算子(数据输出)

生产环境应使用 Checkpointed Sink(如 KafkaSink)保证 Exactly-Once

方法 说明
print() 控制台输出(开发调试)
writeAsText(path) 写入文本文件(不推荐生产用)
addSink(SinkFunction) 自定义输出(如写入 MySQLKafka

四、并行与分布式执行

Flink 不是一个单线程程序,而是一个分布式流处理引擎。它把一个作业拆成多个任务,并让这些任务同时运行在多台机器(或多个 CPU 核心)上,从而实现高吞吐、低延迟。这个“同时运行多少个任务”的数量,就叫 并行度(Parallelism)

1、什么是 并行度(Parallelism

1)核心定义:

并行度 = 每个算子(operator)被拆分成多少个“子任务”(subtask)来并行执行。

  • 每个 subtask 是一个独立的线程(可能在不同机器上)。
  • Flink 作业的总并行能力 = 各算子 subtask 数量之和(但受限于集群资源)。

2)设置方式:

注意:sourcesink 也可以单独设置并行度!

方式 代码 作用范围
全局设置 env.setParallelism(4) 所有算子默认使用该并行度
算子级设置 .map(...).setParallelism(2) 仅对该算子生效

2、理解并行度如何分配数据

答案:Flink 默认使用 轮询(round-robin) 分发策略(对非 keyby 流):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // 全局并行度 = 2

// 创建一个包含 4 个元素的有界流
DataStream<String> source = env.fromElements("A", "B", "C", "D");

// map 算子(未显式设置,并行度=2)
DataStream<Integer> mapped = source.map(s -> {
    System.out.println("Thread: " + Thread.currentThread().getName() + " processing: " + s);
    return s.length();
});

mapped.print(); // sink 并行度也=2
env.execute();
元素 分配给哪个 subtask
“A” source.subtask[0] → map.subtask[0]
“B” source.subtask[1] → map.subtask[1]
“C” source.subtask[0] → map.subtask[0]
“D” source.subtask[1] → map.subtask[1]

因为 fromElementssource 也会按并行度拆成 2 个 subtask!

可能输出(简化版):

1> 1   // subtask ID = 1(即第1个子任务)处理了 "A" 或 "C"
2> 1   // subtask ID = 2 处理了 "B" 或 "D"
1> 1
2> 1

输出前的 1>2> 就是 subtaskID(从 1 开始编号)。

3、数据如何在算子之间传递

当一个算子的 subtask 处理完数据后,需要把结果传给下一个算子的 subtask。 怎么传?有两种主要方式:

模式 特点 示例
One-to-One 保持分区和顺序(如 sourcemap map 算子之间
Rebalancing 重新分区(如 keyBybroadcast keyBy 后数据按 key 分发

1)One-to-One(直连,保持分区)

  • 触发条件
    • 上下游算子 并行度相同

    • 没有触发重分区操作(如 keyBy, shuffle, rebalance

  • 特点
    • 分区保持:数据从上游子任务 直接传给下游对应编号的子任务
    • 顺序保持:同一分区内的事件顺序不变
    • 零网络开销:如果两个 subtask 在同一台机器(TaskManager),数据直接内存传递
    • 高性能;性能极高,因为避免了序列化和网络传输。
  • 生活例子
    • 4 个快递员(source)分别负责 A、B、C、D 区。 他们把包裹直接交给同一个区域的分拣员map)—— A区快递员 → A区分拣员,B 区 → B 区分拣员…… 没人跨区送包,效率高!
  • 代码示例:这三个算子(source + 2个map)会被 自动链接(Chained 成一个 task,只启动 2 个线程!

    env.setParallelism(2);
    DataStream<String> ds = env.fromElements("X", "Y");
    ds.map(x -> x + "-1")     // 算子1
      .map(x -> x + "-2")     // 算子2 → 与算子1并行度相同,无 keyBy
      .print();
    

2)Rebalancing(重新分区,如 keyBy

  • 触发条件
    • 使用了 keyBy(key)
    • 或显式调用 shuffle(), rebalance(), broadcast()
  • 特点
    • 打破原有分区:数据会根据 key 重新分配到下游不同子任务。
    • 需要网络传输:即使在同一机器,也可能序列化+反序列化
    • 保证 key 聚合:相同 key 的数据一定去同一个下游 subtask
    • 性能开销大:是昂贵操作,但必要(如按用户 ID 聚合)。
  • 生活例子:现在要把所有包裹按“收件人姓名”分类。 无论哪个快递员拿到包裹,都要送到对应姓名的分拣台。 比如:
    • “张三”的包裹 → 全部送到 1 号台
    • “李四”的包裹 → 全部送到 2 号台 这就需要 跨区域调度,甚至用车拉(网络传输)!

代码示例

DataStream<Person> people = env.fromElements(
    new Person("Alice", 25),
    new Person("Bob", 30),
    new Person("Alice", 28),
    new Person("Charlie", 22)
);

// 假设并行度 = 2
KeyedStream<Person, String> keyed = people.keyBy(p -> p.name);

Flink 会对每个 name 计算 哈希值 % 并行度,决定去哪个 subtask

name hashCode % 2 分配到 subtask
“Alice” 0 subtask[0]
“Bob” 1 subtask[1]
“Alice” 0 subtask[0]
“Charlie” 1 subtask[1]

所有 “Alice” 都去了 subtask[0],后续的 max("age") 就能在同一个地方计算!

输出示例:

1> Alice: age 28   // subtask[0] 输出 Alice 的最大年龄
2> Bob: age 30     // subtask[1] 输出 Bob 的最大年龄
2> Charlie: age 22 // subtask[1] 输出 Charlie

注意:1>2> 表示 窗口聚合结果由哪个子任务输出,而不是原始数据来源。

五、时间语义与水印

1、三种时间语义

Apache Flink 中,时间语义决定了窗口何时触发计算事件归属哪个窗口。共有三种:

推荐:使用 Event Time:结果可复现,适合金融、审计等场景。

特性 Processing Time Ingestion Time Event Time(+ Watermark)
时间来源 本地机器时钟 Source 进入时间 数据中的 eventTime
是否可复现
能否处理乱序 不能 不能 能(靠 Watermark
延迟容忍 无(立即处理) 可配置(如 5s
窗口触发依据 系统时间 Source 时间 Watermark 推进
适用场景 监控、告警 少量有序流 金融、审计、对账、精确统计
是否推荐 仅限低精度场景 不推荐 强烈推荐

问题背景:实时统计每分钟支付成功的订单数

class Order {
    String orderId;
    long eventTime; // 订单实际发生的时间(毫秒时间戳)
}

数据流可能是这样的(按到达顺序):

到达顺序 eventTime(格式 HH:mm:ss) 说明
1 10:00:05 正常到达
2 10:00:07 正常到达
3 10:00:02 延迟到达(10:01:20)!实际更早发生
4 10:01:10 属于下一分钟

问题来了:Flink 如何知道 “10:00:00 ~ 10:00:59” 这个窗口的数据已经收齐,可以关闭并输出结果?**

1)Processing Time(处理时间)

  • 窗口划分依据:Flink 各算子使用本地机器的系统时间(即事件被处理时的当前时间)来分配窗口
    • 例如:当某条订单在系统时间为 10:00:05 时被窗口算子处理,它就会被归入 [10:00:00, 10:01:00) 的窗口。
  • 举例

    • 如果当前系统时间是 10:00:05,就把这个事件分到 [10:00:00, 10:01:00) 窗口。

    • 窗口在系统时间到达 10:01:00 时触发计算。

  • 缺点:

    • 结果不可复现即使输入数据完全相同,只要处理时机不同,输出结果就可能不同
      • 情况一(处理快):三条数据都在系统时间 10:00:59 前被窗口算子处理 → 全部分到 [10:00, 10:01) 窗口 → 输出 count = 3
      • 情况二(处理慢): 前两条在 10:00:59 前处理,但第三条因 GC 或网络卡顿,直到 10:01:02 才被处理 →它被分到 [10:01, 10:02) 窗口 → [10:00, 10:01) 窗口只输出 count = 2
    • 无法处理延迟或乱序数据:迟到的 10:00:02 可能被分到 10:01 窗口(如果它在 10:01 之后才被处理)!
  • 适用场景:

    • 实时监控大屏(如“当前在线用户数”)

    • 告警系统(不要求精确,只要快)

    • 对准确性要求低、但对延迟敏感的场景

2)Ingestion Time(摄入时间)

  • 实现方式Source 算子在事件刚进入 Flink 时,自动打上当前系统时间作为时间戳。
  • 相当于 Processing Time 的“统一版本”:所有算子看到的是同一个时间(Source 打的时间),而不是各自机器的时间。

  • 缺点:比 Processing Time 稍好,但仍无法处理源头延迟

  • 订单在业务系统中 10:00 生成,但 10:02 才发到 KafkaFlink 会在 10:02 给它打上时间戳 → 被分到 10:02 的窗口。

  • 丢失了事件的真实发生时间

  • 适用场景: Ingestion Time 很少被使用,因为要么用简单的 Processing Time,要么直接上 Event Time

    • 想要一定稳定性,但又不想处理 Watermark 的复杂性

    • 数据源基本有序、延迟极小(如内部日志流)

3)Event Time(事件时间) ← 推荐!

  • 窗口划分依据:用数据自带的 eventTime 判断时间。
  • 关键机制Watermark(水印线) —— 用于告诉系统:“小于等于某个时间的事件,应该都到了”。

  • 优点:

    • 结果完全由数据决定,与处理速度、网络延迟无关

    • 可重放、可复现:无论跑多少次,只要输入数据相同,结果就相同

    • 正确处理乱序和延迟事件

  • 挑战:

    • 需要理解并配置 Watermark

    • 需要容忍一定的延迟(不能无限等下去)

2、WatermarkEvent Time 的“进度条”

问题:在基于 Event Time 的流处理中,数据可能乱序到达(例如:10:00:02 的事件比 10:00:07 晚到)。那么 Flink 如何知道:“10:00 这一分钟的数据已经收齐,可以关闭窗口并输出结果了?

答案就是:Watermark(水位线),它是 Flink 判断“窗口是否可以关闭”的依据。是一个单调递增的时间戳,表示:“我确信,不会再有 eventTime ≤ W 的事件来了。

1)Watermark 的核心特性

特性 说明
单调递增 Watermark 时间戳永远不会减少,即使后续事件的 eventTime 更小
周期性生成 默认每 200ms 生成一次(可通过 .withWatermarkInterval(Duration) 调整)
滞后于最大事件时间 故意延迟一段缓冲时间(如 5s),以容忍乱序

2)允许最大延迟(Bounded Out-of-Orderness

// 允许最多 5 秒的乱序(即:事件最多延迟 5 秒到达)
WatermarkStrategy<Order> strategy = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.eventTime);
  • Flink 会持续跟踪当前看到的 最大 eventTimemaxTs

  • 每隔一段时间(默认 200ms),生成一个 Watermark

    Watermark= maxTs−allowedLateness
    
  • 例如:

    • 当前最大 eventTime = 10:01:10
    • 允许延迟 = 5
    • Watermark = 10:01:05
    • Flink 认为:所有 eventTime ≤ 10:01:05 的事件都已到达

3)触发窗口的条件:

Flink事件时间(Event Time)窗口 触发依赖 Watermark

  • 窗口 [10:00, 10:05) 只有在 Watermark10:05 时才会触发计算并关闭。
  • Watermark 是由 数据中的事件时间 推进的。
  • 如果没有新数据 → 没有新事件时间 → Watermark 停止推进 → 窗口永远不触发

举例说明:你的订单场景(带详细 Watermark 推进)假设事件按以下顺序到达(括号内为 eventTime):

配置:allowedLateness = 5s

到达顺序 eventTime Flink 处理逻辑(假设 maxDelay=5s
1 10:00:05 暂不触发窗口;Watermark 滞后
2 10:00:07 同上
3 10:00:02 虽然迟到,但仍在 [10:00:00, 10:01:00) 窗口内,且未超 5s 延迟 → 被正确计入
4 10:01:10 触发 Watermark = 10:01:0510:00 窗口关闭!输出 count=3
  • 最终结果:count = 3(包含 10:00:02 的延迟事件)无论重放多少次,只要输入数据相同,结果始终一致。

  • Flink 看到一个 eventTime = 10:01:10 的事件时,它会发出 Watermark = 10:01:10 - 5s = 10:01:05。此时,所有 eventTime ≤ 10:01:05 的窗口都可以安全关闭

3、迟到事件怎么办?

即使设置了 Watermark,仍可能有事件超过容忍延迟才到达(如 10:00:01 的事件在 10:01:10 之后才到)。

a、延长窗口生命周期(allowedLateness

  • 窗口在 Watermark 触发后不会立即销毁,而是继续接收迟到事件(≤ 窗口结束时间 + 10s
  • 每收到一个有效迟到事件,会再次触发窗口计算并输出更新结果
  • 适用于需要“最终一致性”的场景(如实时报表修正)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10)) // 窗口关闭后,再等 10 秒
.aggregate(...)

b、旁路输出(Side Output

默认行为,迟到事件直接丢弃(无警告!)

  • 超出 allowedLateness 的事件被路由到独立流
  • 主流只处理“准时+容忍期内迟到”的数据
  • 适合审计、监控、异常分析等场景
OutputTag<Order> lateTag = new OutputTag<Order>("late-orders") {};

SingleOutputStreamOperator<Long> result = stream
    .window(...)
    .sideOutputLateData(lateTag)
    .aggregate(...);

// 单独处理迟到数据
result.getSideOutput(lateTag).addSink(...); // 如写入告警日志

3、代码示例:每分钟统计订单数(Event Time + Watermark

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class OrderCountWithEventTime {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.12+ 默认使用 Event Time,但显式设置更清晰
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000); // 开启 checkpoint,保障容错

        // 模拟订单数据源
        DataStream<Order> orders = env.addSource(new OrderSourceFunction());

        // 定义 Watermark 策略:允许 5 秒乱序
        WatermarkStrategy<Order> strategy = WatermarkStrategy
            .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, ts) -> order.eventTime)
            .withWatermarkInterval(Duration.ofMillis(200)); // 可选:调整生成频率

        DataStream<Order> withWatermarks = orders.assignTimestampsAndWatermarks(strategy);

        // 定义迟到数据的 Side Output
        OutputTag<Order> lateOrdersTag = new OutputTag<>("late-orders"){};

        // 每分钟滚动窗口统计
        withWatermarks
            .keyBy(order -> "global") // 全局统计(不分组)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10)) // 允许额外 10 秒迟到
            .sideOutputLateData(lateOrdersTag)
            .aggregate(new CountAggregate()) // 自定义聚合函数
            .print("On-time Result");

        // 输出超时迟到事件
        withWatermarks
            .getSideOutput(lateOrdersTag)
            .print("Late Order");

        env.execute("Real-time Order Count per Minute (Event Time)");
    }
}

4、FQA

1)延长周期和最大延迟配置一个行吗?

答案:两者作用完全不同,不能互相替代。

  • Watermark 延迟(X
    • 我等 最多 X 秒,就认为该分钟的数据基本齐了,先算一次!
    • 只调大 Watermark 延迟:结果更“完整”,但延迟高、可能卡住、无法更新
  • Allowed LatenessY
    • 算完后,我还再等 Y 秒,万一有漏网之鱼,我就更新结果。
    • Watermark + allowedLateness快速出初值 + 后续修正,兼顾时效性与准确性
特性 forBoundedOutOfOrderness(X) .allowedLateness(Y)
控制什么? 何时触发窗口 窗口关闭后还能接收多久迟到数据
影响什么? 决定“第一次输出结果的时间点 决定“是否允许多次更新结果
作用对象 Watermark 生成逻辑 窗口生命周期
影响触发时间 是(Watermark = maxTs - X) 否(窗口在 Watermark ≥ end 时立即触发)
是否支持多次输出 否(窗口触发即关闭) 是(每次有效迟到都重新计算)
处理超 X 的迟到事件 不能(除非 X 足够大) 能(只要在 Y 范围内)
空闲流风险 可能卡住(无新事件 → Watermark 不推进) 同样依赖 Watermark,但可配合 withIdleness() 缓解

2)延长周期和最大延迟配置建议

1)参数建议:

  • XWatermark 延迟)设小一点
    • 目标:低延迟、快速出初值
    • 比如 95% 的数据在 5 秒内到达 → 设 X = 5s
  • YallowedLateness)设大一点
    • 目标:捕获迟到数据,修正结果
    • 覆盖极端延迟(如最长可能延迟 15 秒)→ 设 Y = 10~15s

2)两者配合使用

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
// ...
.window(...).allowedLateness(Time.seconds(10))

3)防卡死全局性兜底场景,对可能空闲的流,加上空闲检测:

Source 级空闲,不是 用户(key)级空闲

.withIdleness(Duration.ofSeconds(30)) // 30秒无数据,Watermark 自动推进

六、窗口(Window)计算

1、为什么需要窗口?

无界流无法直接聚合(如“总和”),需划分为有限片段(窗口)进行计算。例如:

  • “用户总点击数” → 随时间无限增长,永远算不完。
  • “过去一小时的错误率” → 需要限定时间范围。

解决方案:窗口(Window:将无界流切分成有限大小的片段(buckets),在每个片段内进行聚合。

1)窗口原理

数据源 → WatermarkStrategy → WindowAssigner → Trigger → Evictor(可选) → WindowProcessFunction
  1. 数据进入系统;
  2. 根据事件时间生成 Watermark,保证时间语义;
  3. 将数据分配到对应的时间窗口;
  4. 当触发条件满足时,触发窗口计算;
  5. (可选)在触发前清理一些数据;
  6. 对窗口内数据进行最终处理并输出结果。
组件 作用 是否必要 主要职责
数据源   必须 提供输入数据
WatermarkStrategy   必须(若用 Event Time 处理乱序,生成水印
WindowAssigner   必须 分配数据到窗口(哪种窗口类型)
Trigger   必须 决定何时触发窗口
Evictor   可选 清理窗口中部分数据
WindowProcessFunction   必须 执行窗口逻辑
  • 如果你使用的是 sum()count() 等内置聚合函数,底层其实也封装了 AggregateFunctionProcessWindowFunction
  • 使用 EventTime 时,Watermark 是关键;否则用 ProcessingTime 则无需 Watermark

a、WindowAssigner

窗口分配器,WindowAssigner 是一个抽象类,用于为每个输入元素分配一个或多个窗口(使用滑动窗口时,一个输入元素会属于多个窗口)。

public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        /**     
        * 为每一个输入元素分配窗口.     
        *     
        * @param element 输入元素     
        * @param timestamp 元素的时间戳。如果没有通过assignTimestampsAndWatermarks方法分配时间戳,则为Long.MIN_VALUE     
        * @param context 窗口上下文     
        * @return 窗口集合     
        */    
        public abstract Collection<W> assignWindows(T element, long timestamp, 
                                                    WindowAssigner.WindowAssignerContext context);    
        
        /**     
        * 获取窗口的默认触发器.     
        *      
        * @param env 流模式环境     
        * @return 触发器     
        */    
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);    
        
}

b、Trigger

触发器,窗口何时触发计算。用于为每个输入元素分配完窗口后,需要指定窗口的触发逻辑,常见的有通过Timer定时触发和计数达到指定条数触发。

// 触发器枚举
public enum TriggerResult {    
  // 不做任何处理   
  CONTINUE(false, false),    

  // 触发窗口计算,并清空窗口内的数据   
  FIRE_AND_PURGE(true, true),    

  // 触发窗口计算,不清空窗口内的数据 
  FIRE(true, false),    

  // 清空窗口内的数据 
  PURGE(false, true);  

  // 是否触发计算
  private final boolean fire;    

  // 是否清空窗口
  private final boolean purge;

}


public abstract class Trigger<T, W extends Window> implements Serializable {        

  // 当接收到一条输入元素时,判断触发器的结果    
  public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);          throws Exception;    

  // 到达处理时间Timer注册的时间,会判断触发器的结果    
  public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;   

   // 到达事件时间Timer注册的时间,会判断触发器的结果    
   public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

}

2)窗口分类

不同窗口类型,主要区别在于 WindowAssignerTrigger,较为常用的主要有两类:时间窗口和计数窗口。

维度 时间窗口 计数窗口
划分依据 基于时间划分窗口 基于元素数量划分窗口
窗口对象 TimeWindow GlobalWindow
是否需要 Watermark 事件时间窗口需要 不需要
触发机制 时间到达(+ Watermark) 条数达到阈值
状态管理 每个窗口独立状态 全局单一窗口状态
适用性 实时流处理主流选择 小批量、固定条数场景
适用场景 日志分析、实时监控、按小时/分钟聚合等 批处理式统计、固定条数触发计算

a、计数窗口

  • 计数窗口不区分事件时间 or 处理时间,因为它不依赖时间。
  • 由于所有数据在一个窗口,状态会不断增长,需确保及时触发清理。
  • 不适合高吞吐长期运行场景(可能 OOM),建议配合 evictor 或使用 reduce/aggregate 减少状态。

b、时间窗口

  • 每个 TimeWindow 有明确的 [start, end) 区间。
  • maxTimestamp() 返回 end - 1,因为 Flink 的窗口是左闭右开区间。
public class TimeWindow extends Window {
    private final long start;
    private final long end;

    public long maxTimestamp() {
        return end - 1L; // 窗口内允许的最大事件时间戳
    }
}
窗口类型 是否固定大小 是否重叠 划分依据
TumblingWindow 固定时间长度
SlidingWindow 固定长度 + 滑动步长
SessionWindow 事件间隔(gap)
// 滚动事件时间窗口:每 5 分钟统计一次
stream
  .keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .sum("value");

// 滑动事件时间窗口:每 1 分钟统计最近 5 分钟的数据
stream
  .keyBy(...)
  .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
  .sum("value");

3、计数窗口

1)本质:GlobalWindow + CountTrigger

Flink 没有独立的 CountWindow,而是通过以下组合实现:

  • WindowAssignerGlobalWindows(所有元素分配到同一个全局窗口)
  • TriggerCountTrigger(当窗口内元素数量达到阈值时触发计算)
stream.countWindow(100); 
// 实际是:window(GlobalWindows.create()).trigger(CountTrigger.of(100))

a、GlobalWindow 特点

  • 所有元素都被分配到同一个全局窗口(理论上无限大)。
  • 必须自定义 Trigger,否则永远不会触发(因为没有自然结束时间)。

b、CountTrigger 行为

方法 行为
onElement 每来一个元素,计数器 +1;达到阈值则触发清空并重置
onProcessingTime / onEventTime 不做任何事(跳过)

c、使用场景:

计数窗口适用于 “按固定条数触发计算” 的场景,尤其适合:

场景 说明
批量处理式实时计算 每收集 1000 条日志就做一次聚合,而不是等时间到了
样本采样或批处理模拟 N 条数据作为一个批次进行模型推理或写入
低频数据流的及时响应 数据来得慢,用时间窗口会很久不触发,改用计数可更快响应
测试/调试 快速验证逻辑,无需等待时间窗口结束

d、示例:每 3 条订单就统计一次总金额

  • keyBy().window() → 每个 key 独立计数(如每个用户每 3 条触发)
  • windowAll() → 全局统一计数(所有用户合起来每 3 条触发)
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟订单流:(orderId, amount)
        DataStream<Order> orderStream = env.fromElements(
            new Order("1", 10.0),
            new Order("2", 20.0),
            new Order("3", 30.0),  // 触发第1次计算(3条)
            new Order("4", 5.0),
            new Order("5", 15.0),
            new Order("6", 25.0),  // 触发第2次计算(又3条)
            new Order("7", 40.0)
        );

        // 使用计数窗口:每3条触发一次
        orderStream
            .keyBy(order -> "global") // 如果不需要 keyBy,也可用 allWindow(见下方说明)
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(3))
            .sum("amount")
            .print(); // 输出:60.0, 45.0, ...

        env.execute("Count Window Example");
    }

    public static class Order {
        public String id;
        public double amount;

        public Order(String id, double amount) {
            this.id = id;
            this.amount = amount;
        }
    }
}

4)为什么状态会无限增长

.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))
.process(new ProcessWindowFunction<...>())

Flink 会在状态后端中 完整保存这 100 条原始数据,直到窗口触发计算。

b、问题在于:触发 ≠ 清理

原因:Flink 无法确定你是否还需要这些数据(比如用于后续合并、回溯等),所以保守地保留。

  • ProcessWindowFunction 执行完后,Flink 默认不会自动清除这些已处理的数据
  • 下一批 100 条到来时,状态中可能还留着上一批的 100 条 → 状态翻倍!
  • 长期运行 → OOM(内存溢出)或 Checkpoint 失败

5)解决状态无限增长

a、优先使用增量聚合函数(Reduce / Aggregate

强烈建议:只要能用 reduceaggregate 实现逻辑,就不要用 ProcessWindowFunction

这是 最高效、最安全 的方式,从根本上避免缓存原始数据

示例:用 reduce() 替代 process()

stream
  .keyBy(...)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(100))
  .reduce((value1, value2) -> new Order(value1.amount + value2.amount))
  .print();

优点:

  • 状态只保存 一个中间结果(如 sumcountavg),不是 100 条原始数据
  • 每来一条就更新状态,触发时直接输出,无需额外清理
  • 内存占用恒定(O(1))

b:使用 Evictor 主动清理已处理数据

如果你必须用 ProcessWindowFunction(比如需要访问窗口元信息、做复杂逻辑),可以配合 Evictor 在触发前清空数据。

stream
  .keyBy(...)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(100))
  .evictor(CountEvictor.of(100)) // 触发前移除最近100条
  .process(new ProcessWindowFunction<...>() { ... });

注意:CountEvictor.of(100) 表示“移除最近 100 条”,刚好匹配窗口大小。

工作流程:

  1. 第 100 条数据到达 → 触发器决定触发
  2. Evictor 先执行:从窗口中移除最近的 100 条数据(通常是全部)
  3. ProcessWindowFunction 处理 剩余数据(此时为空或部分)
  4. 状态被清空

缺点:

  • 仍需先缓存 100 条数据 → 内存峰值存在
  • 如果 EvictorTrigger 不匹配,可能导致逻辑错误

c、最佳实践总结

场景 推荐方案
能用 sum/count/max 等简单聚合 ➤ 用 .sum().reduce().aggregate()
需要复杂逻辑,但可增量计算 ➤ 自定义 AggregateFunction
必须用 ProcessWindowFunction ➤ 配合 CountEvictor 清理数据
高吞吐、长期运行 绝对避免 缓存原始数据
// 安全:状态恒定,无增长风险
stream
  .keyBy("userId")
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1000))
  .reduce(
      (order1, order2) -> new Order(order1.userId, order1.amount + order2.amount),
      (key, window, reduced, out) -> out.collect(reduced)
  )
  .print();

2、时间窗口

  • 滚动窗口认为:“世界是周期性的”,每 N 秒就该结算一次,不管数据是否连续。
  • 滑动窗口认为:“我们需要更细腻的观察”,每隔一小段时间,就回看最近一段历史。
  • 会话窗口则说:“别预设节奏,让数据自己说话”——只有当行为中断足够久,才视为一个会话结束
类型 特点 适用场景 是否重叠
Tumbling Window(滚动窗口) 固定大小、不重叠 每分钟统计订单量
Sliding Window(滑动窗口) 固定大小、可重叠 每10秒统计最近1分钟活跃用户
Session Window(会话窗口) 基于事件间隔动态划分 用户行为会话分析 否(但长度可变)

1)三种类型

a、Tumbling Window(滚动窗口)

  • 固定时间长度(如 1 分钟)
  • 窗口之间无缝衔接,无重叠
  • 每个事件只属于一个窗口
[00:00–00:01) [00:01–00:02) [00:02–00:03) ...

典型场景

  • 每分钟支付成功订单数
  • 每小时日志错误计数
  • TPSTP999

b、Sliding Window(滑动窗口)

  • 窗口长度(size滑动步长(slide 可不同
  • slide < size → 窗口重叠
  • 每个事件可能属于多个窗口
窗口长度=1分钟,滑动步长=10秒:
[00:00–00:01) → 输出 at 00:00:10
[00:00:10–00:01:10) → 输出 at 00:00:20
[00:00:20–00:01:20) → 输出 at 00:00:30
...

典型场景

  • 实时监控:10 秒更新“过去1分钟 CPU 使用率”
  • 风控:5 秒检查“最近 30 秒异常登录次数”

c、Session Window(会话窗口)

  • 无固定边界,由事件之间的空闲时间(gap 决定
    • 开始时间 = 该会话中第一个事件的时间
    • 结束时间 = 该会话中最后一个事件的时间 + gap
  • 如果两个事件间隔 > gap → 切分为两个会话
  • 窗口长度动态变化
事件时间:10:00, 10:02, 10:03, 10:08, 10:15
gap = 5分钟 → 
会话1: [10:00 – 10:08] (因为 10:03 到 10:08 间隔=5min,刚好断开)
会话2: [10:15 – ...]

2)窗口函数

窗口只是“划定了范围”,真正要回答的是:如何高效、准确地从这个范围里算出结果?

模式 计算方式 状态存储 功能能力
增量聚合(ReduceFunction / AggregateFunction 来一条算一条,只存中间结果 极小(O(1)) 只能做可增量聚合的操作(如 sum、count、avg)
全量处理(ProcessWindowFunction 等窗口结束,一次性拿到所有数据 很大(O(N)) 任意逻辑(排序、取 TopK、机器学习)
增量 + 全量(组合使用) 先增量聚合,再把聚合结果交给 ProcessWindowFunction 小(O(1)) 在高效基础上增加上下文(如加窗口信息、调用外部服务)

a、增量聚合:ReduceFunctionAggregateFunction

  • 原理:每来一个元素,就和当前聚合状态合并,永远只保留一个中间值
  • 优点:内存占用极低,适合超大窗口或高吞吐流。
  • 限制:只能用于满足结合律和交换律的操作(如 sum、min、max、count、average)。

典型场景:计算每分钟总订单金额

stream
  .keyBy("userId")
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce((order1, order2) -> new Order(order1.amount + order2.amount))
  .print();



-- 等价于
.reduce((ReduceFunction<Order>) (order1, order2) -> 
    new Order(order1.amount + order2.amount)
)

b、全量处理:ProcessWindowFunction

  • 原理:窗口关闭时,把所有缓存的元素Iterable<T> 形式传给你,你可以任意处理。
  • 优点:功能最强大,可做排序、过滤、复杂逻辑。
  • 缺点必须把整个窗口的数据存在状态后端,窗口越大越容易 OOM
  • 警告:如果 30 秒内有 100 万条请求,这段代码会尝试把 100 万个 Long 加载到内存!

典型场景:计算 TP999、输出窗口内所有异常日志

stream
  .keyBy("service")
  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
  .process(new ProcessWindowFunction<LatencyEvent, String, String, TimeWindow>() {
      @Override
      public void process(String key, Context ctx, Iterable<LatencyEvent> elements, Collector<String> out) {
          List<Long> latencies = new ArrayList<>();
          for (LatencyEvent e : elements) {
              latencies.add(e.latencyMs);
          }
          Collections.sort(latencies);
          int idx = (int) (0.999 * latencies.size());
          long tp999 = latencies.get(idx);
          out.collect("Service=" + key + ", TP999=" + tp999 + "ms");
      }
  });

c、组合模式:增量预聚合 + ProcessWindowFunction 后处理

  • 原理:先用 AggregateFunction 做高效增量聚合,窗口触发时,把聚合结果(而非原始数据)传给 ProcessWindowFunction
  • 优点:既享受增量聚合的低开销,又能访问窗口元信息(如窗口开始/结束时间)、调用外部系统、格式化输出。
  • 这是生产中最推荐的“黄金组合”

典型场景:计算每分钟平均延迟,并附带窗口时间戳写入数据库

  • 状态只存 (sum, count),不是百万条原始数据
  • 输出包含窗口时间,便于下游对齐
  • 可在 enricher 中调用 HTTP API、写 Kafka
// 1. 定义增量聚合:计算 (sum, count)
AggregateFunction<LatencyEvent, Tuple2<Long, Integer>, Double> avgAgg = new AggregateFunction<...>() {
    public Tuple2<Long, Integer> createAccumulator() { return Tuple2.of(0L, 0); }
    public Tuple2<Long, Integer> add(LatencyEvent e, Tuple2<Long, Integer> acc) {
        return Tuple2.of(acc.f0 + e.latencyMs, acc.f1 + 1);
    }
    public Double getResult(Tuple2<Long, Integer> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }
    public Tuple2<Long, Integer> merge(...) { ... }
};

// 2. 定义后处理:加上窗口信息
ProcessWindowFunction<Double, MetricRecord, String, TimeWindow> enricher = new ProcessWindowFunction<...>() {
    public void process(String key, Context ctx, Iterable<Double> averages, Collector<MetricRecord> out) {
        Double avg = averages.iterator().next(); // 只有一个值
        TimeWindow w = ctx.window();
        out.collect(new MetricRecord(key, avg, w.getStart(), w.getEnd()));
    }
};

// 3. 组合使用
stream
  .keyBy("service")
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .aggregate(avgAgg, enricher); // ← 关键:两个参数!

d、选择窗口函数处理方式的决策树

  1. 你的聚合操作是否可增量?sum/count/avg/min/max → 是;TP99/topN → 否)
    • 是 → 优先用 ReduceFunction / AggregateFunction
    • 否 → 进入第 2 步
  2. 窗口内数据量是否可控?(< 10万条?)
    • 是 → 用 ProcessWindowFunction
    • 否 → 改用近似算法(T-Digest, HyperLogLog)封装成 AggregateFunction
  3. 是否需要窗口元信息或复杂输出?
    • 是 → 用 组合模式aggregate(AggFunc, ProcessFunc)

3)窗口类型选择建议

a、滚动窗口

核心特点:需要按固定时间单位做统计、汇总、计费、告警的场景

  • 电商大促实时看板
    • 需求:每分钟展示成交订单数和 GMV(成交总额)
    • 实现:window(TumblingEventTimeWindows.of(Time.minutes(1)))
    • 为什么合适?大促节奏以分钟为单位对齐运营动作,不需要跨分钟关联。
  • API 网关监控 TPS
    • 需求:每秒统计 API 调用量,用于限流或容量评估
    • 实现:window(TumblingProcessingTimeWindows.of(Time.seconds(1))).count()
    • 为什么合适?TPS 本身就是“每秒”的定义,天然匹配滚动窗口。
  • 日志平台错误率日报
    • 需求:每天凌晨生成前一天的系统错误日志占比报告
    • 实现:window(TumblingEventTimeWindows.of(Time.days(1)))
    • 为什么合适?对账、计费、合规类任务要求严格按自然日划分,不能重叠或遗漏。

b、滑动窗口

核心特点:窗口重叠、高频更新、平滑趋势

  • 风控系统中的短时异常检测
    • 需求:用户在最近 5 分钟内登录失败超过 3 次,就触发二次验证
    • 实现:滑动窗口(5 分钟窗口,每 30 秒检查一次)
    • 为什么合适?攻击可能发生在任意时刻,必须高频扫描最近行为,不能等到整点才判断。
  • IoT 设备心跳健康度
    • 需求:每 1 分钟检查过去 10 分钟内设备是否上报过心跳,否则标记为离线
    • 实现:window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    • 为什么合适?设备可能在任意时间断连,需持续监控最近 10 分钟状态,而非只看整点区间。

c、会话窗口

核心特点:由数据驱动、窗口长度不固定、基于静默期分割,分析用户/设备的一次完整“活动周期”或“交互会话”**

  • 用户行为分析(App 使用会话)
    • 需求:统计用户单次使用 App 的时长、页面浏览数、点击次数
    • 实现:window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    • 为什么合适?用户可能打开 App 后操作 3 分钟,退出;1 小时后再回来——这是两次独立会话,不能合并。
  • 游戏对局行为聚合
    • 需求:分析玩家在一局 MOBA 游戏中的技能释放频率、死亡次数
    • 实现:以“开始匹配”为起点,“结算界面”为终点,中间事件间隔通常 < 5 分钟 → gap=5min
    • 为什么合适?一局游戏是天然的行为单元,gap 能自动区分不同对局。
  • 智能硬件在线会话统计
    • 需求:计算扫地机器人每次清扫任务的运行时长和电量消耗
    • 实现:设备启动后持续上报状态,任务结束停止上报 → gap=2 分钟可分割任务
    • 为什么合适?任务时长不确定(20~60 分钟),无法用固定窗口;会话窗口自动适配实际行为。

4)案例:每分钟统计每个用户的点击次数

window() 必须在 keyBy() 之后调用!因为窗口是key 独立维护的(每个用户有自己的窗口)。

stream
  .keyBy(...)                 // 1. 按 key 分组(必须!)
  .window(...)                // 2. 定义窗口类型
  .allowedLateness(...)       // 3. (可选)允许迟到数据
  .reduce/process/fold/apply  // 4. 聚合函数

a、定义事件类

public static class ClickEvent {
    public String userId;
    public long eventTime; // 毫秒时间戳

    public ClickEvent(String userId, long eventTime) {
        this.userId = userId;
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return userId + " clicked at " + eventTime;
    }
}

b、生成带 Watermark 的流

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);

// 模拟乱序点击流
DataStream<ClickEvent> clicks = env.fromCollection(Arrays.asList(
    new ClickEvent("user1", 1700000000000L), // 2023-11-14 12:26:40
    new ClickEvent("user1", 1700000000005L),
    new ClickEvent("user2", 1700000000010L),
    new ClickEvent("user1", 1700000000002L)  // 迟到事件(早于上一条)
));

// 分配时间戳 + 生成 Watermark(允许5秒乱序)
WatermarkStrategy<ClickEvent> strategy = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.eventTime);

DataStream<ClickEvent> withWm = clicks.assignTimestampsAndWatermarks(strategy);

c、定义滚动窗口并聚合

// 每1分钟窗口,统计每个用户的点击次数
DataStream<String> result = withWm
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate(), new FormatResult());

result.print();
env.execute("User Click Count per Minute");

d、实现聚合函数

// 累加器:只存计数
public static class CountAggregate implements AggregateFunction<ClickEvent, Long, Long> {
    @Override
    public Long createAccumulator() { return 0L; }
    @Override
    public Long add(ClickEvent event, Long acc) { return acc + 1; }
    @Override
    public Long getResult(Long acc) { return acc; }
    @Override
    public Long merge(Long a, Long b) { return a + b; }
}

// 输出格式化
public static class FormatResult implements WindowFunction<Long, String, String, TimeWindow> {
    @Override
    public void apply(String userId, TimeWindow window, Iterable<Long> counts, Collector<String> out) {
        long count = counts.iterator().next();
        String start = new Timestamp(window.getStart()).toString();
        String end = new Timestamp(window.getEnd()).toString();
        out.collect(userId + " clicked " + count + " times in [" + start + ", " + end + ")");
    }
}

e、处理迟到事件

问题:即使有 Watermark,仍可能有事件在窗口关闭后才到达(如网络极端延迟)。

1)allowedLateness(duration)

  • 窗口关闭后,继续等待指定时间(如 10 秒)
  • 期间到达的迟到事件仍参与计算,并触发窗口再次输出
  • 适合少量迟到且需更新结果的场景

2)sideOutputLateData(tag)

  • 超时迟到事件输出到旁路流(side output)
  • 主流不受影响,旁路流可单独处理(如写入告警日志)
OutputTag<ClickEvent> lateTag = new OutputTag<ClickEvent>("late-clicks") {};

SingleOutputStreamOperator<String> mainStream = withWm
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.seconds(10))      // 允许额外等10秒
    .sideOutputLateData(lateTag)            // 超过10秒的走旁路
    .aggregate(new CountAggregate(), new FormatResult());

// 获取迟到流
DataStream<ClickEvent> lateStream = mainStream.getSideOutput(lateTag);
lateStream.print("LATE"); // 输出前缀 "LATE> ..."

mainStream.print("MAIN");

执行流程

  1. Watermark 到达 12:01:00 → 触发 [12:00, 12:01) 窗口计算(第一次输出)
  2. 12:01:05 到达一个迟到事件(eventTime=12:00:50)→ 重新计算并输出(第二次)
  3. 12:01:15 到达另一个迟到事件(eventTime=12:00:40)→ 因超过 allowedLateness,进入 lateStream

七、状态管理与容错

1、为什么需要状态

  • mapfilter无状态的:输出只依赖当前输入。
  • 但很多场景需要记住历史
    • “用户累计点击数”
    • “过去5分钟最大温度”
    • “去重:该 ID 是否已处理过?”
  • 解决方案:有状态计算Flink 允许算子在内存/磁盘中保存中间数据,并在故障时自动恢复

2、状态类型与原理

Flink 提供三种原生状态接口(均支持 Checkpoint 自动持久化):

状态类型 存储结构 适用场景 性能特点
ValueState 单值 最新值存储 高性能,低内存
ListState 列表 事件历史记录 中等性能,中等内存
MapState Map 嵌套映射 高性能,RocksDB优化
ReducingState 单值(聚合) 增量聚合 高性能,低内存
AggregatingState 单值(聚合) 复杂聚合 高性能,需自定义聚合

3、Rich Functions:访问状态的入口

普通函数(如 MapFunction)无法访问状态。

必须使用 Rich Function(如 RichMapFunction),它提供:

  • open():初始化状态
  • getRuntimeContext().getState(...):获取状态句柄
  • close():清理资源

案例:统计每个单词出现次数(带状态)

public static class WordCountFunction extends RichMapFunction<String, Tuple2<String, Long>> {
    private ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // 定义状态描述符:名称 + 类型
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("word-count", Long.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Tuple2<String, Long> map(String word) throws Exception {
        Long current = countState.value();
        if (current == null) current = 0L;
        current += 1;
        countState.update(current); // 更新状态
        return Tuple2.of(word, current);
    }
}

// 使用
stream.keyBy(word -> word)
      .map(new WordCountFunction())
      .print();

4、状态 TTL:自动清理过期状态

问题:

  • 状态会无限增长(如百万用户,每人一个计数器)
  • 长期不用的状态浪费内存

解决方案:状态 TTL:自动为状态设置过期时间,到期后自动删除或隐藏。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(1)) // 过期时间 = 1天
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 何时更新TTL?
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 过期后是否可见?
    .cleanupInRocksDBCompactFilter(1000) // RocksDB后台清理(可选)
    .build();

参数详解:

配置项 选项 说明
UpdateType OnCreateAndWrite 创建或写入时重置TTL(推荐)
  OnReadAndWrite 读取也会延长TTL(慎用!)
StateVisibility NeverReturnExpired 过期后返回 null(安全)
  ReturnExpiredIfNotCleanedUp 可能返回过期值(性能高)

案例:带 TTL 的用户活跃状态

  • 2小时内无新事件的用户,状态自动过期,不再占用内存。
public static class ActiveUserFunction extends RichFlatMapFunction<String, String> {
    private ValueState<Boolean> activeState;

    @Override
    public void open(Configuration conf) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(2))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("active-flag", Boolean.class);
        descriptor.enableTimeToLive(ttlConfig);

        activeState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String userId, Collector<String> out) {
        activeState.update(true); // 标记用户活跃,并重置TTL
        out.collect(userId + " is active now");
    }
}

5、状态与容错:Checkpoint 机制

  • Flink 定期将状态快照(Snapshot)写入持久化存储(如 HDFS、S3)

  • 故障时从最近 Checkpoint 恢复整个作业状态

  • 保证 Exactly-Once 语义 开启 Checkpoint

    env.enableCheckpointing(5000); // 每5秒一次
    

6、窗口和状态

窗口是“短期记忆”,状态是“长期记忆”

  • 窗口 = 固定时间/数量范围内的临时聚合
  • 状态 = 跨越任意时间、自定义生命周期的持久记忆

1)核心区别:生命周期与控制粒度

总结对比

  • 简单聚合 → 优先用 窗口
  • 复杂逻辑(如自定义触发条件)→ 用 ProcessFunction + 状态
功能 窗口(Window 状态(State
何时创建 数据进入窗口时自动创建 你手动在 open() 中定义
生命周期 窗口结束 + allowedLateness 超时后自动销毁 永不销毁(除非你设 TTL 或作业停止)
你能控制吗? 不能 — Flink 全权管理 能 — 你决定存什么、何时更新、何时清空
跨窗口共享? 不能 — 每个窗口独立 能 — 状态贯穿整个 key 的生命周期
目的 限定聚合范围 记住历史信息
API 层级 高层(声明式) 底层(过程式)
典型场景 每分钟统计 累计计数、去重
容错 通过 Checkpoint 自动恢复 通过 Checkpoint 自动恢复

2)为什么窗口不够用

a、场景 1:累计点击数(从用户注册到现在)

  • 需求:每个用户从首次使用 App 到现在的总点击次数。

  • 能用窗口吗?不能!因为:

    • 窗口是“分段”的(如每分钟),无法跨窗口累加。
    • 即使你用超大窗口(如 10 年),也无法处理“未来新数据”。
  • 正确做法:用 ValueState<Long>

    // 每次点击都读取旧值 +1,再写回
    Long total = state.value() == null ? 0L : state.value();
    state.update(total + 1);
    

    状态会一直保留,第 1 天、第 100 天的数据都能累加!

b、场景 2:去重(全局唯一 ID

  • 需求:过滤掉已经处理过的订单 ID(防止重复支付)。

  • 能用窗口吗?不能!因为:

    • 重复订单可能相隔几天、几个月。
    • 窗口只能记住“最近 1 小时”的 ID,过期就忘了。
  • 正确做法:用 MapState<String, Boolean>ValueState + 布隆过滤器

    if (seenState.value(orderId) == null) {
        seenState.update(orderId, true); // 标记为已见
        output.collect(order);
    }
    // 否则丢弃(重复)
    

    只要状态没过期(或设 TTL=30天),就能防重!

c、场景 3:自定义触发逻辑(非固定时间)

  • 需求:当某个传感器温度连续 5 次超过 100°C 才告警。

  • 能用窗口吗?很难!因为:

    • 不是按时间,而是按“事件次数”触发。
    • 滑动窗口无法精确控制“连续 5 次”。
  • 正确做法:用 ListState<Double> 记录最近 5 个值

    List<Double> history = new ArrayList<>(listState.get());
    history.add(currentTemp);
    if (history.size() > 5) history.remove(0);
    listState.update(history);
      
    if (history.stream().allMatch(t -> t > 100)) {
        alert("Overheat!");
    }
    

    完全由你控制逻辑,不受窗口限制!

d、场景 4:会话超时但需保留历史画像

  • 需求:用户会话结束后,仍要保留其“偏好标签”用于下次推荐。
  • 窗口能做到吗?不能!Session Window 结束后,内部状态立即销毁。
  • State 能做到吗?能!你可以:
    • 在会话窗口中计算标签 → 写入全局 MapState
    • 下次用户回来,直接读取历史标签

3)什么时候用窗口?什么时候用 State

需求特征 推荐方案
固定时间/数量范围内的聚合 (如“每分钟错误数”) 窗口(Tumbling/Sliding
需要跨窗口、长期累积 (如“用户总消费金额”) Keyed State(状态属于哪个用户/设备/订单)
去重、缓存、历史记录 State(配合 TTL
复杂事件序列检测 (如“登录失败3次后成功”) ProcessFunction + State
简单统计 + 高性能 窗口(底层也用 State,但封装好了)

八、事件驱动的应用

事件驱动应用 = 系统行为由“外部事件”触发,而非定时轮询或批处理。

FlinkProcessFunction 正是构建高性能事件驱动系统的基石。**

  • 实时响应:事件一到,立刻处理
  • 状态感知:能记住历史(如用户是否登录过)
  • 时间敏感:能处理“延迟事件”或“超时未发生”
  • 高吞吐低延迟:适合流式场景

1、ProcessFunction 是什么?—— 核心能力拆解

ProcessFunctionFlink 提供的一个底层、灵活、有状态的流处理函数,它让你:

  1. 逐条处理事件processElement()
  2. 在未来的某个时间点自动执行逻辑onTimer() + TimerService
  3. 记住历史信息 → 访问 Keyed State
  4. 区分事件时间 vs 处理时间 → 支持两种 Timer

它实现了 RichFunction 接口,因此拥有:

  • open() / close() 生命周期
  • getRuntimeContext() → 获取状态、并行度等
stream
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);

1)核心方法详解

a、processElement

  • 调用时机:每来一条输入事件,就调用一次
  • 参数说明
    • value:当前事件
    • ctx:上下文对象(可获取时间戳、keyTimerService
    • out:输出结果(可多次调用)
@Override
public void processElement(LoginEvent event, Context ctx, Collector<String> out) {
    long eventTime = ctx.timestamp();          // 事件时间戳
    String userId = ctx.getCurrentKey();        // 当前 key(需 keyBy 后才有)
    TimerService timer = ctx.timerService();   // 获取定时器服务

    // 注册一个 10 分钟后的事件时间定时器
    timer.registerEventTimeTimer(eventTime + 10 * 60 * 1000);
}

b、 onTimer

  • 调用时机:当注册的 Timer 到期时自动触发
  • 用途
    • 超时检测(如“15分钟未支付”)
    • 延迟计算(如“等待所有数据到齐再输出”)
    • 状态清理(如“会话结束清空缓存”)
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
    String userId = ctx.getCurrentKey();
    out.collect("⏰ Timeout for user: " + userId);

    // 可在此清理状态
    myState.clear();
}

2)两种时间语义的 Timer

Flink 支持两种时间模型,ProcessFunction 都能处理:

推荐:优先使用事件时间 Timer,保证结果可重现(Exactly-Once)。

Timer 类型 注册方法 触发依据 适用场景
事件时间(Event Time) registerEventTimeTimer(ts) Watermark ts 乱序流、精确窗口、业务时间
处理时间(Processing Time) registerProcessingTimeTimer(ts) 系统时钟 ≥ ts 监控告警、心跳检测、简单超时

1)对比示例:

// 事件时间 Timer:基于业务时间(可靠,但可能延迟)
ctx.timerService().registerEventTimeTimer(event.eventTime + 30 * 60 * 1000);

// 处理时间 Timer:基于机器时间(立即,但不精确)
ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 30_000);

3)为什么说它是“事件驱动应用的基础

因为 ProcessFunction 实现了 “事件 → 状态 → 时间 → 动作” 的完整闭环:

image-20251204194938608

4)为什么 ProcessFunction 如此重要

ProcessFunction = Flink 的“操作系统内核”高级 API(如窗口、CEP)都是基于它构建的。

维度 说明
灵活性 可实现任意逻辑,不受窗口限制
精确性 支持事件时间,保证结果正确
状态管理 与 Flink State 深度集成,容错无忧
时间控制 Timer 机制实现超时、延迟、调度
扩展性 支持多流、广播、侧输出等高级模式

5)ProcessFunction 的家族成员

Flink 提供多种 ProcessFunction 变体,适应不同场景:

类型 说明 使用前提
ProcessFunction<IN, OUT> 最基础版本 无需 keyBy
KeyedProcessFunction<K, IN, OUT> 最常用!支持 Keyed State 和 key 上下文 必须 keyBy()
CoProcessFunction<IN1, IN2, OUT> 处理两个输入流(如主流 + 维表流) 用于 connect()
BroadcastProcessFunction<IN, BCAST, OUT> 处理广播流 + 普通流 用于动态规则更新
KeyedBroadcastProcessFunction 广播流 + Keyed State 复杂规则引擎

6)典型事件驱动场景:

场景 如何用 ProcessFunction 实现
用户会话分析 事件到来更新活跃时间,注册 30 分钟后 Timer,超时则结束会话
订单超时取消 下单时注册 Timer,支付则取消 Timer,超时则自动取消订单
异常行为检测 记录失败次数,连续 3 次失败后锁定账户
延迟数据补偿 等待 5 分钟,若无新数据则输出默认值

7)完整示例:用户活跃会话检测(事件时间 + State + Timer

需求:用户30分钟无活动,视为会话结束,输出会话时长。

public class SessionTimeoutFunction 
    extends KeyedProcessFunction<String, UserEvent, String> {

    // 状态:记录会话开始时间
    private ValueState<Long> sessionStartTime;

    @Override
    public void open(Configuration parameters) {
        sessionStartTime = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("start-time", Long.class));
    }

    @Override
    public void processElement(UserEvent event, Context ctx, Collector<String> out) throws Exception {
        long currentTime = event.timestamp;

        // 如果是新会话(状态为空 或 距离上次 > 30分钟)
        Long start = sessionStartTime.value();
        if (start == null || currentTime - start > 30 * 60 * 1000) {
            // 开启新会话
            sessionStartTime.update(currentTime);
        }

        // 每次事件都更新:注册一个 30 分钟后的 Timer
        ctx.timerService().registerEventTimeTimer(currentTime + 30 * 60 * 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long start = sessionStartTime.value();
        if (start != null) {
            long duration = timestamp - start;
            out.collect(ctx.getCurrentKey() + " session ended, duration: " + duration / 1000 + "s");
            sessionStartTime.clear(); // 清理会话
        }
    }
}

2、StateWindowProcessFunction核心三件套

Window 内部就是用 ProcessFunction + State 实现的!

  • StateTime 是地基(所有有状态计算的基础)
  • ProcessFunction 是框架(直接使用地基构建任意逻辑)
  • Window 是预制房(在 ProcessFunction 上封装的常见模式)
        ┌───────────────────────┐
        │   高层 API(易用)     │ ← Window(每分钟统计)
        ├───────────────────────┤
        │   中层抽象             │ ← CEP(匹配事件序列)
        ├───────────────────────┤
        │   底层核心(灵活)     │ ← ProcessFunction(完全控制)
        ├───────────────────────┤
        │   基础能力              │ ← State(记忆) + Time(时间)
        └───────────────────────┘

1)他们的关系

关系 说明
Window 依赖 State 每个窗口的累加器(如 count)都存在 ValueState
Window 依赖 Timer Watermark 触发窗口计算,本质是 onTimer() 调用
ProcessFunction 可以实现 Window 你可以用 ProcessFunction 手动模拟 Tumbling Window
State 不能直接做聚合 它只是存储,需要你写逻辑(而 Window 已封装好)
Window 不能做任意逻辑 它只支持 reduce/aggregate/apply,无法实现状态机

2)Window 其实是 ProcessFunction 的封装

// 伪代码:WindowOperator 的核心逻辑
class WindowOperator extends KeyedProcessFunction<...> {
    private MapState<TimeWindow, Accumulator> windowStates; // 窗口状态

    @Override
    public void processElement(Event e, Context ctx, Collector out) {
        // 1. 将事件分配给多个窗口(Sliding)
        for (TimeWindow w : assignWindows(e)) {
            // 2. 更新该窗口的状态
            Accumulator acc = windowStates.get(w);
            if (acc == null) acc = createAcc();
            acc = add(e, acc);
            windowStates.put(w, acc);

            // 3. 注册窗口结束时间的 Timer
            ctx.timerService().registerEventTimeTimer(w.getEnd());
        }
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector out) {
        // 4. Timer 触发:输出窗口结果
        TimeWindow w = findWindowByEnd(ts);
        Accumulator acc = windowStates.get(w);
        out.collect(emit(acc));
        windowStates.remove(w); // 清理状态
    }
}

3)什么时候用哪个?—— 决策树

image-20251204200028853

需求 推荐 原因
每5分钟统计 PV Window 标准场景,高性能
用户累计积分 ProcessFunction + ValueState 需要长期记忆
订单15分钟未支付告警 ProcessFunction + Timer 需要超时触发
去重(ID 全局唯一) ProcessFunction + State + TTL 窗口无法跨天去重

4、FQA

1)aggregate()process()

其实 aggregate() 也是用 ProcessFunction 实现的

  • aggregate()WindowedStream 的聚合方法(高层 API)
  • process()KeyedStream / ConnectedStreams 的底层处理函数(低层 API)
维度 window().aggregate(...) .process(new ProcessFunction())
所属流类型 WindowedStream<T>(窗口流) KeyedStream<T, K>ConnectedStreams
API 层级 高层(声明式) 底层(过程式)
主要用途 对窗口内数据做增量聚合(如 sum/count/avg) 完全自定义逻辑(状态 + Timer + 多输出)
是否自动触发 是(窗口结束时自动调用) 否(每条事件都调用 processElement
能否访问 Timer 不能 能(通过 Context.timerService()
能否访问原始事件序列 只能访问聚合结果 能(逐条处理)
性能 ⚡ 更高(增量聚合,不缓存全量数据) 灵活但可能更低(需自己管理状态)

a、典型使用场景对比

场景 1:每5分钟统计用户点击次数

推荐用 aggregate()

  • 优点:简洁、高效、自动清理
  • 缺点:只能输出聚合结果,无法做额外逻辑(如告警)
stream
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregate()) // 只需实现 create/add/getResult
    .print();

场景 2:用户 30 分钟无活动则告警(会话超时)

必须用 process()

  • 原因:需要跨窗口记忆、注册 Timer、动态判断
  • 窗口做不到:因为“会话”边界由数据决定,不是固定时间
stream
    .keyBy(event -> event.userId)
    .process(new SessionTimeoutFunction()) // 自定义状态 + Timer
    .print();

场景3:先聚合降噪,再用 ProcessFunction 做智能决策 —— 这是常见架构!

// 先用 window + aggregate 做高效聚合
DataStream<ClickStats> stats = clicks
    .keyBy(user)
    .window(...)
    .aggregate(new ClickAgg());

// 再对聚合结果做复杂处理
stats
    .keyBy(stats -> stats.userId)
    .process(new AlertOnSpikesFunction()) // 比如突增告警
    .print();

b、如何选择?—— 决策指南

  • 窗口场景 → 优先考虑 aggregate() / reduce()
  • 非窗口 or 复杂逻辑 → 用 process()
你的需求 用哪个? 理由
固定窗口内的 sum / count / avg aggregate() 高层模块,简洁高效
只想快速出结果,逻辑简单 aggregate() 少写代码,少出
需要访问每条原始事件 process() aggregate() 只见聚合结果
需要 Timer(超时/延迟) process() aggregate() 无法注册 Timer
需要 Side Output(多路输出) process() aggregate() 只能单输出
需要跨窗口状态(如累计值) process() 窗口状态不跨窗口共享

ContactAuthor