项目经验_之_全量数据预热
前言
Github:https://github.com/HealerJean
一、全量数据预热系统
- 数据不丢:即使中间服务假死,恢复后也能追上。
- 一致性:
5000台机器数据基本一致(允许秒级延迟) - 性能:避免大
Key、避免全量替换、避免高CPU/GC - 可扩展:支持每日全量刷新 + 增量同步
二、全量写入方案
1、方案目标
| 目标 | 描述 |
|---|---|
| 支持千万级数据全量加载 | 数据总量约 10GB(每条记录 ~1KB),需避免 OOM 和长时间阻塞 |
| 高效存储与传输 | 减少磁盘占用、网络带宽消耗,优先选择紧凑序列化格式,每个数据源可选序列化方式,可选压测 |
| 分片并行写入加速 | 利用多线程提升写入速度,缩短整体导出耗时 |
| 原子性发布机制 | 防止客户端读取到“部分完成”的快照文件 |
| 版本控制与通知机制 | 实现服务端主动通知,确保下游及时感知新版本 |
| 自动清理旧快照 | 避免 OSS/S3 存储无限膨胀,保留最近 7 天历史 |
| 增量起点可追溯 | 记录本次全量开始时间,作为后续增量拉取的起始位点 |
| 生产级稳定性保障 | 支持失败重试、临时文件保护、日志监控 |
2、整体架构概览
- 数据源读取:基于主键游标分页,流式加载,避免 OOM。
- 分片写入:按
userId哈希分发到多个分片,并行写入。 - 序列化压缩:使用 Protobuf + GZIP,极致压缩。
- 原子发布:先写
.tmp临时文件,完成后rename发布。 - 版本通知:将最新版本号写入 Redis,下游轮询感知。
- 清理旧版本:保留最近 7 天快照,防止存储膨胀
+------------------+ +----------------------------+
| 源系统 (MySQL) | --> | 分页读取 + 流式处理引擎 |
+------------------+ +--------------+-------------+
|
v
+----------------------------------+
| 按 userId 哈希分发到 N 个分片写入器 |
+----------------+-----------------+
|
v
+--------------------+--------------------+
| 对象存储层 (OSS / S3) |
| full_data_v{version}_{shardId}.pb.gz |
+--------------------+--------------------+
|
v
+----------+-----------+
| Redis 版本中心 |
| snapshot:user_tag:latest |
+----------+-----------+
|
v
+-----------+------------+
| 下游服务(定时轮询 Redis) |
| 下载分片 → 解压 → 加载内存 |
+-----------+------------+
3、核心组件详解
1)数据源读取模块
-
问题背景:直接
SELECT * FROM table会一次性加载全部数据,导致内存溢出(OOM),尤其在千万级以上数据量时不可接受。 -
解决方案:基于主键的游标分页
- 使用
user_id(主键或唯一索引)作为分页锚点。 - 每次读取
1W条,避免内存堆积。 - 不依赖
OFFSET,避免深度分页性能下降。
- 使用
SELECT user_id, tag_id, tag_value, update_time
FROM user_tag
WHERE user_id > ?
ORDER BY user_id ASC
LIMIT 10000;
public List<UserTag> findByPage(Long lastId, int pageSize) {
String sql = "SELECT ... WHERE user_id > ? ORDER BY user_id LIMIT ?";
return jdbcTemplate.query(sql, (rs, rowNum) -> mapRowToUserTag(rs), lastId, pageSize);
}
2)序列化格式
推荐使用
Protobuf+GZIP组合,在压缩率和解析性能之间取得最佳平衡,适合跨语言、高性能场景。
- 内部高性能服务通信 → 用
Protobuf - 对外
REST API/ 调试友好 → 用JSON(或JSON + Gzip如果带宽紧张) - 持久化存储 / 批量数据导出 → 用
Protobuf + Gzip - 已有
JSON系统想降带宽 → 快速启用JSON + Gzip(零代码改造)
| 格式 | 大小对比 | 性能对比 | 是否推荐 | 说明 |
|---|---|---|---|---|
JSON |
100% | 1x | ❌ | 可读性好,但体积大、性能低,不适合高性能或带宽敏感场景 |
JSON + Gzip |
~30% | ~0.8x(略慢) | ✅ | 体积显著减小,适合网络传输;但 CPU 开销增加,且仍需解析文本 |
Protobuf |
~40% | 3x 更快 | ✅✅ | 二进制、高效、跨语言,适合内部服务通信和存储 |
Protobuf + Gzip |
~15% | 2.5x | ✅✅✅ | 极致压缩 + 高性能,适合冷数据存储或高成本带宽场景 |
3)OSS 数据分片策略(负载均衡 + 小文件优化)
a、分片大小
| 参数 | 原始方案(1000片) | 优化后方案(100片) |
|---|---|---|
| 单片大小 | ~10MB | ~100–150MB |
| 文件数量 | 1000 | 100 |
| 并发连接 | 高(易超限) | 适中 |
OSS 元数据压力 |
极高 | 可控 |
-
为什么改为
100片?SS/S3最佳实践建议单个对象大小 ≥ 100MB,以减少元数据开销和列举延迟
-
1000个小文件会导致OSS元数据压力大、列举慢、打开开销高。100个分片可在大多数集群环境中良好并行处理。
-
哈希算法改进(防倾斜):
-
int shardId = Math.floorMod(Objects.hash(userId), SHARD_COUNT); -
使用
Objects.hash()提供更好的散列分布。 Math.floorMod确保结果为正整数,避免负索引异常。
-
b、OSS 性能优化
问题1:OSS 能否支持数千台机器同时下载同一个文件?
答案::完全可以,但必须优化方式,不能 5000 台机器同时下载,导致 所有请求打到同一个 Object,这样会导致
OSS内部热点(虽然会自动负载均衡)- 出口带宽瓶颈(如果都在一个 VPC 出口)
- 下载慢、超时、失败率高
问题2:正确做法是什么呢?
答案:正确做法:OSS + CDN 加速(推荐方案)
10GB文件,CDN缓存后,3000台机器可在10分钟内全部下载完成(平均 50MB/s)CDN缓存策略:- 缓存
.pb.gz文件,TTL设置为0(不自动过期) - 手动调用
Refresh刷新缓存(每次发布新版本时)
- 缓存
OSS+CDN开启HTTPS- 使用内网
CDN(如果支持) → 更快、更便宜
| 优势 | 说明 |
|---|---|
| 首请求回源,后续命中缓存 | 第一台机器下载后,CDN 缓存整个文件 |
就近接入点(Edge)下载 |
每台机器从最近的 CDN 节点拉取,速度快 |
| 支持百万级并发下载 | CDN 天生为高并发设计 |
减轻 OSS 压力 |
只有少量回源请求 |
4)存储层设计: 压缩上传与原子发布
a、压缩流构建(GZIP + Protobuf)
使用
GZIPOutputStream包裹输出流,自动压缩:
OutputStream rawStream = storage.getOutputStream(tempFilename);
GZIPOutputStream gos = new GZIPOutputStream(rawStream);
CodedOutputStream cos = CodedOutputStream.newInstance(gos);
// 写入每条消息(delimited)
for (UserTag record : batch) {
UserTagProto proto = convertToProto(record);
proto.writeDelimitedTo(cos); // 自动添加长度前缀
}
cos.flush();
gos.close(); // 触发压缩结束
b、原子发布机制(临时文件 + rename)
- 若任务中断,临时文件存在,下次可恢复;
- 客户端只看到完整发布的文件,不会读到半成品;
- 支持多种底层存储(本地/
OSS/S3)统一语义。
// 写入 .tmp 临时文件
String tempFile = filename + ".tmp";
try (OutputStream os = storage.getOutputStream(tempFile)) {
writeData(os);
}
// 原子性发布:rename to final name
storage.publish(tempFile, filename);
5)版本通知机制:Redis 替代 ZooKeeper
a、 ZooKeeper 的局限性(不推荐用于事件广播)
| 风险 | 说明 |
|---|---|
ZK 瞬时推送风暴 |
一次 setData(),ZK 向数千台机器发送事件 |
ZK 延迟升高 |
大量 Watcher 导致协调服务响应变慢 |
| 客户端处理不过来 | 数千台机器同时发起下载 → 网络洪峰 |
ZK 不是消息广播系统 |
它是协调服务,不适合大规模事件通知 |
b、推荐方案:Redis 轻量级版本中心
TTL设置:所有key设置 7 天过期,与快照生命周期一致。- 下游行为:
- 每隔
1~5分钟轮询一次snapshot:{dataSource}:latest; - 发现版本变化后,下载对应的
manifest并校验各分片。
- 每隔
当前最新版本号
snapshot:user_tag:latest → "1730486400"
# 版本详情(Hash)
snapshot:user_tag:1730486400.version → "1730486400"
snapshot:user_tag:1730486400.createTime → "1730486400000"
snapshot:user_tag:1730486400.recordCount → "9876543"
snapshot:user_tag:1730486400.manifestUrl → "manifest/user_tag_v1730486400_manifest.json"
6) 清理策略:保留最近 7 天快照
防止存储无限增长。支持回滚到过去一周内的任意版本。降低备份成本。
public void cleanupOld(String dataSource, long currentVersion) {
try {
String prefix = dataSource + "_v";
for (String file : storage.listFiles(prefix)) {
Long ver = extractVersion(file);
if (ver != null && ver < currentVersion) {
storage.delete(file);
log.info("🗑️ 删除旧快照文件: {}", file);
}
}
// 清理旧 manifest 和 checkpoint
cleanupByPrefix("manifest/" + prefix, currentVersion);
cleanupByPrefix("checkpoints/" + prefix, currentVersion);
} catch (IOException e) {
log.error("清理失败", e);
}
}
private void cleanupByPrefix(String prefix, long currentVersion) throws IOException {
for (String file : storage.listFiles(prefix)) {
Long ver = extractVersion(file);
if (ver != null && ver < currentVersion) {
storage.delete(file);
log.info("🗑️ 删除旧辅助文件: {}", file);
}
}
}
7)增量衔接:记录全量开始时间
-
关键字段:
-
startTime: 当前全量任务开始的时间戳(即上次全量完成后第一次增量的起点) -
也可使用
max(update_time)作为更精确的起点,取决于业务语义。
-
-
后续增量消费方式:
来源 拉取方式 MySQLBinlog从 update_time > startTime查询Kafka从对应时间点的 offset 开始消费 RedisStreamXREAD with $or timestamp
8)错误处理与健壮性设计
a、健壮性设计
| 风险 | 应对措施 |
|---|---|
| 写入中途崩溃 | 使用临时文件,下次启动可判断是否继续 |
| 断点续传 | 记录已处理的最大 user_id,失败后从中断处恢复 |
ZK 连接断开 |
自动重连 + 本地缓存最后版本 |
| 内存溢出 | 流式处理,无中间集合缓存 |
| 文件未完全写入 | 只有全部写完才发布版本 |
b、验证方式
| 验证项 | 方法 |
| ———- | ————————————————- |
| 断点续传 | kill 进程 → 重启 → 查看日志是否从上次位置继续 |
| 数据一致 | 查看下游日志是否打印 ` 加载成功,共 X 条 |
| 文件完整性 | cat /data-preload/snapshots/manifest.json |
| 断点文件 | cat /data-preload/checkpoints/checkpoint_v.txt` |
9)文件命名规范 & 示例
dataSource: 数据源名称(如user_tag,item_profile)version:System.currentTimeMillis(),全局唯一且有序shardId:[0, N-1].pb.gz: Protobuf + Gzip 压缩格式
full_data_v{version}_{shardId}.pb.gz
示例:
full_data_v1730486400000_0.pb.gz
full_data_v1730486400000_1.pb.gz
...
full_data_v1730486400000_99.pb.gz
10)性能估算(以 1000 万条为例)
| 指标 | 数值 |
|---|---|
| 原始数据大小 | ~10 GB |
| Protobuf 大小 | ~4 GB |
| Protobuf + Gzip | ~1.5 GB |
| 分片数量 | 100 个 |
| 平均每个文件 | ~15 MB |
| 导出耗时 | 5–10 分钟(DB 负载正常) |
| 内存占用 | < 200 MB(流式处理) |
CPU 占用 |
中等(Protobuf 序列化) |
11)方案优势总结
| 维度 | 说明 |
|---|---|
| 高效性 | Protobuf + Gzip 极致压缩,节省 85% 存储 |
| 一致性 | 临时文件 + ZK 原子发布,杜绝脏读 |
| 可维护性 | 自动清理旧版本,防止数据膨胀 |
| 可观测性 | 版本信息丰富,支持监控告警 |
| 扩展性 | 分片结构天然支持水平扩展 |
| 兼容性 | Protobuf 跨语言,易于被 Go/Python/C++ 客户端解析 |
1)流读取,而不是 csv
| 维度 | 流式文本读取 | EasyExcel 读 CSV |
|---|---|---|
| 适用格式 | 纯文本 CSV(每行一个记录) | 官方不主推 CSV,需额外配置,功能弱于 Excel |
| 内存占用 | 极低(逐行读,无缓存) | 较低(SAX 模式),但 CSV 解析不如原生流高效 |
| 性能 | 高(无反射、无注解解析) | 中等(有对象映射开销) |
| 开发效率 | 需手动解析字段(split 或 JSON) | 可用 @ExcelProperty 自动映射(但 CSV 支持差) |
| 错误处理 | 精细(可定位到具体行) | 较粗(依赖 EasyExcel 异常回调) |
| 依赖 | 无额外依赖 | 需引入 easyexcel |
CSV 特性支持 |
需自己处理引号、转义、分隔符等 | EasyExcel 的 CSV reader 功能较弱,对 RFC4180 支持不完整 |
二、增量写入方案
基于 Redis 的增量数据写入方案,用于将消息队列中的增量数据高效、有序地持久化到 Redis 中,并支持按时间窗口和分片(shard)组织数据,同时具备容错恢复能力。
1、背景与目标
1)为什么需要增量快照?
- 全量快照成本高:每次重新拉取所有用户标签数据(如
user_tag)会消耗大量 I/O、网络和内存。 - 实时性要求:业务需要近实时感知用户标签变更(如打标、去标),不能依赖小时级/天级全量。
- 资源隔离:全量与增量应解耦,避免互相干扰。
2)设计目标
- 高吞吐、低延迟地处理来自消息队列的增量数据;
- 保证数据顺序性(通过全局递增
offset); - 支持按时间范围查询(通过时间索引);
- 自动分片存储,避免单个
ZSET过大影响性能; - 具备故障恢复能力:当
Redis中offset丢失时,能从数据库恢复; - 控制内存占用:通过
TTL和分片数量限制自动清理旧数据。
2、方案选型
1)方案对比
a、 Kafka + Redis ZSET + 分片偏移
非常适合数百~数千台机器。
1)原理:
-
数据按
offset写入RedisZSET(score=offset,member=data) -
每个
shard对应一个ZSETkey(如incr:dataset:shard0) -
客户端本地维护
currentOffset,按offset范围拉取 -
分片大小由
maxMembersSize控制(如每shard1000条) -
空
shard时主动探测下一个shard防漏
2)优点
- 天然支持高并发读:
Redis单实例可支撑数万QPSZSETrangeByScore是O(logN + M),性能良好。
- 精确
offset控制:基于score的偏移机制,支持任意offset拉取,适合“按位点恢复”。 - 无需中间件依赖:仅依赖
Redis,部署简单。 - 支持全量+增量统一模型:全量可视为 指定
offset开始拉。
3)缺点
- 偏移量管理复杂
- 写入端:生产者需按
offset写入正确shard(需协调)。 - 客户端:需要自行维护偏移量,容易出错
- 写入端:生产者需按
- 空窗口问题:
- 分片边界:若某
shard无数据,需主动探测下一shard - 切换边界易出错:如
maxMembersSize变化导致shard计算不一致
- 分片边界:若某
b、Kafka 广播模式 (首选)
百台以内较合适;数百台勉强可用(需大集群+分区调优);数千台以上不推荐(除非做分层代理)。
1)原理
- 每台机器启动时生成唯一
group.id(如hotcache-${host}-${pid}) - 所有机器订阅同一个
topic,各自独立消费全部 partition offset由Kafka自动管理(__consumer_offsets)
2)优点
offset自动持久化:Kafka自动记录每个group的消费位点,重启后自动resume。- 断网追赶能力强:只要数据未被
retention清除(如保留 7 天),可从任意offset拉取。 - 高吞吐、持久化存储:
Kafka本身为日志设计,磁盘顺序写,支持TB级数据。
3)缺点
- 运维复杂度高:
- 需维护
Kafka集群、ZooKeeper/KRaft、监控lag等 - 连接数爆炸:每台机器一个
consumer,数千台 → 数千个TCP连接 + 消费者协调开销。
- 需维护
- 带宽压力大:每台机器都要拉全量增量数据(广播语义),网络带宽 ×
N。 Kafka集群压力:- 数千消费者同时拉取同一
topic,broker负载高 - 启动风暴:服务批量重启时,所有机器同时拉历史数据,
Kafka broker瞬间被打满。
- 数千消费者同时拉取同一
4)适用场景
- 数据一致性要求极高的场景
- 消息量不是特别巨大的情况
- 客户端数量在数百台以内
5)伪代码
// 简化版实现
@KafkaListener(topics = "#{'${cache.topics}'.split(',')}")
public void handleCacheUpdate(ConsumerRecord<String, CacheUpdateMessage> record) {
try {
applyCacheUpdate(record.value());
// 异步提交偏移量
kafkaTemplate.commitSync();
} catch (Exception e) {
// 处理失败,记录异常但不提交偏移量
log.error("Failed to process cache update", e);
}
}
// 服务启动时全量加载 + 增量追赶
@PostConstruct
public void init() {
// 1. 从OSS加载全量
loadFullDataFromOSS();
// 2. 寻找合适的Kafka偏移量开始位置
long startOffset = calculateStartOffset();
// 3. 从指定位置开始消费
kafkaConsumer.seekToBeginning(Collections.singletonList(partition));
kafkaConsumer.seek(partition, startOffset);
}
d、Redis Stream + 消费者组
1)原理
- 使用
Redis 5.0+的Stream数据结构 - 创建
consumer group,每台机器作为独立consumer加入(可共享group,也可独立) - 但你要的是广播,所以每台机器应使用独立
group
2)优点
- 支持消费组 +
offset自动管理:比ZSET更规范。 - 内存可控:
Stream支持MAXLEN ~ N自动截断,避免无限增长。
3)缺点
- 仍是广播模式:每台机器独立
group→ 数据全量复制,带宽浪费同Kafka。 Redis单点瓶颈:Stream写入是单线程(Redis主线程),高吞吐下易成瓶颈。- 数万台
consumer同时读,RedisCPU和网络打满。
- 无水平扩展:
RedisCluster下Stream必须在一个slot(即一个master),无法分片。- 所有读写压在一个
Redismaster(Stream无法分片)
4)适用规模
- 几十 ~ 数百台:可用,优于
ZSET(因有ACK和自动截断)。 - 数千台以上:不推荐。
Redis成为单点瓶颈,且广播流量大。
2)深度分析
a、服务重启如何加载全量 + 增量?
Kafka最标准,ZSET最轻量。
Kafka:全量从OSS,增量从committedoffset拉取(天然支持)。RedisStream:全量OSS,增量从本地存储的last-id开始XREAD。RedisZSET:全量OSS,增量从本地offset开始计算shard拉取(你已实现)。
b、元数据与连接爆炸问题
| 维度 | Kafka |
Redis Stream |
Redis ZSET(分片方案) |
|---|---|---|---|
| 内存占用 | 磁盘存储,内存用于缓存(多节点) | 常驻内存 :单节点承载全部压力 | 常驻内存, shard 可分散多节点 |
| 机器扩展 | partition + broker |
无法扩展(单 key 锁死,单点必崩) |
加 shard + Redis 节点 |
| 运维管理 | 高(需监控 lag、ISR、磁盘) |
中(监控内存、连接、trim) |
低(监控 Redis Cluster ) |
| 元数据规模 | O(消费者数 × 分区数) |
O(消费者数 + pending 数),极易内存爆炸 | O(shard 数) |
| 连接数影响 | 高连接数,但 分散到多 Broker |
高连接数 全压单 Redis 节点,长连接 + 高频监听 → 主线程阻塞 |
连接池复用,连接分散到 Cluster,短请求 → 低延迟、高吞吐 |
3、核心组件概览
| 组件 | 职责 |
|---|---|
SnapshotGlobalConfig |
配置中心,管理各数据集(如 user_tag)的全量 & 增量参数 |
IncrementalWriteService |
消费消息队列(MQ/Kafka)中的增量事件,写入 Redis |
SnapshotPathEnum |
定义 Redis 中各类 Key 的命名规范(路径模板) |
DatasetIncrementalConfig |
增量相关配置项(如分片大小、TTL、落库周期等) |
1)关键配置项说明
hot-snapshot:
datasets:
user_tag:
incremental:
max-members-size: 4000 # 每个 ZSET 最多存 4000 条
db-save-offset-interval: 100 # 每 100 条落库一次 offset
ttl-seconds: 86400 # 数据保留 24 小时
time2-offset-interval: 60 # 每 60 秒记录一次时间偏移(实际代码未用此值!)
2)Redis 数据结构设计
| 用途 | Key 示例 | 说明 |
|---|---|---|
| 全局 offset | user_tag:incr:latestOffset |
INCR 生成唯一 ID |
| 增量分片 | user_tag:incr:shard:0 |
ZSET,member=data, score=offset |
| 时间偏移映射 | user_tag:incr:timeOffset:1730985600 |
value=offset |
| 时间段记录 | user_tag:incr:timePeriod |
ZSET,记录所有时间戳(用于清理 |
4、增量写入全流程详解
package com.healerjean.proj.hotcache.incr;
import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* 消息队列消费者 - 处理增量数据写入
*
* @author zhangyujin
* @date 2025/11/3
*/
@Slf4j
@Component
public class IncrementalWriteService {
/***
* snapshotGlobalConfig
*/
@Resource
private SnapshotGlobalConfig snapshotGlobalConfig;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 增量任务写入
*
* @param datasetName datasetName
*/
public void write(String data, String datasetName) {
IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
Long ttlSeconds = incrementalConfig.getTtlSeconds();
LocalDateTime dateTime = LocalDateTime.now();
Duration ttl = Duration.ofSeconds(ttlSeconds);
// 1. 获取全局唯一 offset
String offsetKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
Long offset = redisTemplate.opsForValue().increment(offsetKey);
// 2. 如果 Redis 返回 null(如 key 不存在且 incr 失败),从 DB 恢复
if (offset == null || offset <= 0) {
offset = restoreOffsetFromDb(datasetName, offsetKey);
}
// 2. 根据 offset 计算分片 ID(每 MAX_MEMBERS_PER_ZSET 一个分片)
Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
long shardId = (offset - 1) / maxMembersSize;
String zsetKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
// 3. 写入 ZSET(score = offset)
Boolean added = redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
if (Boolean.TRUE.equals(added)) {
redisTemplate.expire(zsetKey, ttl);
}
// 4. 写入时间索引
long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1);
String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
String timeOffset = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
redisTemplate.opsForValue().set(timeOffset, offset.toString(), ttl);
redisTemplate.opsForZSet().add(timePeriodKey, String.valueOf(incrTimeSecond), (double) incrTimeSecond);
Long currentSize = redisTemplate.opsForZSet().size(timePeriodKey);
if (currentSize != null && currentSize > maxMembersSize) {
long removeCount = currentSize - maxMembersSize;
redisTemplate.opsForZSet().removeRange(timePeriodKey, 0, removeCount - 1);
}
redisTemplate.expire(timePeriodKey, ttl);
Integer dbSaveOffsetInterval = incrementalConfig.getDbSaveOffsetInterval();
if (offset % dbSaveOffsetInterval == 0) {
// 6. 将 offset 写入数据库
}
}
/**
* 从数据库恢复
*
* @param datasetName datasetName
* @param offsetKey offsetKey
* @return {@link Long}
*/
private long restoreOffsetFromDb(String datasetName, String offsetKey) {
String lockKey = "lock:offset:init:" + datasetName;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
if (Boolean.FALSE.equals(locked)) {
throw new RuntimeException("Interrupted while waiting for offset init");
}
try {
// 从 DB 读取最新 offset
// Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
// long nextOffset = dbOffset + 1;
// redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
// return nextOffset;
} finally {
redisTemplate.delete(lockKey);
}
return 1;
}
}
1)获取全局唯一 offset
💡 若 Redis 初次启动或 key 不存在,
INCR返回 1;若失败(如集群故障),则尝试从 DB 恢复(见容错机制)。
String offsetKey = "user_tag:incr:latestOffset";
Long offset = redisTemplate.opsForValue().increment(offsetKey);
- 使用
Redis的INCR原子操作生成递增 offset。 - 作用:作为每条增量数据的“唯一
ID”和排序依据(ZSET的score)。
2)容灾兜底 —— 从 DB 恢复 offset(可选)
设计意图:保证系统在 Redis 不可用时仍能安全恢复,避免 offset 重置导致数据重复/丢失
- 当
Redis异常(如集群故障、key被误删),尝试从数据库恢复最新offset - 使用
Redis分布式锁 防止并发初始化冲突
if (offset == null || offset <= 0) {
offset = restoreOffsetFromDb(datasetName, offsetKey);
}
3)计算分片 ID 并写入 ZSet
long shardId = (offset - 1) / maxMembersSize;
String zsetKey = REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
- 分片策略:每
maxMembersSize条数据一个 ZSet(如 10 万条/分片) - Score = offset:保证按写入顺序排序,便于下游按 offset 范围拉取
- 自动过期:每个分片独立设置 TTL,避免内存无限增长
4)写入 ZSET(按 offset 排序)
redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
redisTemplate.expire(zsetKey, ttlSeconds); // 如 86400s = 24h
ZSET结构:member= 序列化后的业务数据(如JSON字符串)score=offset(保证全局有序)
- 设置
TTL自动过期,避免无限堆积。
5)建立时间索引(Time-to-Offset 映射)
long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1); // 按秒对齐
String timeOffsetKey = "user_tag:incr:timeOffset:" + incrTimeSecond;
redisTemplate.opsForValue().set(timeOffsetKey, String.valueOf(offset), ttl);
- 作用:支持“按时间查询该时刻之后的所有增量”。
- 例如:想知道今天
10:00之后有哪些变更?先查timeOffset:1730985600得到 offset=12345,再从 offset=12345 开始拉取。
5、代码实现
1)普通 redis (并发风险)
package com.healerjean.proj.hotcache.incr;
import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* 消息队列消费者 - 处理增量数据写入
*
* @author zhangyujin
* @date 2025/11/3
*/
@Slf4j
@Component
public class IncrementalWriteService {
/***
* snapshotGlobalConfig
*/
@Resource
private SnapshotGlobalConfig snapshotGlobalConfig;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 增量任务写入
*
* @param datasetName datasetName
*/
public void write(String data, String datasetName) {
IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
Long ttlSeconds = incrementalConfig.getTtlSeconds();
LocalDateTime dateTime = LocalDateTime.now();
Duration ttl = Duration.ofSeconds(ttlSeconds);
// 1. 获取全局唯一 offset
String offsetKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
Long offset = redisTemplate.opsForValue().increment(offsetKey);
// 2. 如果 Redis 返回 null(如 key 不存在且 incr 失败),从 DB 恢复
if (offset == null || offset <= 0) {
offset = restoreOffsetFromDb(datasetName, offsetKey);
}else {
// 6. todo 将 offset 写入数据库
}
// 2. 根据 offset 计算分片 ID(每 MAX_MEMBERS_PER_ZSET 一个分片)
Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
long shardId = (offset - 1) / maxMembersSize;
String zsetKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
// 3. 写入 ZSET(score = offset)
Boolean added = redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
if (Boolean.TRUE.equals(added)) {
redisTemplate.expire(zsetKey, ttl);
}
// 4. 写入时间索引
long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1);
String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
String timeOffset = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
redisTemplate.opsForValue().set(timeOffset, offset.toString(), ttl);
redisTemplate.opsForZSet().add(timePeriodKey, String.valueOf(incrTimeSecond), (double) incrTimeSecond);
Long currentSize = redisTemplate.opsForZSet().size(timePeriodKey);
if (currentSize != null && currentSize > maxMembersSize) {
long removeCount = currentSize - maxMembersSize;
redisTemplate.opsForZSet().removeRange(timePeriodKey, 0, removeCount - 1);
}
redisTemplate.expire(timePeriodKey, ttl);
// 6. todo 将 offset 写入数据库
}
/**
* 从数据库恢复
*
* @param datasetName datasetName
* @param offsetKey offsetKey
* @return {@link Long}
*/
private long restoreOffsetFromDb(String datasetName, String offsetKey) {
String lockKey = "lock:offset:init:" + datasetName;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
if (Boolean.FALSE.equals(locked)) {
throw new RuntimeException("Interrupted while waiting for offset init");
}
try {
// 从 DB 读取最新 offset
// Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
// long nextOffset = dbOffset + 1;
// redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
// return nextOffset;
} finally {
redisTemplate.delete(lockKey);
}
return 1;
}
}
2)LUA-redis
package com.healerjean.proj.hotcache.incr;
import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
/**
* 消息队列消费者 - 处理增量数据写入
*
* @author zhangyujin
* @date 2025/11/3
*/
@Slf4j
@Component
public class IncrementalLuaWriteService {
private final static RedisScript<String> WRITE_INCREMENTAL_SCRIPT = RedisScript.of(
"redis.call('ZADD', KEYS[1], ARGV[2], ARGV[1]);" +
"redis.call('EXPIRE', KEYS[1], ARGV[3]);" +
"redis.call('SET', KEYS[2], ARGV[2], 'EX', ARGV[3]);" +
"redis.call('ZADD', KEYS[3], ARGV[4], ARGV[4]);" +
"redis.call('EXPIRE', KEYS[3], ARGV[3]);" +
"local maxMembers = tonumber(ARGV[5]);" +
"local currentSize = redis.call('ZCARD', KEYS[3]);" +
"if currentSize > maxMembers then " +
" local removeCount = currentSize - maxMembers; " +
" redis.call('ZREMRANGEBYRANK', KEYS[3], 0, removeCount - 1); " +
"end; " +
"return 1;",
Long.class
);
/***
* snapshotGlobalConfig
*/
@Resource
private SnapshotGlobalConfig snapshotGlobalConfig;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 增量任务写入
*
* @param datasetName datasetName
*/
public void write(String data, String datasetName) {
IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
Long ttlSeconds = incrementalConfig.getTtlSeconds();
Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
LocalDateTime now = LocalDateTime.now();
// 1. 获取全局唯一 offset
String offsetKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
Long offset = redisTemplate.opsForValue().increment(offsetKey);
if (offset == null || offset <= 0) {
offset = restoreOffsetFromDb(datasetName, offsetKey);
// 注意:restore 后建议 SET offsetKey offset+1 避免重复,此处简化
}else {
// 6. todo 将 offset 写入数据库
}
// 2. 计算 shardId 和所有 keys
long shardId = (offset - 1) / maxMembersSize;
String shardKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(now, 1);
String timeOffsetKey = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
// 3. 准备 Lua 参数
List<String> keys = Arrays.asList(shardKey, timeOffsetKey, timePeriodKey);
List<String> args = Arrays.asList(
data,
String.valueOf(offset),
String.valueOf(ttlSeconds),
String.valueOf(incrTimeSecond),
String.valueOf(maxMembersSize)
);
// 4. 执行 Lua 脚本
redisTemplate.execute(WRITE_INCREMENTAL_SCRIPT, keys, args);
}
/**
* 从数据库恢复
*
* @param datasetName datasetName
* @param offsetKey offsetKey
* @return {@link Long}
*/
private long restoreOffsetFromDb(String datasetName, String offsetKey) {
String lockKey = "lock:offset:init:" + datasetName;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
if (Boolean.FALSE.equals(locked)) {
throw new RuntimeException("Interrupted while waiting for offset init");
}
try {
// 从 DB 读取最新 offset
// Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
// long nextOffset = dbOffset + 1;
// redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
// return nextOffset;
} finally {
redisTemplate.delete(lockKey);
}
return 1;
}
}
6、FAQ
1)容错与恢复机制
a、Redis offset 丢失怎么办?
- 通过分布式锁
lock:offset:init:user_tag防止并发初始化。 - 从数据库读取最新
offset,+1后写回Redis。注意上锁,防止并发 - 注意:当前代码中
DB恢复逻辑被注释,需实现offsetRepository.getCurrentOffsetFromDb()。
b、数据重复 or 丢失?
- 不保证
exactly-once,但可通过 offset 幂等处理(消费端去重)。 - 建议:业务数据自带版本号或时间戳,便于去重。
三、全量加载方案
1、整体目标
- 启动时加载最新全量快照(从本地或
OSS下载并反序列化)。 - 读取
Redis中的元数据(包括incrPullTimeLowerBound和当前 offset)。 - 增量拉取:
- 从
RedisZSET拉取增量数据(基于时间区间 + offset)。 - 对内存中的全量数据做 原地更新(
add/delete/modify)。
- 从
- 定时任务(每5秒):
- 根据本地记录的
offset,持续拉取新写入的增量数据。 - 更新本地
offset。
- 根据本地记录的
2、全量加载
1)定时-加载方案
Redis+ 轮询(简单,适合容忍延迟)
-
客户端随机延迟(最简单有效):每台机器收到“新版本可用”后,不是立即下载,而是随机等待
0 ~ T秒再开始。 -
单机限流:刷新期间进行限流,比如多分片同时下载等
while (true) {
Long latest = redis.get("latest_snapshot_version");
if (latest > currentVersion) {
downloadFromCDN(latest);
break;
}
Thread.sleep(5000); // 每 5 秒查一次
}
2)代码实现
package com.healerjean.proj.hotcache.service.pull;
import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.healerjean.proj.hotcache.config.DatasetSnapshotConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import com.healerjean.proj.hotcache.factory.SnapshotFactory;
import com.healerjean.proj.hotcache.model.SnapshotMetadata;
import com.healerjean.proj.hotcache.model.UserTag;
import com.healerjean.proj.hotcache.service.cache.InMemoryUserTagCache;
import com.healerjean.proj.hotcache.service.serialization.DataSerializerStrategy;
import com.healerjean.proj.hotcache.service.storage.StorageServiceStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
/**
* FullLoadToMemoryService
*
* @author zhangyujin
* @date 2025/11/6
*/
@Service
@Slf4j
public class FullLoadToMemoryService {
@Autowired
private SnapshotGlobalConfig snapshotGlobalConfig;
@Autowired
private InMemoryUserTagCache userTagCache;
@Resource
private RedisTemplate<String, String> redisTemplate;
public void loadLatestFullSnapshotToMemory(String datasetName) throws Exception {
// 1. 从 Redis 获取最新版本
String latestKey = SnapshotPathEnum.REDIS_SNAPSHOT_LATEST_VERSION_KEY.format(datasetName);
String latestVersion = redisTemplate.opsForValue().get(latestKey);
if (latestVersion == null) {
throw new IllegalStateException("No full snapshot found in Redis");
}
// 2. 获取元数据(含 manifestUrl)
String metaKey = SnapshotPathEnum.REDIS_SNAPSHOT_VERSION_META_KEY.format(datasetName, latestVersion);
String metaJson = redisTemplate.opsForValue().get(metaKey);
SnapshotMetadata metadata = new Gson().fromJson(metaJson, SnapshotMetadata.class);
// 3. 根据 storageStrategy 下载所有分片文件并反序列化
DatasetSnapshotConfig snapshotConfig = metadata.getConfig();
StorageServiceStrategy storage = SnapshotFactory.getInstance().getStorageServiceStrategy(datasetName, snapshotConfig.getStorageStrategy());
DataSerializerStrategy dataSerializer = SnapshotFactory.getInstance().getDataSerializer(datasetName, snapshotConfig.getSerializerStrategy());
for (SnapshotMetadata.FileInfo fileInfo : metadata.getFileInfoMap().values()) {
try (InputStream is = storage.download(fileInfo.getFilename())) {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
try {
UserTag userTag = (UserTag)dataSerializer.deserialize(line.getBytes(StandardCharsets.UTF_8));
userTagCache.put(userTag.getUserId(), userTag);
}catch (Exception e){
log.error("line:{}, fileInfo:{}", line, JSON.toJSONString(fileInfo), e);
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
}
log.info("✅ 全量加载完成,共 {} 条记录", userTagCache.getAll().size());
}
}
四、增量加载方案
1、背景与目标
在高并发、低延迟的用户标签系统中,内存缓存(如
InMemoryUserTagCache)是提升查询性能的关键。但如何高效、可靠地将 Redis 中的增量数据同步到内存,是一个挑战。核心目标:从 Redis 中按偏移量(
offset)或时间下界(time lower bound)拉取增量数据,并持续加载到本地内存缓存中,实现“热更新”而不中断服务。
2、核心组件概览
1)方法提供
该服务提供两个主要入口方法:
| 方法 | 功能 |
|---|---|
pullIncrementalByOffset(String datasetName) |
基于偏移量(offset)持续拉取增量数据 |
pullIncrementalByLowerBound(String datasetName) |
基于时间下界定位起始 offset,再调用 pullIncrementalByOffset |
2)Redis 数据结构依赖
| Key 类型 | 枚举 | 用途 |
|---|---|---|
String |
REDIS_SNAPSHOT_LATEST_VERSION_KEY |
存储最新快照版本号 |
String |
REDIS_SNAPSHOT_VERSION_META_KEY |
存储快照元数据(含 incrPullTimeLowerBound) |
ZSet |
REDIS_INCR_TIME_PERIOD_KEY |
时间戳 → 用于查找起始时间 |
String |
REDIS_INCR_TIME_OFFSET_KEY |
时间戳 → offset 映射 |
ZSet |
REDIS_INCR_CURRENT_SHARD_KEY |
分片增量数据(score=offset, value=事件内容) |
3)关键设计
a、数据分片(Sharding)机制
- 增量数据存储在
Redis的ZSET中,按score= offset 排序。 - 由于
ZSET有性能上限(如成员数限制),系统采用分片策略:- 每个分片最多容纳
maxMembersSize条记录。 - 分片
ID计算公式:shardId = (offset - 1) / maxMembersSize - 对应
Redis Key:REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId)
- 每个分片最多容纳
b、本地偏移量追踪
- 每个
datasetName维护一个原子偏移量,表示已处理的最大offset。 - 初始值由
pullIncrementalByLowerBound根据时间戳反查得到。
private final static Map<String, AtomicLong> LOCAL_PROCESSED_OFFSET_MAP = new ConcurrentHashMap<>();
3)代码实现
a、通过时间拉取
/**
* 时间驱动的起始点定位
*/
public void pullIncrementalByLowerBound(String datasetName) {
// 1、从 Redis 获取最新快照版本(latestVersion)
String latestKey = SnapshotPathEnum.REDIS_SNAPSHOT_LATEST_VERSION_KEY.format(datasetName);
String latestVersion = redisTemplate.opsForValue().get(latestKey);
if (latestVersion == null) {
throw new IllegalStateException("No full snapshot found in Redis");
}
// 2. 解析其元数据 SnapshotMetadata,获取 incrPullTimeLowerBound(时间戳)
String metaKey = SnapshotPathEnum.REDIS_SNAPSHOT_VERSION_META_KEY.format(datasetName, latestVersion);
String metaJson = redisTemplate.opsForValue().get(metaKey);
SnapshotMetadata metadata = new Gson().fromJson(metaJson, SnapshotMetadata.class);
long incrPullTimeLowerBound = metadata.getIncrPullTimeLowerBound();
String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
// 3、 查找 >= 时间下界的第一个时间点
Set<String> candidates = redisTemplate.opsForZSet().rangeByScore(timePeriodKey, (double) incrPullTimeLowerBound, Double.MAX_VALUE, 0, 1);
if (CollectionUtils.isEmpty(candidates)) {
LOCAL_PROCESSED_OFFSET_MAP.put(datasetName, new AtomicLong(0));
log.info("No time period found >= {} for dataset {}", incrPullTimeLowerBound, datasetName);
return;
}
// 4、通过 REDIS_INCR_TIME_OFFSET_KEY 反查对应的 offset
String firstTimeStr = candidates.iterator().next();
String offsetKey = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, firstTimeStr);
String offsetValue = redisTemplate.opsForValue().get(offsetKey);
long latestOffset = Long.parseLong(offsetValue);
// 5、初始化 LOCAL_PROCESSED_OFFSET_MAP 并启动 pullIncrementalByOffset
LOCAL_PROCESSED_OFFSET_MAP.put(datasetName, new AtomicLong(latestOffset));
pullIncrementalByOffset(datasetName);
}
b、通过游标拉取
@Autowired
private SnapshotGlobalConfig snapshotGlobalConfig;
@Autowired
private InMemoryUserTagCache userTagCache;
@Resource
private RedisTemplate<String, String> redisTemplate;
private static final long PULL_BATCH_SIZE = 300L;
/**
* 本地已处理的偏移量(必须先通过 pullIncrementalByLowerBound 初始化)
*/
private final static Map<String, AtomicLong> LOCAL_PROCESSED_OFFSET_MAP = new ConcurrentHashMap<>();
/**
* 增量拉取流程
*
* @param datasetName datasetName
*/
public void pullIncrementalByOffset(String datasetName) {
if (!LOCAL_PROCESSED_OFFSET_MAP.containsKey(datasetName)) {
log.warn("Offset not initialized for dataset: {}. Skipping pull. Call pullIncrementalByLowerBound first.", datasetName);
return;
}
IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
AtomicLong atomicCurrentOffset = LOCAL_PROCESSED_OFFSET_MAP.get(datasetName);
while (true) {
// 1、计算当前应读取的 shardId,
long currentOffset = atomicCurrentOffset.get();
String latestKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
String latestOffsetStr = redisTemplate.opsForValue().get(latestKey);
long latestOffset = Long.parseLong(latestOffsetStr);
if (currentOffset >= latestOffset) {
// 当前 shard 已追上最新 offset → 结束
log.debug("Dataset {} caught up to latest offset {}, sleeping...", datasetName, latestOffset);
break;
}
// 2、从 Redis ZSET 拉取一批数据
long shardId = currentOffset / maxMembersSize;
String shardKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
long nextOffset = currentOffset + 1;
double maxScore = Math.min(nextOffset + PULL_BATCH_SIZE - 1, latestOffset);
Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(shardKey, (double) nextOffset, maxScore);
// 3、防止拉取数据为空,且当前 offset 未更新,则继续拉取
if (CollectionUtils.isEmpty(tuples)) {
// 当前 shard 无数据,尝试跳转到下一个 shard 起始位置
long nextShardStart = (shardId + 1) * maxMembersSize;
long jumpTo = Math.min(nextShardStart, latestOffset);
// 如果跳转幅度很大(比如超过一个 batch),先小步试探
if (jumpTo - currentOffset > PULL_BATCH_SIZE * 2) {
// 只前进一小步,避免跨度过大跳过异常数据
jumpTo = currentOffset + 1;
}
if (jumpTo > currentOffset) {
atomicCurrentOffset.set(jumpTo);
log.debug("Advanced offset to {} for dataset {}", jumpTo, datasetName);
continue;
}
// 已到最新,但仍无数据 → 可能数据延迟,稍后重试
log.debug("No data found in shard {} for dataset {}, but not caught up yet.", shardId, datasetName);
break;
}
// 4、批量处理 & 更新偏移量
for (ZSetOperations.TypedTuple<String> data : tuples) {
Double score = data.getScore();
String value = data.getValue();
}
// 5、更新本地 offset 为本次拉取的最大 score
// 更新 offset 为本次最大 score(注意:不再 +1)
long maxProcessedOffset = tuples.stream()
.mapToLong(t -> Objects.requireNonNull(t.getScore()).longValue())
.max()
.orElse(currentOffset);
atomicCurrentOffset.set(maxProcessedOffset);
log.debug("Pulled {} records for dataset {}, offset updated to {}",
tuples.size(), datasetName, maxProcessedOffset);
}
}
五、数据验证方案
1、可观测性验证
1)关键验证清单
| 验证类别 | 关键问题 | 验证手段 |
|---|---|---|
| 数据完整 | 全量是否丢数据? | 行数对比 + 抽样校验 |
| 原子发布 | 是否读到半成品? | 中断测试 + 文件检查 |
| 容错恢复 | Redis 挂了能否恢复? |
清空 offset + 重启 |
| 性能达标 | 能否支撑5000台? | 压测 + CDN 并发测试 |
| 自动清理 | 存储是否会爆炸? | 模拟多版本 + 检查清理 |
2)必须具备的监控项:
| 类别 | 监控项 | 验证方式 |
|---|---|---|
| 全量 | 快照生成耗时、文件大小、recordCount |
查看日志或 Prometheus 指标 |
| 增量 | 当前 offset、每秒处理量、分片数量 | Redis 监控 + 应用指标 |
| 存储 | OSS/S3 文件数量、总大小 | 存储平台控制台 |
| 一致性 | 各节点缓存差异率 | 日志采样比对 |
| 错误 | 序列化失败、网络超时、DB 连接异常 | 告警日志 |
2、数据准确性验证
1)数据完整性验证
- 目标:确认全量快照包含所有应有记录,无遗漏。
- 方法:
- 对比源数据库(如
MySQL)中user_tag表的总行数vs快照元数据中的recordCount。 - 抽样校验:随机选取若干
user_id,检查其在快照文件和数据库中的字段是否一致(如tag_value,update_time)。 - 检查分片总数 × 平均每片记录数 ≈ 总记录数(避免某分片丢失
- 对比源数据库(如
2)文件原子性与一致性验证
- 目标:确保客户端不会读到“半成品”快照。
- 方法:
- 在快照生成过程中
kill进程,重启后检查:- 是否存在
.tmp文件(说明未完成发布)。 - 确保
Redis中的latest版本是否仍为旧版本(未错误更新)。
- 是否存在
- 发布完成后,检查所有分片文件是否存在且命名规范(无
.tmp后缀)。
- 在快照生成过程中
3)压缩与序列化正确性验证
- 目标:
Protobuf+GZIP能正确反序列化。 - 方法:
- 手动下载一个
.pb.gz文件,用工具(如protoc --decode或自研解析脚本)解压并解析。 - 验证解析出的数据结构与原始记录一致(字段名、类型、值)。
- 手动下载一个
4)5000 台机器数据一致性验证
- 目标:所有节点最终状态基本一致(秒级延迟内)。
- 方法:
- 在一台机器上查询某个
user_id的标签。 - 同时在其他多台机器上查询同一
user_id。 - 汇总数据定时上报,通过日志采集 + 聚合分析实现大规模比对。
- 在一台机器上查询某个
3、性能与稳定性验证
1)全量导出性能验证
- 指标:
- 导出耗时 ≤ 10分钟(1000万条)。
- 内存占用 < 200MB(流式处理)。
CPU使用率平稳,无长时间 GC。
- 方法:
- 在生产类似环境压测,监控资源使用。
- 检查日志中是否有
OOM、超时、连接池耗尽等错误。
2)增量吞吐验证
- 指标:
- 支持 ≥ 1万条/秒的增量写入。
Redis ZSET单分片 ≤4000条,避免大Key。
- 方法:
- 使用压测工具模拟高并发增量事件。
- 监控
Redis内存、CPU、网络带宽。 - 检查
LOCAL_PROCESSED_OFFSET_MAP是否持续增长(无卡顿)。
3)CDN 下载并发验证
- 目标:
5000台机器能快速下载快照。 - 方法:
- 模拟 1000~5000 台客户端并发从
CDN下载.pb.gz文件。 - 测量平均下载时间、失败率。
- 确认
CDN缓存命中率 >99%(首台回源,其余命中边缘节点)。
- 模拟 1000~5000 台客户端并发从
4)CPU 影响
“少分配、慢处理、错峰干、分隔离”,内存服务的性能瓶颈往往不在计算,而在 对象创建过多 → 触发频繁 GC → STW(Stop-The-World)卡顿。
| 目标 | 具体怎么做 |
|---|---|
降 GC |
流式处理 + 避免中间对象 + 用原生数组 + 对象池(谨慎) |
稳 CPU |
错峰全量 + 限流增量 + 分批拉取(每批 ≤1000 条) |
| 优内存 | 扁平结构 + 不可变全量快照 + 避免 HashMap 嵌套 |
调 JVM |
固定堆 + G1/ZGC + 监控 GC 日志 |
| 做隔离 | 拉取和服务分离进程,或用 cgroup 限 CPU |
a、控制拉取节奏 → 避免 CPU 突刺
全量 + 多源增量同时拉,CPU 瞬间打满,GC 跟着暴增。
- 错峰调度全量任务
- 不要所有数据源同时开始全量;
- 通过配置中心或
cron控制启动时间。
- 限制增量消费速率
- 给每个数据源的增量拉取设置 最大吞吐上限(如 5000 条/秒);
- 超过就
sleep或丢弃背压(根据业务容忍度);
- 流式处理,禁止“全载入再处理”
- 全量导出时,边读边解析边更新内存,不要先把 1000 万条全 load 到 List 里;
- 用
cursor/scroll API分批拉(每批 1000 条),处理完一批再拉下一批。
b、内存结构优化 → 减少 GC Roots
-
使用紧凑内存布局
-
Map<Long, String>(HashMap节点对象多,GC压力大)。 -
例如:把 user_id → tag 映射,用两个平行数组实现(类似列式存储):
user_ids: [1001, 1002, 1003, ...] tags: ["VIP", "NORMAL", "TEST", ...]Map<Long, String>(HashMap 节点对象多,GC 压力大)
-
-
避免嵌套对象和长链引用
- 对象图越扁平,
GC扫描越快; - 不要用
User -> Profile -> Settings -> Map<...>这种深结构。
- 对象图越扁平,
-
定期“冻结”全量数据
- 全量加载完成后,把数据结构标记为 不可变(
Immutable); - 后续增量只更新增量缓存,查询时
merge; - 不可变对象更容易被
JVM优化,且不会在老年代频繁修改。
- 全量加载完成后,把数据结构标记为 不可变(


