大数据_Flink实战篇
前言
Github:https://github.com/HealerJean
一、核心参数配置
1、运行参数
# flink-conf.yaml 核心配置
state.backend: rocksdb
state.checkpoints.dir: hdfs:///prod/checkpoints
state.savepoints.dir: hdfs:///prod/savepoints
# 检查点策略
execution.checkpointing.interval: 60s # 1分钟(金融场景)
execution.checkpointing.timeout: 180s # 3倍间隔
execution.checkpointing.min-pause: 30s # 两次检查点最小间隔
execution.checkpointing.tolerable-failed-checkpoints: 3 # 允许3次失败
# RocksDB 优化
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.fixed-per-slot: 1024mb
state.backend.incremental: true # 增量检查点
# 作业级别配置(代码中)
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig()
.setMinPauseBetweenCheckpoints(30000)
.setCheckpointTimeout(180000)
.setMaxConcurrentCheckpoints(1)
.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
1)检查点 (Checkpoint) - 开关配置策略
Flink会定期将整个作业的当前状态(算子状态、源位置等)保存到一个持久化存储中(如HDFS,S3)。如果作业发生故障,可以从最近一次成功的检查点恢复,保证数据处理的准确性。
- 何时开启:
- 必须开启:
- 有状态计算(窗口聚合、会话分析、机器学习特征工程)
- 要求
Exactly-Once语义的关键业务(金融交易、计费系统)
- 可关闭:无状态
ETL作业(简单过滤、格式转换),但建议始终开启
- 必须开启:
- 生产环境建议:始终开启,这是
Flink容错基石 - 性能影响:开启后吞吐量通常下降
10-30%,但换来的是数据准确性保障
2)检查点模式 (Checkpoint Mode) - 精确度选择策略
- 含义:指定检查点的精确度级别。
- 作用:
Exactly-Once(精确一次):这是最严格的模式。它确保每条输入数据在最终结果中只被处理一次。这需要与支持事务的外部系统(如KafkaSink)配合使用,并且对性能有一定影响。At-Least-Once(至少一次):这是更宽松的模式。它保证每条数据至少被处理一次,但可能会因为网络问题或重试导致某些数据被处理多次。通常用于对重复数据不敏感的场景,性能比Exactly-Once更好。
- 选择建议:
- 如果您的业务逻辑要求绝对精确(例如金融交易),请选择
Exactly-Once。 - 如果可以容忍少量重复处理(例如日志统计),可以选择
At-Least-Once。
- 如果您的业务逻辑要求绝对精确(例如金融交易),请选择
- 建议:即使选择
At-Least-Once,也要在业务层做去重(如使用event_id去重表)
3)检查点间隔 (Checkpoint Interval) - 时间窗口配置策略
- 含义:定义
Flink触发下一次检查点的时间间隔。interval是“理想节奏”,min-pause是“强制休息”。- 没有
min-pause,Flink会在检查点完成后“兴奋地”立刻开始下一次,完全无视你设的interval!
- 作用:控制容错恢复的粒度和开销。
- 权衡:
- 间隔短 (如 2分钟):恢复时丢失的数据量少,恢复时间短,但频繁的检查点会增加 I/O 开销,可能影响作业吞吐量。
- 间隔长 (如 10分钟):I/O 开销小,吞吐量高,但恢复时可能丢失更多数据,恢复时间更长。
- 选择建议:根据可接受的数据丢失窗口(
Data Loss Window)来设定。- 例如,如果能容忍最多 2 分钟的数据丢失,则设为 2 分钟是合理的。
-
核心公式: 最大可容忍数据丢失 = 检查点间隔 × 故障频率
-
典型场景配置:
业务场景 建议间隔 依据 金融风控 30-60秒 每分钟损失可能达数万元 用户行为分析 2-5分钟 分钟级精度可接受 IoT 设备监控 1-2分钟 设备状态需及时更新 日志分析 5-10分钟 可容忍较大数据丢失
a、有 min-pause 的情况( 安全):
| 时间 | 事件 | 计算逻辑 |
|---|---|---|
| 10:00:00 | 触发 #1 | 初始触发 |
| 10:00:20 | #1 完成 | 耗时 20s |
| 10:01:00 | 触发 #2 | max(10:00:00+60s, 10:00:20+30s) = max(10:01:00, 10:00:50) = 10:01:00 |
| 10:01:20 | #2 完成 | 耗时 20s |
| 10:02:00 | 触发 #3 | max(10:01:00+60s, 10:01:20+30s) = max(10:02:00, 10:01:50) = 10:02:00 |
结果:严格每 60s 触发一次,即使检查点很快完成。
b、无 min-pause 的情况( 危险):
当
min-pause未设置(或为 0),Flink 的行为是: “上次检查点一完成,就立刻触发下一次!”
| 时间 | 事件 |
|---|---|
| 10:00:00 | 触发 #1 |
| 10:00:20 | #1 完成 → 立即触发 #2 |
| 10:00:40 | #2 完成 → 立即触发 #3 |
| 10:01:00 | #3 完成 → 立即触发 #4 |
| 10:01:20 | #4 完成 → 立即触发 #5 |
| … | … |
结果:虽然你设置了 interval=60s,但因为检查点只要 20s 就完成,系统每 20s 就做一次检查点!
4)检查点超时时间 (Checkpoint Timeout) - 超时设置策略
- 含义:定义一个检查点操作从开始到必须完成的最大允许时间。
- 作用:防止某个检查点因各种原因(如大状态、网络延迟、任务繁忙)而长时间卡住,从而阻塞后续的检查点触发。
- 行为:如果一个检查点在 6 分钟内未能完成,它会被标记为失败,
Flink会尝试进行下一次检查点。 - 选择建议:这个值应该大于或等于 检查点间隔,否则检查点会持续失败。对于状态很大的作业,可能需要设置更长的超时时间。
- 黄金比例:超时时间 = 检查点间隔 × 2~3
-
调整依据:
# 伪代码:根据状态大小估算超时 state_size_gb = get_current_state_size() if state_size_gb < 1: timeout = interval * 1.5 # 小状态 elif state_size_gb < 10: timeout = interval * 2.5 # 中等状态 else: timeout = interval * 4 # 大状态(RocksDB) - 运维技巧:设置监控告警,当
Checkpoint持 续超时3次时自动触发作业扩容
5)状态存储类型 (State Storage Type) - 存储引擎选择策略
- 含义:指定了
Flink如何存储和管理作业的状态。 -
作用:不同的后端适用于不同规模和需求的作业。
- 系统默认:通常指
FsStateBackend或根据集群配置自动选择。如果您有大状态需求,强烈建议显式选择RocksDBStateBackend。
| 状态后端 | 存储架构 | 变化追踪能力 | 文件版本控制 |
|---|---|---|---|
| MemoryStateBackend | 纯内存 Map | 无 | 无 |
| FsStateBackend | 内存 + 全量文件 | 无细粒度追踪 | 全量覆盖 |
| RocksDBStateBackend | LSM-Tree 数据库 |
SST 文件级 | 文件版本管理 |
a、MemoryStateBackend (内存):
状态存储在
TaskManager的JVM内存中。速度快,但容量有限,不适合大状态作业,且重启后状态丢失。
- 工作方式:每次
Checkpoint都像用拍立得相机重新拍一张完整照片 - 为什么不支持增量:
- 状态全部在
JVM堆内存中 - 没有内置的”变化检测”机制
- 无法识别哪些数据是新增/修改的
- 状态全部在
- 局限:状态 >
1GB时基本不可用 - 适用场景:测试环境 • 无状态作业,状态大小 < 1GB

b、FsStateBackend (文件系统)
状态先存储在
TaskManager内存,然后异步写入分布式文件系统(如HDFS)。适合中等大小的状态。
- 工作方式:每次
Checkpoint都把整个状态打包成一个完整包裹寄出 - 为什么不支持增量:
- 虽然最终存储在文件系统,但内存中是全量状态
- 没有版本控制机制来跟踪变化
- 每次都是全量序列化 + 全量传输
- 案例:每秒处理 10 万事件,但 只有 10% 的用户被更新,还是会存储10万数据
- 局限:
10GB状态 = 每次Checkpoint传输 10GB - 适用场景:小状态作业 • 低延迟要求,1-10GB,

c、RocksDBStateBackend (RocksDB):
- 状态存储在本地磁盘上的
RocksDB数据库中。可以处理非常大的状态(仅受磁盘限制),支持增量Checkpoint - 工作方式:
- 首次
Checkpoint:上传所有SST(Sorted String Table) 文件 - 后续
Checkpoint:只上传新生成的 SST 文件 - 恢复时:组合所有相关的 SST 文件重建完整状态
- 首次
- 支持增量:每秒处理 10 万事件,但 只有 10% 的用户被更新,之后会存储10%的数据
- 技术核心:利用
RocksDB的LSM-Tree架构和文件版本控制 - 适用场景:生产环境默认 • 大状态作业 • 长时间运行,TB 级

6)状态最大保留时长 (Max State Retention Time) - 存储生命周期策略
- 含义:定义了
Flink在成功创建一个新检查点后,旧的、已过期的检查点(即不是最新的那个)可以在持久化存储中保留多长时间。 - 作用:控制检查点存储空间的占用。
- 行为:
Flink会周期性地执行检查点。- 当一个新的检查点成功完成时,旧的检查点会变成“过期”状态。
- 这个参数决定了这些过期检查点可以存在多久。
- 超过此时间后,
Flink会自动清理(删除)这些过期的检查点。
- 重要性:这是一个关键的空间管理参数。
- 如果设置得太长,会占用大量存储空间;
- 如果设置得太短,可能导致无法回滚到较早的检查点进行调试或恢复。
-
选择建议:保留时长 > (检查点间隔 × 3) + 预期恢复时间
-
分场景建议:
作业重要性 建议保留时长 依据 核心业务(支付) 24-72小时 支持灾难恢复 重要业务(推荐) 4-12小时 支持问题回溯 普通业务(日志) 1-2小时 仅满足基本恢复
7)微批模式 (Micro-Batch Mode) - 性能优化开关策略
- 含义:是否启用微批处理模式。
- 作用:这是一种优化模式
- 将流处理作业的连续数据流分割成一系列小的批次(
micro-batches)进行处理。 - 这可以利用批处理引擎(如
Spark)的优化技术,有时能提高性能。
- 将流处理作业的连续数据流分割成一系列小的批次(
-
选择建议:对于大多数标准的流处理场景,保持
OFF即可。只有在特定的优化需求下才会考虑开启。 -
何时开启:
- 建议开启:
Sink是数据库(MySQL/Postgres)且吞吐 >1K records/sec- 窗口聚合作业(特别是 Tumbling Window)
- 禁止开启:事件驱动型应用(
CEP复杂事件处理)
- 建议开启:
-
配置口诀:写库开微批,实时关微批;吞吐是瓶颈,微批救急用
-
最佳实践配置:
// 开启微批 + 设置批大小/时间 env.setRuntimeMode(RuntimeExecutionMode.BATCH); // Flink 1.15+ env.getConfig().setAutoWatermarkInterval(200); // 200ms 触发 // 或旧版本: env.enableCheckpointing(60000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 500ms 微批
2、运行参数-配置参考
1)状态存储核心配置
# 状态后端选择:RocksDB (生产环境大状态作业唯一选择)
# ✅ 优势:支持 TB 级状态、增量 Checkpoint、故障恢复快
# ❌ 对比:Memory/Fs 后端在 >10GB 状态时性能急剧下降
state.backend: rocksdb
# 检查点存储路径 (HDFS/S3/阿里云 OSS 等分布式存储)
# 💡 关键:必须使用高可用存储,避免单点故障
# ⚠️ 坑:本地路径会导致恢复失败!
state.checkpoints.dir: hdfs:///prod/checkpoints
# 保存点存储路径 (手动触发的 Checkpoint,用于版本升级/回滚)
# 🔒 安全建议:与 checkpoints.dir 分离,避免误删
state.savepoints.dir: hdfs:///prod/savepoints
2)检查点容错策略 (金融级配置)
# 检查点间隔:60秒 (金融场景黄金标准)
# 📊 数据丢失容忍 = 间隔 × 故障频率
# • 60s: 最多丢失1分钟数据 (金融/支付场景)
# • 300s: 5分钟 (普通业务)
# • 600s+: 10分钟+ (日志分析)
execution.checkpointing.interval: 60s
# 检查点超时:180秒 (必须 > 间隔 × 2)
# 🔍 超时行为:超过180秒未完成则标记失败,触发下一次检查点
# 💡 计算公式:超时 = 间隔 × (2 ~ 5) [状态越大系数越高]
execution.checkpointing.timeout: 180s
# 两次检查点最小间隔:30秒
# ⚖️ 作用:防止检查点雪崩(前一个未完成就触发下一个)
# 🌰 示例:间隔60s + 最小暂停30s = 实际触发频率90s
# ✅ 最佳实践:min-pause = interval × 0.5
execution.checkpointing.min-pause: 30s
# 允许连续失败次数:3次
# 🛡️ 容错机制:3次连续失败才会触发作业重启
# ⚠️ 风险:设为0会导致单次失败立即重启(大状态作业灾难)
# ✅ 金融场景建议:3-5次;普通场景:2-3次
execution.checkpointing.tolerable-failed-checkpoints: 3
3)RocksDB 深度优化 (性能关键)
# 内存托管模式:true (强烈推荐)
# ✅ 优势:Flink 自动管理 RocksDB 内存,避免 OOM
# ❌ 风险:false 时需手动配置 block-cache/write-buffer
# 📌 适用:所有生产环境
state.backend.rocksdb.memory.managed: true
# 每个 slot 分配的 RocksDB 内存:1024MB
# 💡 调优指南:
# • 小状态作业 (<10GB): 512-1024MB
# • 中等状态 (10-100GB): 1024-2048MB
# • 超大状态 (>100GB): 2048-4096MB
# ⚠️ 限制:总内存 = fixed-per-slot × 并行度,勿超物理内存
state.backend.rocksdb.memory.fixed-per-slot: 1024mb
# 增量检查点:true (TB 级状态必备)
# 🚀 性能提升:I/O 降低 25-100 倍,Checkpoint 时间从分钟级→秒级
# 🔍 原理:只上传新生成的 SST 文件,而非全量状态
# ❌ 限制:首次 Checkpoint 仍是全量
state.backend.incremental: true
4)作业代码级配置
// 启用检查点 (60秒间隔 + Exactly-Once 语义)
// 💡 Exactly-Once:每条数据精确处理一次(金融场景必需)
// ⚠️ 性能代价:比 At-Least-Once 慢 15-25%
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
// 检查点配置对象
env.getCheckpointConfig()
// 两次检查点最小间隔 (30秒)
// ✅ 作用:防止 Checkpoint 雪崩(生产环境必需)
.setMinPauseBetweenCheckpoints(30000)
// 检查点超时 (180秒)
// 🔗 必须与 flink-conf.yaml 中 timeout 保持一致
.setCheckpointTimeout(180000)
// 最大并发检查点数:1 (强烈推荐)
// ⚖️ 权衡:
// • 1 = 稳定性优先(推荐生产环境)
// • >1 = 吞吐优先(可能增加 I/O 压力)
.setMaxConcurrentCheckpoints(1)
// 外部化检查点策略:作业取消时保留
// 🔥 重要性:这是灾难恢复的唯一保障!
// 📌 两种模式:
// • RETAIN_ON_CANCELLATION: 保留(推荐)
// • DELETE_ON_CANCELLATION: 删除(测试环境用)
.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
enableExternalizedCheckpoints
- 核心作用:外部化检查点策略:灾难恢复的”最后一根稻草”
- 作业取消时保留检查点:即使人为停止作业,检查点仍保留在
HDFS/S3 - 灾难恢复保障:当作业因 Bug 崩溃、集群故障时,可从最近检查点恢复
- 升级回滚能力:代码升级失败时,可回退到旧版本 + 旧状态
- 作业取消时保留检查点:即使人为停止作业,检查点仍保留在
| 模式 | 作业正常停止 | 作业异常崩溃 | 适用场景 | 风险 |
|---|---|---|---|---|
RETAIN_ON_CANCELLATION (保留) |
保留检查点 | 保留检查点 | 所有生产环境 | 需手动清理旧检查点 |
DELETE_ON_CANCELLATION (删除) |
删除检查点 | 保留检查点 | 仅测试/开发环境 | 生产环境灾难! |
15)总结
- 检查点路径使用分布式存储 (非本地)
- interval × 3 ≤ timeout (当前 60×3=180 ✓)
- 增量检查点已开启 (state.backend.incremental: true)
- 外部化检查点策略为 RETAIN_ON_CANCELLATION
- RocksDB 内存配置未超过物理内存
- 已配置合理的重启策略
- HDFS/S3 存储空间 > (状态大小 × 保留检查点数量 × 1.5)
3、资源配置
1)管理节点(JM)
JM不执行计算,只做调度,因此资源要求不高。
| 参数 | 值 | 解释 |
|---|---|---|
| 数量 | 1 | JM 数量,通常为 1 或 2(高可用) |
| CPU | 1核 | JM 轻量级,无需高性能 CPU |
| 内存 | 4GB |
足够处理调度逻辑 |
| 磁盘 | 5Gi |
存储临时日志等 |
2)运行节点 (TM)
总并行度 =
TM数量 × 单TM槽位数
| 参数 | 值 | 解释 |
|---|---|---|
| 数量 | 5 | TM 数量 = 并行度 / 单 TM 槽位数 |
| CPU | 2核 | 每个 TM 至少 2 核,避免 I/O 等待 |
| 内存 | 4GB | 一般作业足够,大状态需更多 |
| 磁盘 | 5Gi |
存储中间结果、状态快照 |
3)单 TM 槽位数 (Slot)
含义:每个 TM 可以运行的任务数
- 它是是 逻辑 资源单位,不是 物理隔离
Slot数越多,单TM并行能力越强- 但过多
Slot会增加GC开销
配置参考:不要超过 CPU 核心数(避免上下文切换)
-
小作业:1-2 Slot
-
大作业:4-8 Slot
4)集群参数
- 含义:状态后端类型
- 选项:
filesystem:文件系统存储(HDFS/S3)rocksdb:RocksDB数据库(支持 TB 级状态)memory:内存存储(仅用于测试)
- 生产建议:大状态作业必须使用
rocksdb。
4、资源配置-参考
1)并行度与性能关系

2)槽位数与 GC 的权衡
- 槽位数多 → 单 TM 并行高 → GC 频繁
- 槽位数少 → 单 TM 并行低 → GC 减少
3)状态后端选择策略
| 场景 | 推荐后端 | 理由 |
|---|---|---|
| 小状态 (<1GB) | filesystem | 简单高效 |
| 中等状态 (1-10GB) | filesystem | 成本低 |
| 大状态 (>10GB) | rocksdb | 支持增量 checkpoint |
| 超大状态 (>100GB) | rocksdb + 增量 | 必须 |
4)总结
- 确定作业规模 → 决定
TM数量 - 确定单
TM能力 → 设置槽位数 - 计算总并行度 → 设置默认并行度
- 选择状态后端 → 配置
storage - 预留
5%并行度 → 保证重启能力
二、Flink SQL 实时计算作业
1、秒级监控热门类目
1)场景:实时统计每小时每个商品类目
- 总点击次数(
total_clicks) - 独立用户数(
uv) - 平均会话时长(简化版)
-- =============================================================================
-- Flink SQL 实时作业:用户点击行为聚合(按小时 + 类目)
-- 目标:写入 ClickHouse,支持实时大屏
-- =============================================================================
-- 1. 创建源表:从 Kafka 读取用户行为日志
CREATE TABLE user_behavior_source (
user_id BIGINT,
event_time_str STRING, -- 原始时间字符串(如 "2025-04-05 14:23:18")
page STRING,
product_id BIGINT,
category_id BIGINT,
action STRING,
-- 将字符串时间转为 TIMESTAMP,并定义为事件时间
event_time AS TO_TIMESTAMP(event_time_str, 'yyyy-MM-dd HH:mm:ss'),
-- 定义 Watermark:允许 5 分钟乱序
WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-click-analysis',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- 2. 创建结果表:写入 ClickHouse
CREATE TABLE click_summary_sink (
window_start STRING, -- 窗口开始时间(格式:2025-04-05 14:00:00)
category_id BIGINT,
total_clicks BIGINT,
uv BIGINT,
PRIMARY KEY (window_start, category_id) NOT ENFORCED -- ClickHouse 主键提示
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:clickhouse://clickhouse:8123/default',
'table-name' = 'click_summary',
'username' = 'default',
'password' = '',
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '60s'
);
-- 3. 核心计算:按小时窗口 + 类目聚合
INSERT INTO click_summary_sink
SELECT
DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' HOUR), 'yyyy-MM-dd HH:mm:ss') AS window_start,
category_id,
COUNT(*) AS total_clicks, -- 总点击量
COUNT(DISTINCT user_id) AS uv -- 独立用户数(UV)
FROM user_behavior_source
WHERE
action = 'click' -- 只统计点击
AND category_id IS NOT NULL -- 过滤无效类目
GROUP BY
TUMBLE(event_time, INTERVAL '1' HOUR), -- 1 小时滚动窗口
category_id;
2)关键设计分析
a、为什么用事件时间(Event Time)?
- 用户行为可能因网络延迟乱序到达
- 使用
WATERMARK保证 语义正确性(即使数据迟到也能正确归窗)
b、窗口选择:滚动窗口(TUMBLE)
- 每小时统计一次,不重叠 → 适合报表场景
- 若需“最近1小时滑动”,可用
HOP窗口


