前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、全量数据预热系统

  • 数据不丢:即使中间服务假死,恢复后也能追上。
  • 一致性5000 台机器数据基本一致(允许秒级延迟)
  • 性能:避免大 Key 、避免全量替换、避免高 CPU/GC
  • 可扩展:支持每日全量刷新 + 增量同步

二、全量写入方案

1、方案目标

目标 描述
支持千万级数据全量加载 数据总量约 10GB(每条记录 ~1KB),需避免 OOM 和长时间阻塞
高效存储与传输 减少磁盘占用、网络带宽消耗,优先选择紧凑序列化格式,每个数据源可选序列化方式,可选压测
分片并行写入加速 利用多线程提升写入速度,缩短整体导出耗时
原子性发布机制 防止客户端读取到“部分完成”的快照文件
版本控制与通知机制 实现服务端主动通知,确保下游及时感知新版本
自动清理旧快照 避免 OSS/S3 存储无限膨胀,保留最近 7 天历史
增量起点可追溯 记录本次全量开始时间,作为后续增量拉取的起始位点
生产级稳定性保障 支持失败重试、临时文件保护、日志监控

2、整体架构概览

  • 数据源读取:基于主键游标分页,流式加载,避免 OOM。
  • 分片写入:按 userId 哈希分发到多个分片,并行写入。
  • 序列化压缩:使用 Protobuf + GZIP,极致压缩。
  • 原子发布:先写 .tmp 临时文件,完成后 rename 发布。
  • 版本通知:将最新版本号写入 Redis,下游轮询感知。
  • 清理旧版本:保留最近 7 天快照,防止存储膨胀
+------------------+     +----------------------------+
|   源系统 (MySQL)   | --> | 分页读取 + 流式处理引擎      |
+------------------+     +--------------+-------------+
                                         |
                                         v
                       +----------------------------------+
                       | 按 userId 哈希分发到 N 个分片写入器 |
                       +----------------+-----------------+
                                        |
                                        v
                   +--------------------+--------------------+
                   |            对象存储层 (OSS / S3)           |
                   |  full_data_v{version}_{shardId}.pb.gz     |
                   +--------------------+--------------------+
                                        |
                                        v
                             +----------+-----------+
                             |    Redis 版本中心     |
                             | snapshot:user_tag:latest |
                             +----------+-----------+
                                        |
                                        v
                            +-----------+------------+
                            | 下游服务(定时轮询 Redis)  |
                            | 下载分片 → 解压 → 加载内存  |
                            +-----------+------------+

3、核心组件详解

1)数据源读取模块

  • 问题背景:直接 SELECT * FROM table 会一次性加载全部数据,导致内存溢出(OOM),尤其在千万级以上数据量时不可接受。

  • 解决方案:基于主键的游标分页

    • 使用 user_id(主键或唯一索引)作为分页锚点。
    • 每次读取 1W 条,避免内存堆积。
    • 不依赖 OFFSET,避免深度分页性能下降。
SELECT user_id, tag_id, tag_value, update_time
FROM user_tag
WHERE user_id > ?
ORDER BY user_id ASC
LIMIT 10000;
public List<UserTag> findByPage(Long lastId, int pageSize) {
    String sql = "SELECT ... WHERE user_id > ? ORDER BY user_id LIMIT ?";
    return jdbcTemplate.query(sql, (rs, rowNum) -> mapRowToUserTag(rs), lastId, pageSize);
}

2)序列化格式

推荐使用 Protobuf + GZIP 组合,在压缩率和解析性能之间取得最佳平衡,适合跨语言、高性能场景。

  • 内部高性能服务通信 → 用 Protobuf
  • 对外 REST API / 调试友好 → 用 JSON(或 JSON + Gzip 如果带宽紧张)
  • 持久化存储 / 批量数据导出 → 用 Protobuf + Gzip
  • 已有 JSON 系统想降带宽 → 快速启用 JSON + Gzip(零代码改造)
格式 大小对比 性能对比 是否推荐 说明
JSON 100% 1x 可读性好,但体积大、性能低,不适合高性能或带宽敏感场景
JSON + Gzip ~30% ~0.8x(略慢) 体积显著减小,适合网络传输;但 CPU 开销增加,且仍需解析文本
Protobuf ~40% 3x 更快 ✅✅ 二进制、高效、跨语言,适合内部服务通信和存储
Protobuf + Gzip ~15% 2.5x ✅✅✅ 极致压缩 + 高性能,适合冷数据存储或高成本带宽场景

3)OSS 数据分片策略(负载均衡 + 小文件优化)

a、分片大小

参数 原始方案(1000片) 优化后方案(100片)
单片大小 ~10MB ~100–150MB
文件数量 1000 100
并发连接 高(易超限) 适中
OSS 元数据压力 极高 可控
  • 为什么改为 100 片?

    • SS/S3 最佳实践建议单个对象大小 ≥ 100MB,以减少元数据开销和列举延迟
  • 1000 个小文件会导致 OSS 元数据压力大、列举慢、打开开销高。

    • 100 个分片可在大多数集群环境中良好并行处理。
  • 哈希算法改进(防倾斜):

    • int shardId = Math.floorMod(Objects.hash(userId), SHARD_COUNT);
      
    • 使用 Objects.hash() 提供更好的散列分布。

    • Math.floorMod 确保结果为正整数,避免负索引异常。

b、OSS 性能优化

问题1:OSS 能否支持数千台机器同时下载同一个文件?

答案::完全可以,但必须优化方式,不能 5000 台机器同时下载,导致 所有请求打到同一个 Object,这样会导致

  • OSS 内部热点(虽然会自动负载均衡)
  • 出口带宽瓶颈(如果都在一个 VPC 出口)
  • 下载慢、超时、失败率高

问题2:正确做法是什么呢?

答案:正确做法:OSS + CDN 加速(推荐方案)

  • 10GB 文件,CDN 缓存后,3000 台机器可在 10 分钟内全部下载完成(平均 50MB/s)
  • CDN 缓存策略
    • 缓存 .pb.gz 文件,TTL 设置为 0(不自动过期)
    • 手动调用 Refresh 刷新缓存(每次发布新版本时)
  • OSS + CDN 开启 HTTPS
  • 使用内网 CDN(如果支持) → 更快、更便宜
优势 说明
首请求回源,后续命中缓存 第一台机器下载后,CDN 缓存整个文件
就近接入点(Edge)下载 每台机器从最近的 CDN 节点拉取,速度快
支持百万级并发下载 CDN 天生为高并发设计
减轻 OSS 压力 只有少量回源请求

4)存储层设计: 压缩上传与原子发布

a、压缩流构建(GZIP + Protobuf

使用 GZIPOutputStream 包裹输出流,自动压缩:

OutputStream rawStream = storage.getOutputStream(tempFilename);
GZIPOutputStream gos = new GZIPOutputStream(rawStream);
CodedOutputStream cos = CodedOutputStream.newInstance(gos);

// 写入每条消息(delimited)
for (UserTag record : batch) {
    UserTagProto proto = convertToProto(record);
    proto.writeDelimitedTo(cos); // 自动添加长度前缀
}
cos.flush();
gos.close(); // 触发压缩结束

b、原子发布机制(临时文件 + rename

  • 若任务中断,临时文件存在,下次可恢复;
  • 客户端只看到完整发布的文件,不会读到半成品;
  • 支持多种底层存储(本地/OSS/S3)统一语义。
// 写入 .tmp 临时文件
String tempFile = filename + ".tmp";
try (OutputStream os = storage.getOutputStream(tempFile)) {
    writeData(os);
}

// 原子性发布:rename to final name
storage.publish(tempFile, filename); 

5)版本通知机制:Redis 替代 ZooKeeper

a、 ZooKeeper 的局限性(不推荐用于事件广播)

风险 说明
ZK 瞬时推送风暴 一次 setData(),ZK 向数千台机器发送事件
ZK 延迟升高 大量 Watcher 导致协调服务响应变慢
客户端处理不过来 数千台机器同时发起下载 → 网络洪峰
ZK 不是消息广播系统 它是协调服务,不适合大规模事件通知

b、推荐方案:Redis 轻量级版本中心

  • TTL 设置:所有 key 设置 7 天过期,与快照生命周期一致。
  • 下游行为:
    • 每隔 1~5 分钟轮询一次 snapshot:{dataSource}:latest
    • 发现版本变化后,下载对应的 manifest 并校验各分片。
 当前最新版本号
snapshot:user_tag:latest → "1730486400"

# 版本详情(Hash)
snapshot:user_tag:1730486400.version → "1730486400"
snapshot:user_tag:1730486400.createTime → "1730486400000"
snapshot:user_tag:1730486400.recordCount → "9876543"
snapshot:user_tag:1730486400.manifestUrl → "manifest/user_tag_v1730486400_manifest.json"

6) 清理策略:保留最近 7 天快照

防止存储无限增长。支持回滚到过去一周内的任意版本。降低备份成本。

public void cleanupOld(String dataSource, long currentVersion) {
    try {
        String prefix = dataSource + "_v";

        for (String file : storage.listFiles(prefix)) {
            Long ver = extractVersion(file);
            if (ver != null && ver < currentVersion) {
                storage.delete(file);
                log.info("🗑️ 删除旧快照文件: {}", file);
            }
        }

        // 清理旧 manifest 和 checkpoint
        cleanupByPrefix("manifest/" + prefix, currentVersion);
        cleanupByPrefix("checkpoints/" + prefix, currentVersion);

    } catch (IOException e) {
        log.error("清理失败", e);
    }
}

private void cleanupByPrefix(String prefix, long currentVersion) throws IOException {
    for (String file : storage.listFiles(prefix)) {
        Long ver = extractVersion(file);
        if (ver != null && ver < currentVersion) {
            storage.delete(file);
            log.info("🗑️ 删除旧辅助文件: {}", file);
        }
    }
}

7)增量衔接:记录全量开始时间

  • 关键字段:

    • startTime: 当前全量任务开始的时间戳(即上次全量完成后第一次增量的起点)

    • 也可使用 max(update_time) 作为更精确的起点,取决于业务语义。

  • 后续增量消费方式:

    来源 拉取方式
    MySQL Binlog update_time > startTime 查询
    Kafka 从对应时间点的 offset 开始消费
    Redis Stream XREAD with $ or timestamp

8)错误处理与健壮性设计

a、健壮性设计

风险 应对措施
写入中途崩溃 使用临时文件,下次启动可判断是否继续
断点续传 记录已处理的最大 user_id,失败后从中断处恢复
ZK 连接断开 自动重连 + 本地缓存最后版本
内存溢出 流式处理,无中间集合缓存
文件未完全写入 只有全部写完才发布版本

b、验证方式

| 验证项 | 方法 | | ———- | ————————————————- | | 断点续传 | kill 进程 → 重启 → 查看日志是否从上次位置继续 | | 数据一致 | 查看下游日志是否打印 ` 加载成功,共 X 条 | | 文件完整性 | cat /data-preload/snapshots/manifest.json | | 断点文件 | cat /data-preload/checkpoints/checkpoint_v.txt` |

9)文件命名规范 & 示例

  • dataSource: 数据源名称(如 user_tag, item_profile
  • version: System.currentTimeMillis(),全局唯一且有序
  • shardId: [0, N-1]
  • .pb.gz: Protobuf + Gzip 压缩格式
full_data_v{version}_{shardId}.pb.gz

示例:
full_data_v1730486400000_0.pb.gz
full_data_v1730486400000_1.pb.gz
...
full_data_v1730486400000_99.pb.gz

10)性能估算(以 1000 万条为例)

指标 数值
原始数据大小 ~10 GB
Protobuf 大小 ~4 GB
Protobuf + Gzip ~1.5 GB
分片数量 100 个
平均每个文件 ~15 MB
导出耗时 5–10 分钟(DB 负载正常)
内存占用 < 200 MB(流式处理)
CPU 占用 中等(Protobuf 序列化)

11)方案优势总结

维度 说明
高效性 Protobuf + Gzip 极致压缩,节省 85% 存储
一致性 临时文件 + ZK 原子发布,杜绝脏读
可维护性 自动清理旧版本,防止数据膨胀
可观测性 版本信息丰富,支持监控告警
扩展性 分片结构天然支持水平扩展
兼容性 Protobuf 跨语言,易于被 Go/Python/C++ 客户端解析

1)流读取,而不是 csv

维度 流式文本读取 EasyExcel 读 CSV
适用格式 纯文本 CSV(每行一个记录) 官方不主推 CSV,需额外配置,功能弱于 Excel
内存占用 极低(逐行读,无缓存) 较低(SAX 模式),但 CSV 解析不如原生流高效
性能 高(无反射、无注解解析) 中等(有对象映射开销)
开发效率 需手动解析字段(split 或 JSON) 可用 @ExcelProperty 自动映射(但 CSV 支持差)
错误处理 精细(可定位到具体行) 较粗(依赖 EasyExcel 异常回调)
依赖 无额外依赖 需引入 easyexcel
CSV 特性支持 需自己处理引号、转义、分隔符等 EasyExcel 的 CSV reader 功能较弱,对 RFC4180 支持不完整

二、增量写入方案

基于 Redis 的增量数据写入方案,用于将消息队列中的增量数据高效、有序地持久化到 Redis 中,并支持按时间窗口和分片(shard)组织数据,同时具备容错恢复能力。

1、背景与目标

1)为什么需要增量快照?

  • 全量快照成本高:每次重新拉取所有用户标签数据(如 user_tag)会消耗大量 I/O、网络和内存。
  • 实时性要求:业务需要近实时感知用户标签变更(如打标、去标),不能依赖小时级/天级全量。
  • 资源隔离:全量与增量应解耦,避免互相干扰。

2)设计目标

  • 高吞吐、低延迟地处理来自消息队列的增量数据;
  • 保证数据顺序性(通过全局递增 offset);
  • 支持按时间范围查询(通过时间索引);
  • 自动分片存储,避免单个 ZSET 过大影响性能;
  • 具备故障恢复能力:当 Redisoffset 丢失时,能从数据库恢复;
  • 控制内存占用:通过 TTL 和分片数量限制自动清理旧数据。

2、方案选型

1)方案对比

a、 Kafka + Redis ZSET + 分片偏移

非常适合数百~数千台机器

1)原理

  • 数据按 offset 写入Redis ZSETscore = offsetmember = data

  • 每个 shard 对应一个 ZSET key(如 incr:dataset:shard0

  • 客户端本地维护 currentOffset,按 offset 范围拉取

  • 分片大小由 maxMembersSize 控制(如每 shard 1000 条)

  • shard 时主动探测下一个 shard 防漏

2)优点

  • 天然支持高并发读
    • Redis 单实例可支撑数万 QPS
    • ZSET rangeByScoreO(logN + M),性能良好。
  • 精确 offset 控制:基于 score 的偏移机制,支持任意 offset 拉取,适合“按位点恢复”。
  • 无需中间件依赖:仅依赖 Redis,部署简单。
  • 支持全量+增量统一模型:全量可视为 指定 offset 开始拉。

3)缺点

  • 偏移量管理复杂
    • 写入端:生产者需按 offset 写入正确 shard(需协调)。
    • 客户端:需要自行维护偏移量,容易出错
  • 空窗口问题
    • 分片边界:若某 shard 无数据,需主动探测下一 shard
    • 切换边界易出错:如 maxMembersSize 变化导致 shard 计算不一致

b、Kafka 广播模式 (首选)

百台以内较合适;数百台勉强可用(需大集群+分区调优);数千台以上不推荐(除非做分层代理)。

1)原理

  • 每台机器启动时生成唯一 group.id(如 hotcache-${host}-${pid}
  • 所有机器订阅同一个 topic,各自独立消费全部 partition
  • offsetKafka 自动管理(__consumer_offsets)

2)优点

  • offset 自动持久化Kafka 自动记录每个 group 的消费位点,重启后自动 resume
  • 断网追赶能力强:只要数据未被 retention 清除(如保留 7 天),可从任意 offset 拉取。
  • 高吞吐、持久化存储Kafka 本身为日志设计,磁盘顺序写,支持 TB 级数据。

3)缺点

  • 运维复杂度高
    • 需维护 Kafka 集群、ZooKeeper/KRaft、监控 lag
    • 连接数爆炸:每台机器一个 consumer,数千台 → 数千个 TCP 连接 + 消费者协调开销。
  • 带宽压力大:每台机器都要拉全量增量数据(广播语义),网络带宽 × N
  • Kafka 集群压力
    • 数千消费者同时拉取同一 topicbroker 负载高
    • 启动风暴:服务批量重启时,所有机器同时拉历史数据,Kafka broker 瞬间被打满。

4)适用场景

  • 数据一致性要求极高的场景
  • 消息量不是特别巨大的情况
  • 客户端数量在数百台以内

5)伪代码

// 简化版实现
@KafkaListener(topics = "#{'${cache.topics}'.split(',')}")
public void handleCacheUpdate(ConsumerRecord<String, CacheUpdateMessage> record) {
    try {
        applyCacheUpdate(record.value());
        // 异步提交偏移量
        kafkaTemplate.commitSync();
    } catch (Exception e) {
        // 处理失败,记录异常但不提交偏移量
        log.error("Failed to process cache update", e);
    }
}

// 服务启动时全量加载 + 增量追赶
@PostConstruct
public void init() {
    // 1. 从OSS加载全量
    loadFullDataFromOSS();
    
    // 2. 寻找合适的Kafka偏移量开始位置
    long startOffset = calculateStartOffset();
    
    // 3. 从指定位置开始消费
    kafkaConsumer.seekToBeginning(Collections.singletonList(partition));
    kafkaConsumer.seek(partition, startOffset);
}

d、Redis Stream + 消费者组

1)原理

  • 使用 Redis 5.0+Stream 数据结构
  • 创建 consumer group,每台机器作为独立 consumer 加入(可共享 group,也可独立)
  • 但你要的是广播,所以每台机器应使用独立 group

2)优点

  • 支持消费组 + offset 自动管理:比 ZSET 更规范。
  • 内存可控Stream 支持 MAXLEN ~ N 自动截断,避免无限增长。

3)缺点

  • 仍是广播模式:每台机器独立 group → 数据全量复制,带宽浪费同 Kafka
  • Redis 单点瓶颈:
    • Stream 写入是单线程(Redis 主线程),高吞吐下易成瓶颈。
    • 数万台 consumer 同时读,Redis CPU 和网络打满。
  • 无水平扩展:
    • Redis ClusterStream 必须在一个 slot(即一个 master),无法分片。
    • 所有读写压在一个 Redis masterStream 无法分片)

4)适用规模

  • 几十 ~ 数百台:可用,优于 ZSET(因有 ACK 和自动截断)。
  • 数千台以上不推荐Redis 成为单点瓶颈,且广播流量大。

2)深度分析

a、服务重启如何加载全量 + 增量?

Kafka 最标准,ZSET 最轻量

  • Kafka:全量从 OSS,增量从 committed offset 拉取(天然支持)。
  • Redis Stream:全量 OSS,增量从本地存储的 last-id 开始 XREAD
  • Redis ZSET:全量 OSS,增量从本地 offset 开始计算 shard 拉取(你已实现)。

b、元数据与连接爆炸问题

维度 Kafka Redis Stream Redis ZSET(分片方案)
内存占用 磁盘存储,内存用于缓存(多节点) 常驻内存 :单节点承载全部压力 常驻内存, shard 可分散多节点
机器扩展 partition + broker 无法扩展(单 key 锁死,单点必崩 shard + Redis 节点
运维管理 高(需监控 lagISR、磁盘) 中(监控内存、连接、trim 低(监控 Redis Cluster
元数据规模 O(消费者数 × 分区数) O(消费者数 + pending 数),极易内存爆炸 O(shard 数)
连接数影响 高连接数,但 分散到多 Broker 高连接数 全压单 Redis 节点,长连接 + 高频监听 → 主线程阻塞 连接池复用,连接分散到 Cluster,短请求 → 低延迟、高吞吐

3、核心组件概览

组件 职责
SnapshotGlobalConfig 配置中心,管理各数据集(如 user_tag)的全量 & 增量参数
IncrementalWriteService 消费消息队列(MQ/Kafka)中的增量事件,写入 Redis
SnapshotPathEnum 定义 Redis 中各类 Key 的命名规范(路径模板)
DatasetIncrementalConfig 增量相关配置项(如分片大小、TTL、落库周期等)

1)关键配置项说明

hot-snapshot:
  datasets:
    user_tag:
      incremental:
        max-members-size: 4000        # 每个 ZSET 最多存 4000 条
        db-save-offset-interval: 100  # 每 100 条落库一次 offset
        ttl-seconds: 86400            # 数据保留 24 小时
        time2-offset-interval: 60     # 每 60 秒记录一次时间偏移(实际代码未用此值!)

2)Redis 数据结构设计

用途 Key 示例 说明
全局 offset user_tag:incr:latestOffset INCR 生成唯一 ID
增量分片 user_tag:incr:shard:0 ZSET,member=data, score=offset
时间偏移映射 user_tag:incr:timeOffset:1730985600 value=offset
时间段记录 user_tag:incr:timePeriod ZSET,记录所有时间戳(用于清理

4、增量写入全流程详解

package com.healerjean.proj.hotcache.incr;


import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;

/**
 * 消息队列消费者 - 处理增量数据写入
 *
 * @author zhangyujin
 * @date 2025/11/3
 */
@Slf4j
@Component
public class IncrementalWriteService {


    /***
     * snapshotGlobalConfig
     */
    @Resource
    private SnapshotGlobalConfig snapshotGlobalConfig;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 增量任务写入
     *
     * @param datasetName datasetName
     */
    public void write(String data, String datasetName) {
        IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
        DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
        Long ttlSeconds = incrementalConfig.getTtlSeconds();
        LocalDateTime dateTime = LocalDateTime.now();
        Duration ttl = Duration.ofSeconds(ttlSeconds);
        // 1. 获取全局唯一 offset


        String offsetKey =  SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
        Long offset = redisTemplate.opsForValue().increment(offsetKey);
        // 2. 如果 Redis 返回 null(如 key 不存在且 incr 失败),从 DB 恢复
        if (offset == null || offset <= 0) {
            offset = restoreOffsetFromDb(datasetName, offsetKey);
        }

        // 2. 根据 offset 计算分片 ID(每 MAX_MEMBERS_PER_ZSET 一个分片)
        Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
        long shardId = (offset - 1) / maxMembersSize;
        String zsetKey =  SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);

        // 3. 写入 ZSET(score = offset)
        Boolean added = redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
        if (Boolean.TRUE.equals(added)) {
            redisTemplate.expire(zsetKey, ttl);
        }

        // 4. 写入时间索引
        long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1);
        String timePeriodKey =  SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
        String timeOffset =  SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
        redisTemplate.opsForValue().set(timeOffset, offset.toString(), ttl);
        redisTemplate.opsForZSet().add(timePeriodKey, String.valueOf(incrTimeSecond), (double) incrTimeSecond);
        Long currentSize = redisTemplate.opsForZSet().size(timePeriodKey);
        if (currentSize != null && currentSize > maxMembersSize) {
            long removeCount = currentSize - maxMembersSize;
            redisTemplate.opsForZSet().removeRange(timePeriodKey, 0, removeCount - 1);
        }
        redisTemplate.expire(timePeriodKey, ttl);

        Integer dbSaveOffsetInterval = incrementalConfig.getDbSaveOffsetInterval();
        if (offset % dbSaveOffsetInterval == 0) {
            // 6. 将 offset 写入数据库
        }

    }

    /**
     * 从数据库恢复
     *
     * @param datasetName datasetName
     * @param offsetKey   offsetKey
     * @return {@link Long}
     */
    private long restoreOffsetFromDb(String datasetName, String offsetKey) {
        String lockKey = "lock:offset:init:" + datasetName;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
        if (Boolean.FALSE.equals(locked)) {
            throw new RuntimeException("Interrupted while waiting for offset init");
        }
        try {
            // 从 DB 读取最新 offset
            // Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
            // long nextOffset = dbOffset + 1;
            // redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
            // return nextOffset;
        } finally {
            redisTemplate.delete(lockKey);
        }
        return 1;
    }

}

1)获取全局唯一 offset

💡 若 Redis 初次启动或 key 不存在,INCR 返回 1;若失败(如集群故障),则尝试从 DB 恢复(见容错机制)。

String offsetKey = "user_tag:incr:latestOffset";
Long offset = redisTemplate.opsForValue().increment(offsetKey);
  • 使用 RedisINCR 原子操作生成递增 offset。
  • 作用:作为每条增量数据的“唯一 ID”和排序依据(ZSETscore)。

2)容灾兜底 —— 从 DB 恢复 offset(可选)

设计意图:保证系统在 Redis 不可用时仍能安全恢复,避免 offset 重置导致数据重复/丢失

  • Redis 异常(如集群故障、key 被误删),尝试从数据库恢复最新 offset
  • 使用 Redis 分布式锁 防止并发初始化冲突
if (offset == null || offset <= 0) {
    offset = restoreOffsetFromDb(datasetName, offsetKey);
}

3)计算分片 ID 并写入 ZSet

long shardId = (offset - 1) / maxMembersSize;
String zsetKey = REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
  • 分片策略:每 maxMembersSize 条数据一个 ZSet(如 10 万条/分片)
  • Score = offset:保证按写入顺序排序,便于下游按 offset 范围拉取
  • 自动过期:每个分片独立设置 TTL,避免内存无限增长

4)写入 ZSET(按 offset 排序)

redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
redisTemplate.expire(zsetKey, ttlSeconds); // 如 86400s = 24h
  • ZSET 结构:
    • member = 序列化后的业务数据(如 JSON 字符串)
    • score = offset(保证全局有序)
  • 设置 TTL 自动过期,避免无限堆积。

5)建立时间索引(Time-to-Offset 映射)

long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1); // 按秒对齐
String timeOffsetKey = "user_tag:incr:timeOffset:" + incrTimeSecond;
redisTemplate.opsForValue().set(timeOffsetKey, String.valueOf(offset), ttl);
  • 作用:支持“按时间查询该时刻之后的所有增量”。
  • 例如:想知道今天 10:00 之后有哪些变更?先查 timeOffset:1730985600 得到 offset=12345,再从 offset=12345 开始拉取。

5、代码实现

1)普通 redis (并发风险)

package com.healerjean.proj.hotcache.incr;


import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;

/**
 * 消息队列消费者 - 处理增量数据写入
 *
 * @author zhangyujin
 * @date 2025/11/3
 */
@Slf4j
@Component
public class IncrementalWriteService {


    /***
     * snapshotGlobalConfig
     */
    @Resource
    private SnapshotGlobalConfig snapshotGlobalConfig;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 增量任务写入
     *
     * @param datasetName datasetName
     */
    public void write(String data, String datasetName) {
        IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
        DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
        Long ttlSeconds = incrementalConfig.getTtlSeconds();
        LocalDateTime dateTime = LocalDateTime.now();
        Duration ttl = Duration.ofSeconds(ttlSeconds);

        // 1. 获取全局唯一 offset
        String offsetKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
        Long offset = redisTemplate.opsForValue().increment(offsetKey);
        // 2. 如果 Redis 返回 null(如 key 不存在且 incr 失败),从 DB 恢复
        if (offset == null || offset <= 0) {
            offset = restoreOffsetFromDb(datasetName, offsetKey);
        }else {
            // 6. todo 将 offset 写入数据库
        }

        // 2. 根据 offset 计算分片 ID(每 MAX_MEMBERS_PER_ZSET 一个分片)
        Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
        long shardId = (offset - 1) / maxMembersSize;
        String zsetKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);

        // 3. 写入 ZSET(score = offset)
        Boolean added = redisTemplate.opsForZSet().add(zsetKey, data, (double) offset);
        if (Boolean.TRUE.equals(added)) {
            redisTemplate.expire(zsetKey, ttl);
        }

        // 4. 写入时间索引
        long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(dateTime, 1);
        String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);
        String timeOffset = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
        redisTemplate.opsForValue().set(timeOffset, offset.toString(), ttl);
        redisTemplate.opsForZSet().add(timePeriodKey, String.valueOf(incrTimeSecond), (double) incrTimeSecond);
        Long currentSize = redisTemplate.opsForZSet().size(timePeriodKey);
        if (currentSize != null && currentSize > maxMembersSize) {
            long removeCount = currentSize - maxMembersSize;
            redisTemplate.opsForZSet().removeRange(timePeriodKey, 0, removeCount - 1);
        }
        redisTemplate.expire(timePeriodKey, ttl);

        // 6. todo 将 offset 写入数据库

    }

    /**
     * 从数据库恢复
     *
     * @param datasetName datasetName
     * @param offsetKey   offsetKey
     * @return {@link Long}
     */
    private long restoreOffsetFromDb(String datasetName, String offsetKey) {
        String lockKey = "lock:offset:init:" + datasetName;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
        if (Boolean.FALSE.equals(locked)) {
            throw new RuntimeException("Interrupted while waiting for offset init");
        }
        try {
            // 从 DB 读取最新 offset
            // Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
            // long nextOffset = dbOffset + 1;
            // redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
            // return nextOffset;
        } finally {
            redisTemplate.delete(lockKey);
        }
        return 1;
    }


}

2)LUA-redis

package com.healerjean.proj.hotcache.incr;


import com.healerjean.proj.hotcache.config.DatasetIncrementalConfig;
import com.healerjean.proj.hotcache.config.IncrementalExecutionConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

/**
 * 消息队列消费者 - 处理增量数据写入
 *
 * @author zhangyujin
 * @date 2025/11/3
 */
@Slf4j
@Component
public class IncrementalLuaWriteService {


    private final static RedisScript<String> WRITE_INCREMENTAL_SCRIPT = RedisScript.of(
            "redis.call('ZADD', KEYS[1], ARGV[2], ARGV[1]);" +
                    "redis.call('EXPIRE', KEYS[1], ARGV[3]);" +
                    "redis.call('SET', KEYS[2], ARGV[2], 'EX', ARGV[3]);" +
                    "redis.call('ZADD', KEYS[3], ARGV[4], ARGV[4]);" +
                    "redis.call('EXPIRE', KEYS[3], ARGV[3]);" +
                    "local maxMembers = tonumber(ARGV[5]);" +
                    "local currentSize = redis.call('ZCARD', KEYS[3]);" +
                    "if currentSize > maxMembers then " +
                    "  local removeCount = currentSize - maxMembers; " +
                    "  redis.call('ZREMRANGEBYRANK', KEYS[3], 0, removeCount - 1); " +
                    "end; " +
                    "return 1;",
            Long.class
    );

    /***
     * snapshotGlobalConfig
     */
    @Resource
    private SnapshotGlobalConfig snapshotGlobalConfig;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 增量任务写入
     *
     * @param datasetName datasetName
     */
    public void write(String data, String datasetName) {
        IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
        DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
        Long ttlSeconds = incrementalConfig.getTtlSeconds();
        Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
        LocalDateTime now = LocalDateTime.now();

        // 1. 获取全局唯一 offset
        String offsetKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
        Long offset = redisTemplate.opsForValue().increment(offsetKey);
        if (offset == null || offset <= 0) {
            offset = restoreOffsetFromDb(datasetName, offsetKey);
            // 注意:restore 后建议 SET offsetKey offset+1 避免重复,此处简化
        }else {
            // 6. todo 将 offset 写入数据库
        }

        // 2. 计算 shardId 和所有 keys
        long shardId = (offset - 1) / maxMembersSize;
        String shardKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
        long incrTimeSecond = executionConfig.getIncrIntervalTimeSecond(now, 1);
        String timeOffsetKey = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, String.valueOf(incrTimeSecond));
        String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);

        // 3. 准备 Lua 参数
        List<String> keys = Arrays.asList(shardKey, timeOffsetKey, timePeriodKey);
        List<String> args = Arrays.asList(
                data,
                String.valueOf(offset),
                String.valueOf(ttlSeconds),
                String.valueOf(incrTimeSecond),
                String.valueOf(maxMembersSize)
        );

        // 4. 执行 Lua 脚本
        redisTemplate.execute(WRITE_INCREMENTAL_SCRIPT, keys, args);
    }

    /**
     * 从数据库恢复
     *
     * @param datasetName datasetName
     * @param offsetKey   offsetKey
     * @return {@link Long}
     */
    private long restoreOffsetFromDb(String datasetName, String offsetKey) {
        String lockKey = "lock:offset:init:" + datasetName;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofSeconds(10));
        if (Boolean.FALSE.equals(locked)) {
            throw new RuntimeException("Interrupted while waiting for offset init");
        }
        try {
            // 从 DB 读取最新 offset
            // Long dbOffset = offsetRepository.getCurrentOffsetFromDb(datasetName);
            // long nextOffset = dbOffset + 1;
            // redisTemplate.opsForValue().set(offsetKey, String.valueOf(nextOffset));
            // return nextOffset;
        } finally {
            redisTemplate.delete(lockKey);
        }
        return 1;
    }


}

6、FAQ

1)容错与恢复机制

a、Redis offset 丢失怎么办?

  • 通过分布式锁 lock:offset:init:user_tag 防止并发初始化。
  • 从数据库读取最新 offset+1 后写回 Redis。注意上锁,防止并发
  • 注意:当前代码中 DB 恢复逻辑被注释,需实现 offsetRepository.getCurrentOffsetFromDb()

b、数据重复 or 丢失?

  • 不保证 exactly-once,但可通过 offset 幂等处理(消费端去重)。
  • 建议:业务数据自带版本号或时间戳,便于去重。

三、全量加载方案

1、整体目标

  • 启动时加载最新全量快照(从本地或 OSS 下载并反序列化)。
  • 读取 Redis 中的元数据(包括 incrPullTimeLowerBound 和当前 offset)。
  • 增量拉取:
    • Redis ZSET 拉取增量数据(基于时间区间 + offset)。
    • 对内存中的全量数据做 原地更新(add / delete / modify
  • 定时任务(每5秒):
    • 根据本地记录的 offset,持续拉取新写入的增量数据。
    • 更新本地 offset

2、全量加载

1)定时-加载方案

Redis + 轮询(简单,适合容忍延迟)

  • 客户端随机延迟(最简单有效):每台机器收到“新版本可用”后,不是立即下载,而是随机等待 0 ~ T 秒再开始。

  • 单机限流:刷新期间进行限流,比如多分片同时下载等

while (true) {
    Long latest = redis.get("latest_snapshot_version");
    if (latest > currentVersion) {
        downloadFromCDN(latest);
        break;
    }
    Thread.sleep(5000); // 每 5 秒查一次
}

2)代码实现

package com.healerjean.proj.hotcache.service.pull;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import com.healerjean.proj.hotcache.config.DatasetSnapshotConfig;
import com.healerjean.proj.hotcache.config.SnapshotGlobalConfig;
import com.healerjean.proj.hotcache.enums.SnapshotPathEnum;
import com.healerjean.proj.hotcache.factory.SnapshotFactory;
import com.healerjean.proj.hotcache.model.SnapshotMetadata;
import com.healerjean.proj.hotcache.model.UserTag;
import com.healerjean.proj.hotcache.service.cache.InMemoryUserTagCache;
import com.healerjean.proj.hotcache.service.serialization.DataSerializerStrategy;
import com.healerjean.proj.hotcache.service.storage.StorageServiceStrategy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * FullLoadToMemoryService
 *
 * @author zhangyujin
 * @date 2025/11/6
 */
@Service
@Slf4j
public class FullLoadToMemoryService {

    @Autowired
    private SnapshotGlobalConfig snapshotGlobalConfig;

    @Autowired
    private InMemoryUserTagCache userTagCache;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    public void loadLatestFullSnapshotToMemory(String datasetName) throws Exception {
        // 1. 从 Redis 获取最新版本
        String latestKey = SnapshotPathEnum.REDIS_SNAPSHOT_LATEST_VERSION_KEY.format(datasetName);
        String latestVersion = redisTemplate.opsForValue().get(latestKey);
        if (latestVersion == null) {
            throw new IllegalStateException("No full snapshot found in Redis");
        }

        // 2. 获取元数据(含 manifestUrl)
        String metaKey = SnapshotPathEnum.REDIS_SNAPSHOT_VERSION_META_KEY.format(datasetName, latestVersion);
        String metaJson = redisTemplate.opsForValue().get(metaKey);
        SnapshotMetadata metadata = new Gson().fromJson(metaJson, SnapshotMetadata.class);

        // 3. 根据 storageStrategy 下载所有分片文件并反序列化
        DatasetSnapshotConfig snapshotConfig = metadata.getConfig();
        StorageServiceStrategy storage = SnapshotFactory.getInstance().getStorageServiceStrategy(datasetName, snapshotConfig.getStorageStrategy());
        DataSerializerStrategy dataSerializer = SnapshotFactory.getInstance().getDataSerializer(datasetName, snapshotConfig.getSerializerStrategy());
        for (SnapshotMetadata.FileInfo fileInfo : metadata.getFileInfoMap().values()) {
            try (InputStream is = storage.download(fileInfo.getFilename())) {
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.trim().isEmpty()) {
                        try {
                            UserTag userTag = (UserTag)dataSerializer.deserialize(line.getBytes(StandardCharsets.UTF_8));
                            userTagCache.put(userTag.getUserId(), userTag);
                        }catch (Exception e){
                            log.error("line:{}, fileInfo:{}", line, JSON.toJSONString(fileInfo), e);
                            throw new RuntimeException(e.getMessage(), e);
                        }
                    }
                }
            }
        }
        log.info("✅ 全量加载完成,共 {} 条记录", userTagCache.getAll().size());
    }
}

四、增量加载方案

1、背景与目标

在高并发、低延迟的用户标签系统中,内存缓存(如 InMemoryUserTagCache)是提升查询性能的关键。但如何高效、可靠地将 Redis 中的增量数据同步到内存,是一个挑战。

核心目标:从 Redis 中按偏移量(offset)或时间下界(time lower bound)拉取增量数据,并持续加载到本地内存缓存中,实现“热更新”而不中断服务。

2、核心组件概览

1)方法提供

该服务提供两个主要入口方法:

方法 功能
pullIncrementalByOffset(String datasetName) 基于偏移量offset)持续拉取增量数据
pullIncrementalByLowerBound(String datasetName) 基于时间下界定位起始 offset,再调用 pullIncrementalByOffset

2)Redis 数据结构依赖

Key 类型 枚举 用途
String REDIS_SNAPSHOT_LATEST_VERSION_KEY 存储最新快照版本号
String REDIS_SNAPSHOT_VERSION_META_KEY 存储快照元数据(含 incrPullTimeLowerBound
ZSet REDIS_INCR_TIME_PERIOD_KEY 时间戳 → 用于查找起始时间
String REDIS_INCR_TIME_OFFSET_KEY 时间戳 → offset 映射
ZSet REDIS_INCR_CURRENT_SHARD_KEY 分片增量数据(score=offset, value=事件内容)

3)关键设计

a、数据分片(Sharding)机制

  • 增量数据存储在 RedisZSET 中,按 score = offset 排序。
  • 由于 ZSET 有性能上限(如成员数限制),系统采用分片策略:
    • 每个分片最多容纳 maxMembersSize 条记录。
    • 分片 ID 计算公式: shardId = (offset - 1) / maxMembersSize
    • 对应 Redis KeyREDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId)

b、本地偏移量追踪

  • 每个 datasetName 维护一个原子偏移量,表示已处理的最大 offset
  • 初始值由 pullIncrementalByLowerBound 根据时间戳反查得到。
private final static Map<String, AtomicLong> LOCAL_PROCESSED_OFFSET_MAP = new ConcurrentHashMap<>();

3)代码实现

a、通过时间拉取

/**
 * 时间驱动的起始点定位
 */
public void pullIncrementalByLowerBound(String datasetName) {
    // 1、从 Redis 获取最新快照版本(latestVersion)
    String latestKey = SnapshotPathEnum.REDIS_SNAPSHOT_LATEST_VERSION_KEY.format(datasetName);
    String latestVersion = redisTemplate.opsForValue().get(latestKey);
    if (latestVersion == null) {
        throw new IllegalStateException("No full snapshot found in Redis");
    }

    // 2. 解析其元数据 SnapshotMetadata,获取 incrPullTimeLowerBound(时间戳)
    String metaKey = SnapshotPathEnum.REDIS_SNAPSHOT_VERSION_META_KEY.format(datasetName, latestVersion);
    String metaJson = redisTemplate.opsForValue().get(metaKey);
    SnapshotMetadata metadata = new Gson().fromJson(metaJson, SnapshotMetadata.class);
    long incrPullTimeLowerBound = metadata.getIncrPullTimeLowerBound();
    String timePeriodKey = SnapshotPathEnum.REDIS_INCR_TIME_PERIOD_KEY.format(datasetName);

    // 3、 查找 >= 时间下界的第一个时间点
    Set<String> candidates = redisTemplate.opsForZSet().rangeByScore(timePeriodKey, (double) incrPullTimeLowerBound, Double.MAX_VALUE, 0, 1);
    if (CollectionUtils.isEmpty(candidates)) {
        LOCAL_PROCESSED_OFFSET_MAP.put(datasetName, new AtomicLong(0));
        log.info("No time period found >= {} for dataset {}", incrPullTimeLowerBound, datasetName);
        return;
    }

    // 4、通过 REDIS_INCR_TIME_OFFSET_KEY 反查对应的 offset
    String firstTimeStr = candidates.iterator().next();
    String offsetKey = SnapshotPathEnum.REDIS_INCR_TIME_OFFSET_KEY.format(datasetName, firstTimeStr);
    String offsetValue = redisTemplate.opsForValue().get(offsetKey);
    long latestOffset = Long.parseLong(offsetValue);

    // 5、初始化 LOCAL_PROCESSED_OFFSET_MAP 并启动 pullIncrementalByOffset
    LOCAL_PROCESSED_OFFSET_MAP.put(datasetName, new AtomicLong(latestOffset));
    pullIncrementalByOffset(datasetName);
}

b、通过游标拉取

@Autowired
private SnapshotGlobalConfig snapshotGlobalConfig;

@Autowired
private InMemoryUserTagCache userTagCache;

@Resource
private RedisTemplate<String, String> redisTemplate;

private static final long PULL_BATCH_SIZE = 300L;

/**
 * 本地已处理的偏移量(必须先通过 pullIncrementalByLowerBound 初始化)
 */
private final static Map<String, AtomicLong> LOCAL_PROCESSED_OFFSET_MAP = new ConcurrentHashMap<>();


/**
 * 增量拉取流程
 *
 * @param datasetName datasetName
 */
public void pullIncrementalByOffset(String datasetName) {
    if (!LOCAL_PROCESSED_OFFSET_MAP.containsKey(datasetName)) {
        log.warn("Offset not initialized for dataset: {}. Skipping pull. Call pullIncrementalByLowerBound first.", datasetName);
        return;
    }
    IncrementalExecutionConfig executionConfig = snapshotGlobalConfig.instanceIncrementalConfig(datasetName);
    DatasetIncrementalConfig incrementalConfig = executionConfig.getIncrementalConfig();
    Integer maxMembersSize = incrementalConfig.getMaxMembersSize();
    AtomicLong atomicCurrentOffset = LOCAL_PROCESSED_OFFSET_MAP.get(datasetName);

    while (true) {
        // 1、计算当前应读取的 shardId,
        long currentOffset = atomicCurrentOffset.get();
        String latestKey = SnapshotPathEnum.REDIS_INCR_LATEST_OFFSET_KEY.format(datasetName);
        String latestOffsetStr = redisTemplate.opsForValue().get(latestKey);
        long latestOffset = Long.parseLong(latestOffsetStr);
        if (currentOffset >= latestOffset) {
            // 当前 shard 已追上最新 offset → 结束
            log.debug("Dataset {} caught up to latest offset {}, sleeping...", datasetName, latestOffset);
            break;
        }

        // 2、从 Redis ZSET 拉取一批数据
        long shardId = currentOffset / maxMembersSize;
        String shardKey = SnapshotPathEnum.REDIS_INCR_CURRENT_SHARD_KEY.format(datasetName, shardId);
        long nextOffset = currentOffset + 1;
        double maxScore = Math.min(nextOffset + PULL_BATCH_SIZE - 1, latestOffset);
        Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(shardKey, (double) nextOffset, maxScore);

        // 3、防止拉取数据为空,且当前 offset 未更新,则继续拉取
        if (CollectionUtils.isEmpty(tuples)) {
            // 当前 shard 无数据,尝试跳转到下一个 shard 起始位置
            long nextShardStart = (shardId + 1) * maxMembersSize;
            long jumpTo = Math.min(nextShardStart, latestOffset);
            // 如果跳转幅度很大(比如超过一个 batch),先小步试探
            if (jumpTo - currentOffset > PULL_BATCH_SIZE * 2) {
                // 只前进一小步,避免跨度过大跳过异常数据
                jumpTo = currentOffset + 1;
            }

            if (jumpTo > currentOffset) {
                atomicCurrentOffset.set(jumpTo);
                log.debug("Advanced offset to {} for dataset {}", jumpTo, datasetName);
                continue;
            }
            // 已到最新,但仍无数据 → 可能数据延迟,稍后重试
            log.debug("No data found in shard {} for dataset {}, but not caught up yet.", shardId, datasetName);
            break;
        }

        // 4、批量处理 & 更新偏移量
        for (ZSetOperations.TypedTuple<String> data : tuples) {
            Double score = data.getScore();
            String value = data.getValue();
        }

        // 5、更新本地 offset 为本次拉取的最大 score
        // 更新 offset 为本次最大 score(注意:不再 +1)
        long maxProcessedOffset = tuples.stream()
                .mapToLong(t -> Objects.requireNonNull(t.getScore()).longValue())
                .max()
                .orElse(currentOffset);

        atomicCurrentOffset.set(maxProcessedOffset);
        log.debug("Pulled {} records for dataset {}, offset updated to {}",
                tuples.size(), datasetName, maxProcessedOffset);
    }
}

五、数据验证方案

1、可观测性验证

1)关键验证清单

验证类别 关键问题 验证手段
数据完整 全量是否丢数据? 行数对比 + 抽样校验
原子发布 是否读到半成品? 中断测试 + 文件检查
容错恢复 Redis 挂了能否恢复? 清空 offset + 重启
性能达标 能否支撑5000台? 压测 + CDN 并发测试
自动清理 存储是否会爆炸? 模拟多版本 + 检查清理

2)必须具备的监控项:

类别 监控项 验证方式
全量 快照生成耗时、文件大小、recordCount 查看日志或 Prometheus 指标
增量 当前 offset、每秒处理量、分片数量 Redis 监控 + 应用指标
存储 OSS/S3 文件数量、总大小 存储平台控制台
一致性 各节点缓存差异率 日志采样比对
错误 序列化失败、网络超时、DB 连接异常 告警日志

2、数据准确性验证

1)数据完整性验证

  • 目标确认全量快照包含所有应有记录,无遗漏。
  • 方法:
    • 对比源数据库(如 MySQL)中 user_tag 表的总行数 vs 快照元数据中的 recordCount
    • 抽样校验:随机选取若干 user_id,检查其在快照文件和数据库中的字段是否一致(如 tag_value, update_time)。
    • 检查分片总数 × 平均每片记录数 ≈ 总记录数(避免某分片丢失

2)文件原子性与一致性验证

  • 目标确保客户端不会读到“半成品”快照
  • 方法:
    • 在快照生成过程中 kill 进程,重启后检查:
      • 是否存在 .tmp 文件(说明未完成发布)。
      • 确保Redis 中的 latest 版本是否仍为旧版本(未错误更新)。
    • 发布完成后,检查所有分片文件是否存在且命名规范(无 .tmp 后缀)。

3)压缩与序列化正确性验证

  • 目标Protobuf + GZIP 能正确反序列化。
  • 方法:
    • 手动下载一个 .pb.gz 文件,用工具(如 protoc --decode 或自研解析脚本)解压并解析。
    • 验证解析出的数据结构与原始记录一致(字段名、类型、值)。

4)5000 台机器数据一致性验证

  • 目标:所有节点最终状态基本一致(秒级延迟内)。
  • 方法:
    • 在一台机器上查询某个 user_id 的标签。
    • 同时在其他多台机器上查询同一 user_id
    • 汇总数据定时上报,通过日志采集 + 聚合分析实现大规模比对。

3、性能与稳定性验证

1)全量导出性能验证

  • 指标:
    • 导出耗时 ≤ 10分钟(1000万条)。
    • 内存占用 < 200MB(流式处理)。
    • CPU 使用率平稳,无长时间 GC。
  • 方法:
    • 在生产类似环境压测,监控资源使用。
    • 检查日志中是否有 OOM、超时、连接池耗尽等错误。

2)增量吞吐验证

  • 指标:
    • 支持 ≥ 1万条/秒的增量写入。
    • Redis ZSET 单分片 ≤ 4000 条,避免大 Key
  • 方法:
    • 使用压测工具模拟高并发增量事件。
    • 监控 Redis 内存、CPU、网络带宽。
    • 检查 LOCAL_PROCESSED_OFFSET_MAP 是否持续增长(无卡顿)。

3)CDN 下载并发验证

  • 目标5000 台机器能快速下载快照。
  • 方法:
    • 模拟 1000~5000 台客户端并发从 CDN 下载 .pb.gz 文件。
    • 测量平均下载时间、失败率。
    • 确认 CDN 缓存命中率 > 99%(首台回源,其余命中边缘节点)。

4)CPU 影响

“少分配、慢处理、错峰干、分隔离”,内存服务的性能瓶颈往往不在计算,而在 对象创建过多 → 触发频繁 GC → STW(Stop-The-World)卡顿

目标 具体怎么做
GC 流式处理 + 避免中间对象 + 用原生数组 + 对象池(谨慎)
CPU 错峰全量 + 限流增量 + 分批拉取(每批 ≤1000 条)
优内存 扁平结构 + 不可变全量快照 + 避免 HashMap 嵌套
JVM 固定堆 + G1/ZGC + 监控 GC 日志
做隔离 拉取和服务分离进程,或用 cgroupCPU

a、控制拉取节奏 → 避免 CPU 突刺

全量 + 多源增量同时拉,CPU 瞬间打满,GC 跟着暴增。

  • 错峰调度全量任务
    • 不要所有数据源同时开始全量;
    • 通过配置中心或 cron 控制启动时间。
  • 限制增量消费速率
    • 给每个数据源的增量拉取设置 最大吞吐上限(如 5000 条/秒);
    • 超过就 sleep 或丢弃背压(根据业务容忍度);
  • 流式处理,禁止“全载入再处理”
    • 全量导出时,边读边解析边更新内存,不要先把 1000 万条全 load 到 List 里;
    • cursor / scroll API 分批拉(每批 1000 条),处理完一批再拉下一批。

b、内存结构优化 → 减少 GC Roots

  • 使用紧凑内存布局

    • Map<Long, String>HashMap 节点对象多,GC 压力大)。

    • 例如:把 user_id → tag 映射,用两个平行数组实现(类似列式存储):

      user_ids: [1001, 1002, 1003, ...]  
      tags:     ["VIP", "NORMAL", "TEST", ...]
      
      Map<Long, String>(HashMap 节点对象多,GC 压力大)
      
  • 避免嵌套对象和长链引用

    • 对象图越扁平,GC 扫描越快;
    • 不要用 User -> Profile -> Settings -> Map<...> 这种深结构。
  • 定期“冻结”全量数据

    • 全量加载完成后,把数据结构标记为 不可变(Immutable
    • 后续增量只更新增量缓存,查询时 merge
    • 不可变对象更容易被 JVM 优化,且不会在老年代频繁修改。

ContactAuthor