前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、核心参数配置

1、运行参数

# flink-conf.yaml 核心配置
state.backend: rocksdb
state.checkpoints.dir: hdfs:///prod/checkpoints
state.savepoints.dir: hdfs:///prod/savepoints

# 检查点策略
execution.checkpointing.interval: 60s       # 1分钟(金融场景)
execution.checkpointing.timeout: 180s       # 3倍间隔
execution.checkpointing.min-pause: 30s      # 两次检查点最小间隔
execution.checkpointing.tolerable-failed-checkpoints: 3  # 允许3次失败

# RocksDB 优化
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.fixed-per-slot: 1024mb
state.backend.incremental: true             # 增量检查点

# 作业级别配置(代码中)
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig()
   .setMinPauseBetweenCheckpoints(30000)
   .setCheckpointTimeout(180000)
   .setMaxConcurrentCheckpoints(1)
   .enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1)检查点 (Checkpoint) - 开关配置策略

Flink 会定期将整个作业的当前状态(算子状态、源位置等)保存到一个持久化存储中(如 HDFS,S3)。如果作业发生故障,可以从最近一次成功的检查点恢复,保证数据处理的准确性。

  • 何时开启
    • 必须开启
      • 有状态计算(窗口聚合、会话分析、机器学习特征工程)
      • 要求 Exactly-Once 语义的关键业务(金融交易、计费系统)
    • 可关闭:无状态 ETL 作业(简单过滤、格式转换),但建议始终开启
  • 生产环境建议始终开启,这是 Flink 容错基石
  • 性能影响:开启后吞吐量通常下降 10-30%,但换来的是数据准确性保障

2)检查点模式 (Checkpoint Mode) - 精确度选择策略

  • 含义:指定检查点的精确度级别。
  • 作用
    • Exactly-Once (精确一次):这是最严格的模式。它确保每条输入数据在最终结果中只被处理一次。这需要与支持事务的外部系统(如 Kafka Sink)配合使用,并且对性能有一定影响。
    • At-Least-Once (至少一次):这是更宽松的模式。它保证每条数据至少被处理一次,但可能会因为网络问题或重试导致某些数据被处理多次。通常用于对重复数据不敏感的场景,性能比 Exactly-Once 更好。
  • 选择建议
    • 如果您的业务逻辑要求绝对精确(例如金融交易),请选择 Exactly-Once
    • 如果可以容忍少量重复处理(例如日志统计),可以选择 At-Least-Once
  • 建议:即使选择 At-Least-Once,也要在业务层做去重(如使用 event_id 去重表)

3)检查点间隔 (Checkpoint Interval) - 时间窗口配置策略

  • 含义:定义 Flink 触发下一次检查点的时间间隔。
    • interval 是“理想节奏”,min-pause 是“强制休息”。
    • 没有 min-pauseFlink 会在检查点完成后“兴奋地”立刻开始下一次,完全无视你设的 interval
  • 作用:控制容错恢复的粒度和开销。
  • 权衡
    • 间隔短 (如 2分钟):恢复时丢失的数据量少,恢复时间短,但频繁的检查点会增加 I/O 开销,可能影响作业吞吐量。
    • 间隔长 (如 10分钟):I/O 开销小,吞吐量高,但恢复时可能丢失更多数据,恢复时间更长。
  • 选择建议:根据可接受的数据丢失窗口(Data Loss Window)来设定。
    • 例如,如果能容忍最多 2 分钟的数据丢失,则设为 2 分钟是合理的。
  • 核心公式: 最大可容忍数据丢失 = 检查点间隔 × 故障频率

  • 典型场景配置

    业务场景 建议间隔 依据
    金融风控 30-60秒 每分钟损失可能达数万元
    用户行为分析 2-5分钟 分钟级精度可接受
    IoT 设备监控 1-2分钟 设备状态需及时更新
    日志分析 5-10分钟 可容忍较大数据丢失

a、有 min-pause 的情况( 安全):

时间 事件 计算逻辑
10:00:00 触发 #1 初始触发
10:00:20 #1 完成 耗时 20s
10:01:00 触发 #2 max(10:00:00+60s, 10:00:20+30s) = max(10:01:00, 10:00:50) = 10:01:00
10:01:20 #2 完成 耗时 20s
10:02:00 触发 #3 max(10:01:00+60s, 10:01:20+30s) = max(10:02:00, 10:01:50) = 10:02:00

结果:严格每 60s 触发一次,即使检查点很快完成。

b、无 min-pause 的情况( 危险):

min-pause 未设置(或为 0),Flink 的行为是: “上次检查点一完成,就立刻触发下一次!”

时间 事件
10:00:00 触发 #1
10:00:20 #1 完成 → 立即触发 #2
10:00:40 #2 完成 → 立即触发 #3
10:01:00 #3 完成 → 立即触发 #4
10:01:20 #4 完成 → 立即触发 #5

结果:虽然你设置了 interval=60s,但因为检查点只要 20s 就完成,系统每 20s 就做一次检查点!

4)检查点超时时间 (Checkpoint Timeout) - 超时设置策略

  • 含义:定义一个检查点操作从开始到必须完成的最大允许时间。
  • 作用:防止某个检查点因各种原因(如大状态、网络延迟、任务繁忙)而长时间卡住,从而阻塞后续的检查点触发。
  • 行为:如果一个检查点在 6 分钟内未能完成,它会被标记为失败,Flink 会尝试进行下一次检查点。
  • 选择建议:这个值应该大于或等于 检查点间隔,否则检查点会持续失败。对于状态很大的作业,可能需要设置更长的超时时间。
    • 黄金比例超时时间 = 检查点间隔 × 2~3
  • 调整依据

    # 伪代码:根据状态大小估算超时
    state_size_gb = get_current_state_size()
    if state_size_gb < 1:
        timeout = interval * 1.5  # 小状态
    elif state_size_gb < 10:
        timeout = interval * 2.5  # 中等状态
    else:
        timeout = interval * 4    # 大状态(RocksDB)
    
  • 运维技巧:设置监控告警,当 Checkpoint持 续超时 3 次时自动触发作业扩容

5)状态存储类型 (State Storage Type) - 存储引擎选择策略

  • 含义:指定了 Flink 如何存储和管理作业的状态。
  • 作用:不同的后端适用于不同规模和需求的作业。

  • 系统默认:通常指 FsStateBackend 或根据集群配置自动选择。如果您有大状态需求,强烈建议显式选择 RocksDBStateBackend
状态后端 存储架构 变化追踪能力 文件版本控制
MemoryStateBackend 纯内存 Map
FsStateBackend 内存 + 全量文件 无细粒度追踪 全量覆盖
RocksDBStateBackend LSM-Tree 数据库 SST 文件级 文件版本管理

a、MemoryStateBackend (内存)

状态存储在 TaskManagerJVM 内存中。速度快,但容量有限,不适合大状态作业,且重启后状态丢失。

  • 工作方式:每次 Checkpoint 都像用拍立得相机重新拍一张完整照片
  • 为什么不支持增量
    • 状态全部在 JVM 堆内存中
    • 没有内置的”变化检测”机制
    • 无法识别哪些数据是新增/修改的
  • 局限:状态 >1GB 时基本不可用
  • 适用场景:测试环境 • 无状态作业,状态大小 < 1GB

image-20251205161451239

b、FsStateBackend (文件系统)

状态先存储在 TaskManager 内存,然后异步写入分布式文件系统(如 HDFS)。适合中等大小的状态。

  • 工作方式:每次 Checkpoint 都把整个状态打包成一个完整包裹寄出
  • 为什么不支持增量
    • 虽然最终存储在文件系统,但内存中是全量状态
    • 没有版本控制机制来跟踪变化
    • 每次都是全量序列化 + 全量传输
    • 案例:每秒处理 10 万事件,但 只有 10% 的用户被更新,还是会存储10万数据
  • 局限10GB 状态 = 每次 Checkpoint 传输 10GB
  • 适用场景:小状态作业 • 低延迟要求,1-10GB,

image-20251205161522778

c、RocksDBStateBackend (RocksDB)

  • 状态存储在本地磁盘上的 RocksDB 数据库中。可以处理非常大的状态(仅受磁盘限制),支持增量 Checkpoint
  • 工作方式
    • 首次 Checkpoint:上传所有 SST (Sorted String Table) 文件
    • 后续 Checkpoint只上传新生成的 SST 文件
    • 恢复时:组合所有相关的 SST 文件重建完整状态
  • 支持增量:每秒处理 10 万事件,但 只有 10% 的用户被更新,之后会存储10%的数据
  • 技术核心:利用 RocksDBLSM-Tree 架构和文件版本控制
  • 适用场景:生产环境默认 • 大状态作业 • 长时间运行,TB 级

image-20251205161705813

6)状态最大保留时长 (Max State Retention Time) - 存储生命周期策略

  • 含义:定义了 Flink 在成功创建一个新检查点后,旧的、已过期的检查点(即不是最新的那个)可以在持久化存储中保留多长时间。
  • 作用:控制检查点存储空间的占用。
  • 行为
    1. Flink 会周期性地执行检查点。
    2. 当一个新的检查点成功完成时,旧的检查点会变成“过期”状态。
    3. 这个参数决定了这些过期检查点可以存在多久。
    4. 超过此时间后,Flink 会自动清理(删除)这些过期的检查点。
  • 重要性:这是一个关键的空间管理参数。
    • 如果设置得太长,会占用大量存储空间;
    • 如果设置得太短,可能导致无法回滚到较早的检查点进行调试或恢复。
  • 选择建议保留时长 > (检查点间隔 × 3) + 预期恢复时间

  • 分场景建议

    作业重要性 建议保留时长 依据
    核心业务(支付) 24-72小时 支持灾难恢复
    重要业务(推荐) 4-12小时 支持问题回溯
    普通业务(日志) 1-2小时 仅满足基本恢复

7)微批模式 (Micro-Batch Mode) - 性能优化开关策略

  • 含义:是否启用微批处理模式。
  • 作用:这是一种优化模式
    • 将流处理作业的连续数据流分割成一系列小的批次(micro-batches)进行处理。
    • 这可以利用批处理引擎(如 Spark)的优化技术,有时能提高性能。
  • 选择建议:对于大多数标准的流处理场景,保持 OFF 即可。只有在特定的优化需求下才会考虑开启。

  • 何时开启

    • 建议开启
      • Sink 是数据库(MySQL/Postgres)且吞吐 > 1K records/sec
      • 窗口聚合作业(特别是 Tumbling Window)
    • 禁止开启:事件驱动型应用(CEP 复杂事件处理)
  • 配置口诀写库开微批,实时关微批;吞吐是瓶颈,微批救急用

  • 最佳实践配置

    // 开启微批 + 设置批大小/时间
    env.setRuntimeMode(RuntimeExecutionMode.BATCH); // Flink 1.15+
    env.getConfig().setAutoWatermarkInterval(200); // 200ms 触发
      
    // 或旧版本:
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 500ms 微批
    

2、运行参数-配置参考

1)状态存储核心配置

# 状态后端选择:RocksDB (生产环境大状态作业唯一选择)
# ✅ 优势:支持 TB 级状态、增量 Checkpoint、故障恢复快
# ❌ 对比:Memory/Fs 后端在 >10GB 状态时性能急剧下降
state.backend: rocksdb

# 检查点存储路径 (HDFS/S3/阿里云 OSS 等分布式存储)
# 💡 关键:必须使用高可用存储,避免单点故障
# ⚠️  坑:本地路径会导致恢复失败!
state.checkpoints.dir: hdfs:///prod/checkpoints

# 保存点存储路径 (手动触发的 Checkpoint,用于版本升级/回滚)
# 🔒 安全建议:与 checkpoints.dir 分离,避免误删
state.savepoints.dir: hdfs:///prod/savepoints

2)检查点容错策略 (金融级配置)

# 检查点间隔:60秒 (金融场景黄金标准)
# 📊 数据丢失容忍 = 间隔 × 故障频率
#   • 60s: 最多丢失1分钟数据 (金融/支付场景)
#   • 300s: 5分钟 (普通业务)
#   • 600s+: 10分钟+ (日志分析)
execution.checkpointing.interval: 60s

# 检查点超时:180秒 (必须 > 间隔 × 2)
# 🔍 超时行为:超过180秒未完成则标记失败,触发下一次检查点
# 💡 计算公式:超时 = 间隔 × (2 ~ 5) [状态越大系数越高]
execution.checkpointing.timeout: 180s

# 两次检查点最小间隔:30秒
# ⚖️ 作用:防止检查点雪崩(前一个未完成就触发下一个)
# 🌰 示例:间隔60s + 最小暂停30s = 实际触发频率90s
# ✅ 最佳实践:min-pause = interval × 0.5
execution.checkpointing.min-pause: 30s

# 允许连续失败次数:3次
# 🛡️ 容错机制:3次连续失败才会触发作业重启
# ⚠️  风险:设为0会导致单次失败立即重启(大状态作业灾难)
# ✅ 金融场景建议:3-5次;普通场景:2-3次
execution.checkpointing.tolerable-failed-checkpoints: 3

3)RocksDB 深度优化 (性能关键)

# 内存托管模式:true (强烈推荐)
# ✅ 优势:Flink 自动管理 RocksDB 内存,避免 OOM
# ❌ 风险:false 时需手动配置 block-cache/write-buffer
# 📌 适用:所有生产环境
state.backend.rocksdb.memory.managed: true

# 每个 slot 分配的 RocksDB 内存:1024MB
# 💡 调优指南:
#   • 小状态作业 (<10GB): 512-1024MB
#   • 中等状态 (10-100GB): 1024-2048MB  
#   • 超大状态 (>100GB): 2048-4096MB
# ⚠️  限制:总内存 = fixed-per-slot × 并行度,勿超物理内存
state.backend.rocksdb.memory.fixed-per-slot: 1024mb

# 增量检查点:true (TB 级状态必备)
# 🚀 性能提升:I/O 降低 25-100 倍,Checkpoint 时间从分钟级→秒级
# 🔍 原理:只上传新生成的 SST 文件,而非全量状态
# ❌ 限制:首次 Checkpoint 仍是全量
state.backend.incremental: true

4)作业代码级配置

// 启用检查点 (60秒间隔 + Exactly-Once 语义)
// 💡 Exactly-Once:每条数据精确处理一次(金融场景必需)
// ⚠️  性能代价:比 At-Least-Once 慢 15-25%
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

// 检查点配置对象
env.getCheckpointConfig()
   // 两次检查点最小间隔 (30秒)
   // ✅ 作用:防止 Checkpoint 雪崩(生产环境必需)
   .setMinPauseBetweenCheckpoints(30000)
   
   // 检查点超时 (180秒)
   // 🔗 必须与 flink-conf.yaml 中 timeout 保持一致
   .setCheckpointTimeout(180000)
   
   // 最大并发检查点数:1 (强烈推荐)
   // ⚖️ 权衡:
   //   • 1 = 稳定性优先(推荐生产环境)
   //   • >1 = 吞吐优先(可能增加 I/O 压力)
   .setMaxConcurrentCheckpoints(1)
   
   // 外部化检查点策略:作业取消时保留
   // 🔥 重要性:这是灾难恢复的唯一保障!
   // 📌 两种模式:
   //   • RETAIN_ON_CANCELLATION: 保留(推荐)
   //   • DELETE_ON_CANCELLATION: 删除(测试环境用)
   .enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
   );

enableExternalizedCheckpoints

  • 核心作用:外部化检查点策略:灾难恢复的”最后一根稻草”
    • 作业取消时保留检查点:即使人为停止作业,检查点仍保留在 HDFS/S3
    • 灾难恢复保障:当作业因 Bug 崩溃、集群故障时,可从最近检查点恢复
    • 升级回滚能力:代码升级失败时,可回退到旧版本 + 旧状态
模式 作业正常停止 作业异常崩溃 适用场景 风险
RETAIN_ON_CANCELLATION (保留) 保留检查点 保留检查点 所有生产环境 需手动清理旧检查点
DELETE_ON_CANCELLATION (删除) 删除检查点 保留检查点 仅测试/开发环境 生产环境灾难!

15)总结

  • 检查点路径使用分布式存储 (非本地)
  • interval × 3 ≤ timeout (当前 60×3=180 ✓)
  • 增量检查点已开启 (state.backend.incremental: true)
  • 外部化检查点策略为 RETAIN_ON_CANCELLATION
  • RocksDB 内存配置未超过物理内存
  • 已配置合理的重启策略
  • HDFS/S3 存储空间 > (状态大小 × 保留检查点数量 × 1.5)

3、资源配置

1)管理节点(JM

JM 不执行计算,只做调度,因此资源要求不高。

参数 解释
数量 1 JM 数量,通常为 1 或 2(高可用)
CPU 1核 JM 轻量级,无需高性能 CPU
内存 4GB 足够处理调度逻辑
磁盘 5Gi 存储临时日志等

2)运行节点 (TM)

总并行度 = TM 数量 × 单 TM 槽位数

参数 解释
数量 5 TM 数量 = 并行度 / 单 TM 槽位数
CPU 2核 每个 TM 至少 2 核,避免 I/O 等待
内存 4GB 一般作业足够,大状态需更多
磁盘 5Gi 存储中间结果、状态快照

3)单 TM 槽位数 (Slot)

含义:每个 TM 可以运行的任务数

  • 它是是 逻辑 资源单位,不是 物理隔离
  • Slot 数越多,单 TM 并行能力越强
  • 但过多 Slot 会增加 GC 开销

配置参考:不要超过 CPU 核心数(避免上下文切换)

  • 小作业:1-2 Slot

  • 大作业:4-8 Slot

4)集群参数

  • 含义:状态后端类型
  • 选项
    • filesystem:文件系统存储(HDFS/S3)
    • rocksdbRocksDB 数据库(支持 TB 级状态)
    • memory:内存存储(仅用于测试)
  • 生产建议:大状态作业必须使用 rocksdb

4、资源配置-参考

1)并行度与性能关系

image-20251205165508530

2)槽位数与 GC 的权衡

  • 槽位数多 → 单 TM 并行高 → GC 频繁
  • 槽位数少 → 单 TM 并行低 → GC 减少

3)状态后端选择策略

场景 推荐后端 理由
小状态 (<1GB) filesystem 简单高效
中等状态 (1-10GB) filesystem 成本低
大状态 (>10GB) rocksdb 支持增量 checkpoint
超大状态 (>100GB) rocksdb + 增量 必须

4)总结

  1. 确定作业规模 → 决定 TM 数量
  2. 确定单 TM 能力 → 设置槽位数
  3. 计算总并行度 → 设置默认并行度
  4. 选择状态后端 → 配置 storage
  5. 预留 5% 并行度 → 保证重启能力

二、Flink SQL 实时计算作业

1、秒级监控热门类目

1)场景:实时统计每小时每个商品类目

  • 总点击次数(total_clicks
  • 独立用户数(uv
  • 平均会话时长(简化版)
-- =============================================================================
-- Flink SQL 实时作业:用户点击行为聚合(按小时 + 类目)
-- 目标:写入 ClickHouse,支持实时大屏
-- =============================================================================

-- 1. 创建源表:从 Kafka 读取用户行为日志
CREATE TABLE user_behavior_source (
    user_id BIGINT,
    event_time_str STRING,          -- 原始时间字符串(如 "2025-04-05 14:23:18")
    page STRING,
    product_id BIGINT,
    category_id BIGINT,
    action STRING,

    -- 将字符串时间转为 TIMESTAMP,并定义为事件时间
    event_time AS TO_TIMESTAMP(event_time_str, 'yyyy-MM-dd HH:mm:ss'),

    -- 定义 Watermark:允许 5 分钟乱序
    WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-click-analysis',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- 2. 创建结果表:写入 ClickHouse
CREATE TABLE click_summary_sink (
    window_start STRING,            -- 窗口开始时间(格式:2025-04-05 14:00:00)
    category_id BIGINT,
    total_clicks BIGINT,
    uv BIGINT,
    PRIMARY KEY (window_start, category_id) NOT ENFORCED  -- ClickHouse 主键提示
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://clickhouse:8123/default',
    'table-name' = 'click_summary',
    'username' = 'default',
    'password' = '',
    'sink.buffer-flush.max-rows' = '10000',
    'sink.buffer-flush.interval' = '60s'
);

-- 3. 核心计算:按小时窗口 + 类目聚合
INSERT INTO click_summary_sink
SELECT
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' HOUR), 'yyyy-MM-dd HH:mm:ss') AS window_start,
    category_id,
    COUNT(*) AS total_clicks,                     -- 总点击量
    COUNT(DISTINCT user_id) AS uv                 -- 独立用户数(UV)
FROM user_behavior_source
WHERE
    action = 'click'                              -- 只统计点击
    AND category_id IS NOT NULL                   -- 过滤无效类目
GROUP BY
    TUMBLE(event_time, INTERVAL '1' HOUR),        -- 1 小时滚动窗口
    category_id;

2)关键设计分析

a、为什么用事件时间(Event Time)?

  • 用户行为可能因网络延迟乱序到达
  • 使用 WATERMARK 保证 语义正确性(即使数据迟到也能正确归窗)

b、窗口选择:滚动窗口(TUMBLE

  • 每小时统计一次,不重叠 → 适合报表场景
  • 若需“最近1小时滑动”,可用 HOP 窗口

ContactAuthor