前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、分页查询优化

问题 SQL

解释:表中的字段越多下面的优化越明显,否则即使使用了下面的优化,也可能没有那么明显

通过下面的可以观察到 当达到1000万的时候,查询时间到了37s,太可怕了

select * from tb_ams_inf_repay_stat limit 0,10 ; 
#  0.003s

select * from tb_ams_inf_repay_stat  limit 10000,10 ;  
# 1 0.023s

select * from tb_ams_inf_repay_stat  limit 100000,10 ;
# 10 0.191s

select * from tb_ams_inf_repay_stat limit 1000000,10 ;
# 100 1.942s

select * from tb_ams_inf_repay_stat limit 10000000,10 ;
# 1000 37.323s

1、简单优化

1)子查询

0.23s 简直要飞起来了

1、先使用覆盖索引 index 查询 ,我们只查询 id 索引这一个字段,比 select * 或者多个字段快多了,因为只要我们写上这些字段,我们只需要 10个,但是从第一条开始到 1000万条其实是都要去扫描的

2、然后再进行索引范围内 range查询

0.23s 
select *
from tb_ams_inf_repay_stat
where id > (select id from tb_ams_inf_repay_stat limit 1000000, 1)
limit 0,10 ;


--推荐使用 
select * from table where id > 243800 order by id limit 10;
idselect_typetabletypepossible_keyskeykey_lenrefrowsExtra
1PRIMARYtb_ams_inf_repay_statrangePRIMARYPRIMARY8NULL3258410Using where
2SUBQUERYtb_ams_inf_repay_statindexNULLidx_orgcd_loannum216NULL19753500Using index

2)Join 连接

-- 0.31 

SELECT *
FROM tb_ams_inf_repay_stat a
       JOIN (select id from tb_ams_inf_repay_stat limit 1000010, 10) b ON a.ID = b.id
idselect_typetabletypepossible_keyskeykey_lenrefrowsExtra
1PRIMARY<derived2>ALLNULLNULLNULLNULL1000020NULL
1PRIMARYaeq_refPRIMARYPRIMARY8b.id1NULL
2DERIVEDtb_ams_inf_repay_statindexNULLidx_orgcd_loannum216NULL19753500Using index

2、其他优化

1)带有条件的,id 连续的查询(between

0.03s 
select * from tb_ams_inf_repay_stat  where id  between 1000000 and 1000010  	 ;

2)带有条件,id 不连续的查询,考虑建立索引

20s 慢死了
select * from tb_ams_inf_repay_stat  	where org_cd = 'xmsd'  	limit 1000000,10 ;
-- org_cd建立索引

select *
from tb_ams_inf_repay_stat
where org_cd = 'xmsd'
  and id > (select id from tb_ams_inf_repay_stat where org_cd = 'xmsd' limit 1000000,1)
limit 0,10 ;

0.2s 可以说相当的快了 
idselect_typetabletypepossible_keyskeykey_lenrefrowsExtra
1PRIMARYNULLNULLNULLNULLNULLNULLNULL~~~~
2SUBQUERYtb_ams_inf_repay_statrefidx_orgcd_loannumidx_orgcd_loannum93const1Using where; Using index

二、千万测试数据生成

1、存储过程

1)表

create table if not exists `user_demo`
(
    `id`          bigint unsigned not null auto_increment comment '主键',
    `name`        varchar(32)     not null default '' comment '姓名',
    `age`         int             not null default 0 comment '年龄',
    `phone`       varchar(32)     not null default '' comment '电话',
    `email`       varchar(64)     not null default '' comment '邮箱',
    `start_time`  datetime                 default null comment '开始时间',
    `end_time`    datetime                 default null comment '结束时间',
    `valid_flag`  int             not null default 1 comment '1有效 0 废弃',
    `create_time` datetime        not null default current_timestamp comment '创建时间',
    `update_time` datetime        not null default current_timestamp on update current_timestamp comment '更新时间',
    primary key (`id`)
) engine = innodb
  default charset = utf8;

2)存储过程

drop procedure if exists test_batch_create;
create procedure test_batch_create(in loop_counts int, in date varchar(50))
begin
    declare i int;
    set i = 0;
    set autocommit = 0; -- 关闭自动提交事务,提高插入效率
    while i < loop_counts
        do
            insert into `user_demo` (name, age, phone, email, start_time, end_time, valid_flag)
            values (concat('张', floor(rand() * 2 * i)), floor(rand() * i), floor(rand() * 3 * i),
                    concat( floor(rand() * 2 * i), '@gmail.com'), date_add(date ,interval i day), date_add(date ,interval i*2 day), 1);
            set i = i + 1;
        end while;
    commit;
end;

3)执行

CALL test_batch_create(10, '2023-07-01');

三、三种大数据量查询

1、三种大数据查询方式对比

对比维度 流式查询(Streaming LIMIT 分页 id > last_id LIMIT size
原理 数据库逐行返回,客户端边读边处理 跳过 offset 行,取 size 基于主键,从上次结束位置继续
性能(大数据量) 最优 差,offset 越大越慢) 好,但依赖索引)
内存占用(客户端) 低,逐条处理 中等,每页加载 中等,每页加载
数据库连接占用 高,需长时间保持 低,短连接 低,短连接
实现复杂度
是否支持随机跳页 不支持 支持(如第100页) 不支持(只能下一页
适用场景 导出、ETL、实时处理 前端分页展示 后台任务、消息队列拉取

2、三种查询介绍

1)流式查询

  • 流式查询 = 服务器端游标 + 客户端逐行读取
  • 优点:内存低、响应快(首条数据快)、快照固定
  • 缺点:连接长、不能跳页、实现复杂
@GetMapping(value = "selectList", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public ResponseBean selectList() {
    List<User> users = Lists.newArrayList();

    try(SqlSession sqlSession = sqlSessionFactory.openSession()){
        UserMapper mapper = sqlSession.getMapper(UserMapper.class);
        mapper.getStreamAll(new ResultHandler<User>() {
            private int batchCount = 0;
            private List<User> batch = new ArrayList<>(1000);
            @Override
            public void handleResult(ResultContext<? extends User> resultContext) {
                User user = resultContext.getResultObject();
                batch.add(user);
                batchCount++;

                // 每 1000 条处理一次(如批量入库、写文件)
                if (batchCount % 1000 == 0) {
                    processBatch(batch);
                    batch.clear();
                    sqlSession.commit();
                }
            }
            private void processBatch(List<User> batch) {
                // 例如:批量插入、导出文件、发送消息等
            }
        });
        sqlSession.commit();
    }

    log.info("用户管理--------selectLis:【{}】", JsonUtils.toJsonString(users));
    return ResponseBean.buildSuccess(users);
}

a、工作原理

  1. 建立连接后,发送 SQL 查询SELECT * FROM user
  2. JDBC 驱动设置流式模式
    • 调用 Statement.setFetchSize(Integer.MIN_VALUE)但每次读取的块大小由驱动决定
      • Integer.MIN_VALUEMySQL JDBC 驱动的“魔法值”,表示“请用流式”
      • 告诉数据库:“我要流式读取,不要一次性把所有结果加载到内存。
  3. 数据库开始返回数据块(Chunk
    • 数据库不会一条一条发,而是按网络包大小或驱动缓冲区(如 4KB、8KB)分批发送。
    • 例如:一次发 100 条,下一次再发 100 条……
  4. JDBC 驱动在本地维护一个小型缓冲区
    • 当缓冲区的数据被程序消费完,驱动会自动从网络读取下一批。
  5. MyBatis 将每条记录取出,回调 handleResult()
    • 对你来说,就是“一条一条”地处理。

b、是“一条一条”读取吗?

问题 回答
是“一条一条”读取吗? 从应用处理角度看:是的,每条记录都单独回调。
底层真的只传一条吗? 不是,是“小批量分块传输”,但对开发者透明。
好处是什么? 内存占用低,适合处理百万级数据。
风险? 连接长时间不释放,需谨慎管理资源。

c、百万数据流式查询大概耗时范围

场景 预估耗时 说明
理想情况(本地数据库、SSD、简单查询、无处理逻辑) 10 ~ 30 秒 数据库快,网络快,处理快
一般情况(内网、普通硬盘、字段较多) 30 秒 ~ 2 分钟 常见企业环境
复杂情况(远程数据库、慢查询、大字段、复杂处理) 2 ~ 10 分钟甚至更长 含 BLOB、JSON、计算等
问题 回答
百万数据流式查询耗时? 通常 30 秒 ~ 2 分钟
最快能到多少? 10+ 秒(理想环境)
最慢会多久? 10 分钟+(网络差、处理慢)

d、fetchSize 设置解释

Integer.MIN_VALUE 是“开关

设置 是否启用服务器端游标 是否流式 是否 OOM 风险
fetchSize=1000 ❌ 否(默认客户端游标) ❌ 不是真流式 ✅ 有(全量加载)
fetchSize=Integer.MIN_VALUE ✅ 是(服务器端游标) ✅ 真流式 ❌ 无(分批拉取)
  • fetchSize(1000)
  • 一次性把所有数据从服务器拉到客户端内存

  • 然后在本地“假装”是分批返回(每次返回 1000 条)

  • 但内存已经占满了

2)LIMIT 分页

  • LIMIT 1000000, 10 为什么慢

    • 数据库仍需扫描前 1,000,000 条记录,即使不返回。

    • 尤其是 ORDER BY 存在时,可能无法利用索引覆盖。

    • 这叫“偏移量陷阱(Offset Pagination Problem”。

  • 何时不要用?

    • offset > 10,000 时基本不可用

    • 高并发分页接口(如 /api/users?page=100

  • 何时可用?

    • 前端展示,页数少(如 ≤ 100)

    • 数据量小(如 < 10万)

3)id > last_id LIMIT size

优势:

  • 性能稳定:WHERE id > ? LIMIT ? 走主键索引,O(log n)
  • 无偏移量问题,翻页飞快
  • 适合“下一页”场景(如微博、消息流)

缺点:

  • 不能跳页(如直接跳第10页)
  • 依赖单调递增字段(如 id, create_time
  • 如果 id 有删除或不连续,可能导致“漏数据”或“重复”

3、常见问题答疑

1)普通查询 VS 流式

方式 普通查询 List<User> 流式查询 ResultHandler
内存使用 一次性加载所有数据到内存 每次只处理一条,内存占用小
适用场景 数据量小(几千、几万) 大数据量(几十万、上百万)
风险 可能 OOM(内存溢出) 避免 OOM
处理方式 先查完再处理 边查边处理(实时)

2)流式 vs IdSize

对比项 流式查询(Streaming) 自定义 ID 分页(WHERE id BETWEEN ? AND ?)
场景 批量导出、ETL API 分页、消息拉取
SQL 执行次数 1 次 N 次(如 100 万 / 1000 = 1000 次)
连接 长连接(风险) 短连接(安全)
内存占用 低(逐条处理) 低(每页处理完就释放)
数据库压力 低(一次连接,流式读) 高(频繁查询,索引扫描)
网络开销 低(一次连接,持续传输) 高(多次连接或长连接)
是否需要 ID 连续 不需要 需要(否则分页不准)
能否并行处理 不能(顺序流) 能(不同 ID 范围可多线程)
能否中断/恢复 难(流中断就失败) 能(记录最后 ID 即可)
首次响应时间 稍慢(建立流需要时间) 快(第一页很快返回)
总耗时 通常更短 通常更长(N 次查询 + 间隙)
实现复杂度 简单 稍复杂(需管理分页逻辑)

3)性能对比实验

查询方式 查询第 10 页(每页 100) 查询第 100,000 页 全量导出
LIMIT offset, size 50ms 12s 不可行
WHERE id > ? LIMIT ? 5ms 5ms 需自己循环
流式查询 不适用 不适用 8s (内存稳定)

4)选型建议

你要做什么? 选哪种?
Excel 导出,数据量 > 10万 流式查询
做后台管理分页,最多翻 100 页 LIMIT
App 信息流,无限下拉 Id > last_id
做定时任务,处理未同步数据 Id > last_max_id
做实时计算,监听数据变化 以上都不行 → 用 CDC(如 Debezium)

5)流式查询如何避免占满连接

场景:安全地流式查询 1000 万条订单数据,用于导出 Excel,不阻塞主业务连接池,不引发 OOM,支持中断后继续

  1. 专用数据源:为流式查询配置独立的连接池(如 HikariCP 单独配置)
  2. 限制并发:同一时间只允许 1~2 个流式任务(加锁)
  3. 超时控制:设置 queryTimeoutsocketTimeout
  4. 分批流式:每 1 万条 commit 一次,避免事务过长
  5. 异步处理:使用 @Async 或线程池处理流数据
问题 解决方案
占满主连接池 独立数据源,最大3连接
并发太多 加锁
连接太久 设置 max-lifetime + 分批 commit
事务过长 1万条 commit 一次
卡住不响应 设置 queryTimeout + connection-timeout
失败重来 Redis 记录 lastId,支持续传

a、专用数据源

application.yml

spring:
  datasource:
    # 主数据源(业务用)
    master:
      jdbc-url: jdbc:mysql://localhost:3306/app_db?useSSL=false&useUnicode=true
      username: root
      password: root
      hikari:
        maximum-pool-size: 20
        minimum-idle: 5

    # 专用数据源(流式查询用)
    streaming:
      jdbc-url: jdbc:mysql://localhost:3306/app_db?useSSL=false&useUnicode=true&defaultFetchSize=1000
      username: root
      password: root
      hikari:
        pool-name: StreamingHikariPool
        maximum-pool-size: 3          # 严格限制:最多3个流式任务
        minimum-idle: 0
        max-lifetime: 1800000         # 30分钟,避免长连接僵死
        idle-timeout: 600000          # 10分钟空闲超时
        connection-timeout: 30000     # 30秒获取连接超时

StreamingDataSourceConfig.java

关键参数说明:

  • maximum-pool-size: 3:最多 3 个流式任务并行
  • defaultFetchSize=1000JDBC 默认 fetch 大小,启用流式
  • max-lifetime:防止连接太久被防火墙断开
@Configuration
@ConditionalOnProperty(prefix = "spring.datasource.streaming", name = "jdbc-url")
public class StreamingDataSourceConfig {

    @Bean("streamingDataSource")
    @ConfigurationProperties("spring.datasource.streaming")
    public DataSource streamingDataSource() {
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }

    @Bean("streamingSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("streamingDataSource") DataSource dataSource) 
    throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // 可设置插件、typeHandler等
        return bean.getObject();
    }
}

b、Mapper 层启用流式查询

@Options(fetchSize = Integer.MIN_VALUE) 是 MyBatis 启用流式的关键!

@Mapper
public interface OrderMapper {

    @Select("SELECT id, user_id, amount, create_time FROM orders WHERE id > #{lastId} ORDER BY id")
    @Options(fetchSize = Integer.MIN_VALUE) // 启用服务器端游标
    void streamOrders(@Param("lastId") Long lastId, ResultHandler<Order> handler);
}

c、服务层实现分批处理 + 自动提交

  • 核心逻辑:
    • SqlSession 手动管理事务
    • 每处理 BATCH_SIZEcommit() 一次
    • lastId` 记录位置,可用于断点续传
  • 解决的四大问题
问题 后果 分批提交如何解决
1. 事务过长(Long-running Transaction) 锁表、回滚段膨胀、主从延迟 主动提交,拆分为多个短事务
2. 内存溢出(OOM) JVM 堆内存耗尽,服务崩溃 释放 MyBatis 一级缓存
3. 连接被长时间占用 连接池耗尽,影响其他业务 减少单次连接持有时间
4. 故障后需重头再来 任务失败,进度丢失 结合 lastId 可断点续传

问题1:事务过长有啥问题?最致命?

  • 极端情况:导出 1000 万条数据,事务持续 30 分钟 → 整个 orders 表几乎不可写!

  • MyBatis 默认开启事务(SqlSession 级别)
  • 如果你不 commit(),整个流式查询会在一个事务中执行到底
  • 数据库会:
    • 长时间持有锁(如 InnoDBnext-key lock
    • 不断增长回滚段(undo log
    • 阻塞其他 DML 操作
    • 导致主从复制延迟(binlog 写入延迟)

问题2:内存溢出(OOM)——MyBatis 隐藏陷阱

答案:很多人以为“流式查询 = 不占内存”,但 MyBatis 的一级缓存(Local Cache)会缓存所有已查询的对象引用!即使你用 ResultHandler 逐条处理MyBatis 仍会把每一条结果缓存在 SqlSession 内部1000 万条订单,每条 1KB → 至少占用 10GB 内存

@Service
public class OrderExportService {

    @Autowired
    @Qualifier("streamingSqlSessionFactory")
    private SqlSessionFactory streamingSqlSessionFactory;

    private static final int BATCH_SIZE = 10_000; // 每处理1万条 commit 一次

    public void exportAllOrders() {
        try (SqlSession sqlSession = streamingSqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
            OrderMapper mapper = sqlSession.getMapper(OrderMapper.class);
            AtomicLong counter = new AtomicLong(0);
            AtomicLong lastId = new AtomicLong(0);

            ResultHandler<Order> handler = resultContext -> {
                Order order = resultContext.getResultObject();
                lastId.set(order.getId());

                // 业务处理:写入 Excel、发 MQ、写文件等
                processOrder(order);

                long count = counter.incrementAndGet();

                // 每 10000 条提交一次,避免事务过长
                if (count % BATCH_SIZE == 0) {
                    log.info("Committing batch at order id: {}, total: {}", lastId.get(), count);
                    sqlSession.commit();
                }
            };

            // 开始流式查询
            mapper.streamOrders(lastId.get(), handler);
            sqlSession.commit(); // 最后一批
            log.info("Export completed. Total records: {}", counter.get());

        } catch (Exception e) {
            log.error("Stream export failed", e);
            throw new RuntimeException("Export failed", e);
        }
    }

    private void processOrder(Order order) {
        // 模拟处理:写入文件、发MQ等
        // ExcelWriter.write(order);
        // rabbitTemplate.convertAndSend("export.queue", order);
    }
}

d、超时控制

  • JDBC 层超时(application.yml
spring:
  datasource:
    streaming:
      hikari:
        # 获取连接超时
        connection-timeout: 30000
        # SQL 执行超时(HikariCP 1.4+)
        leak-detection-threshold: 600000  # 10分钟未归还连接报警
  • MyBatis 层设置 queryTimeout
@Options(fetchSize = Integer.MIN_VALUE, timeout = 300) // 300秒超时
@Select("SELECT ...")
void streamOrders(...);

e Spring 层设置异步任务超时

@Async
@Timeout(3600) // 1小时超时
public void export() { ... }

f、断点续传(避免失败后从头再来)

// 可持久化 lastId
@Scheduled(fixedDelay = 30_000)
void saveProgress() {
    // 将 lastId 写入 DB 或 Redis
    redisTemplate.opsForValue().set("export:lastId", lastId.get());
}

// 启动时读取
Long getLastIdFromRedis() {
    return redisTemplate.opsForValue().get("export:lastId");
}

6)MyBatis 流式查询的“真相

“流式”只发生在数据库到应用的传输过程,MyBatis 内部仍是“全量缓存”

数据库
   ↓ (流式传输,每次几KB)
JDBC Driver → ResultSet
   ↓ (逐行读取)
MyBatis Core
   ├──→ 调用你的 ResultHandler.onResult()     ✅ 内存及时释放
   └──→ 缓存到 LocalCache.list              🔴 引用一直持有,不释放!
          ↓
     SqlSession 持有这个 list
          ↓
     整个 JVM 堆内存持续增长 → OOM

a、 为什么还会 OOM?——MyBatis 一级缓存的“副作用

答案:MyBatis 为了支持 ExecutorType.REUSE 和 结果引用去重,默认开启了 本地缓存(Local Cache)。即使你用 ResultHandlerMyBatis 仍然会:

🔴 把每一条查询结果缓存在 SqlSession 内部的一个 List 中!

源码证据在 DefaultResultHandlerCachingExecutor 中:

// DefaultResultHandler.java
private final List<Object> list;

@Override
public void handleResult(ResultContext<? extends Object> context) {
  list.add(context.getResultObject()); // 看这里!每条结果都 add 进去了!
}

b、如何真正解决 OOM?——commit() 是关键

答案:定期 sqlSession.commit()当你调用 commit() 时,MyBatis 会:

  1. 提交事务
  2. 清空一级缓存(Local Cache)
  3. 内部那个 list 被清空,所有对象引用释放
  4. GC 可以回收内存
    /**
     * sqlSessionFactory
     */
    @Resource
    private SqlSessionFactory sqlSessionFactory;


    @GetMapping(value = "selectList", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    @ResponseBody
    public ResponseBean selectList() {
        List<User> users = Lists.newArrayList();

        try(SqlSession sqlSession = sqlSessionFactory.openSession()){
            UserMapper mapper = sqlSession.getMapper(UserMapper.class);
            mapper.getStreamAll(new ResultHandler<User>() {
                private int batchCount = 0;
                private List<User> batch = new ArrayList<>(1000);
                @Override
                public void handleResult(ResultContext<? extends User> resultContext) {
                    User user = resultContext.getResultObject();
                    batch.add(user);
                    batchCount++;

                    // 每 1000 条处理一次(如批量入库、写文件)
                    if (batchCount % 1000 == 0) {
                        processBatch(batch);
                        batch.clear();
                        sqlSession.commit();
                    }
                }
                private void processBatch(List<User> batch) {
                    // 例如:批量插入、导出文件、发送消息等
                }
            });
            sqlSession.commit();
        }

        log.info("用户管理--------selectLis:【{}】", JsonUtils.toJsonString(users));
        return ResponseBean.buildSuccess(users);
    }
}

四、工具类

package com.healerjean.proj.utils.db;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * MybatisPlusQueryUtils
 *
 * @author zhangyujin
 * @date 2023/6/25$  12:05$
 */
public final class MybatisBatchUtils {

    /**
     * queryAll
     *
     * @param function     function
     * @param queryWrapper queryWrapper
     * @param pageSize     pageSize
     * @return {@link List<R>}
     */

    public static <Q, R> List<R> queryAllByLimit(BiFunction<Page<R>, Q, IPage<R>> function,
                                                 Q queryWrapper,
                                                 long pageSize) {

        IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);
        List<R> result = new ArrayList<>();
        for (int i = 1; i <= initPage.getPages(); i++) {
            IPage<R> page = function.apply(new Page<>(i, pageSize, false), queryWrapper);
            if (CollectionUtils.isEmpty(page.getRecords())) {
                break;
            }
            result.addAll(page.getRecords());
        }
        return result;
    }


    /**
     * queryAll
     *
     * @param function function
     * @param query    query
     * @param minMax   minMax
     * @return {@link List<R>}
     */
    public static <Q, R> List<R> queryAllByIdSize(BiFunction<IdQueryBO, Q, List<R>> function,
                                                  Q query,
                                                  IdQueryBO minMax) {
        List<R> result = Lists.newArrayList();
        Long minId = minMax.getMinId();
        Long size = minMax.getSize();
        while (minId != null) {
            IdQueryBO idQueryBO = new IdQueryBO(minId, size);
            List<R> dbList = function.apply(idQueryBO, query);
            if (CollectionUtils.isEmpty(dbList)) {
                break;
            }
            minId = idQueryBO.getMaxId();
            result.addAll(dbList);
        }
        return result;
    }

    /**
     * queryAll
     *
     * @param function function
     * @param query    query
     * @param minMax   minMax
     * @return {@link List<R>}
     */
    public static <Q, R> List<R> queryAllByIdSub(BiFunction<IdQueryBO, Q, List<R>> function,
                                                 Q query,
                                                 IdQueryBO minMax) {
        List<R> result = Lists.newArrayList();
        Long minId = minMax.getMinId();
        Long maxId = minMax.getMaxId();
        Long size = minMax.getSize();
        for (long i = minId; i <= maxId; i = i + size) {
            long endId = Math.min(i + size, maxId);
            boolean maxEqualFlag = endId == maxId;
            IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
            List<R> dbList = function.apply(idQueryBO, query);
            if (CollectionUtils.isEmpty(dbList)) {
                break;
            }
            result.addAll(dbList);
            if (maxEqualFlag) {
                break;
            }
        }
        return result;
    }


    /**
     * queryAll
     *
     * @param executorService 线程池
     * @param function        分页函数
     * @param queryWrapper    查询条件
     * @param pageSize        pageSize 分页大小
     * @param coverFunction   coverFunction 对象转化
     * @return {@link List< Future< List<T>>>}
     */
    public static <Q, R, T> List<Future<List<T>>> queryAllByPoolLimit(CompletionService<List<T>> executorService,
                                                                      BiFunction<Page<R>, Q, IPage<R>> function,
                                                                      Q queryWrapper,
                                                                      int pageSize,
                                                                      Function<List<R>, List<T>> coverFunction) {

        IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);

        List<Future<List<T>>> result = new ArrayList<>();
        for (int i = 1; i <= initPage.getPages(); i++) {
            int finalI = i;
            Future<List<T>> future = executorService.submit(() -> {
                IPage<R> page = function.apply(new Page<>(finalI, pageSize, false), queryWrapper);
                return coverFunction.apply(page.getRecords());
            });
            result.add(future);
        }
        return result;
    }


    /**
     * queryAll
     *
     * @param executorService 线程池
     * @param function        分页函数
     * @param query           查询条件
     * @param minMax          minMax 最小Id和最大Id
     * @param coverFunction   coverFunction 对象转化
     */
    public static <Q, R, T> List<Future<List<T>>> queryAllByPoolIdSub(CompletionService<List<T>> executorService,
                                                                      BiFunction<IdQueryBO, Q, List<R>> function,
                                                                      Q query,
                                                                      IdQueryBO minMax,
                                                                      Function<List<R>, List<T>> coverFunction) {
        Long minId = minMax.getMinId();
        Long maxId = minMax.getMaxId();
        Long size = minMax.getSize();
        List<Future<List<T>>> result = new ArrayList<>();
        for (long i = minId; i <= maxId; i = i + size) {
            long endId = Math.min(i + size, maxId);
            boolean maxEqualFlag = endId == maxId;
            IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
            Future<List<T>> future = executorService.submit(() -> {
                List<R> list = function.apply(idQueryBO, query);
                return coverFunction.apply(list);
            });
            result.add(future);
            if (maxEqualFlag) {
                break;
            }
        }
        return result;
    }

}

1、线程池 Id 分段查询

1)线程池 Id 分段工具

package com.healerjean.proj.utils.db;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * MybatisPlusQueryUtils
 *
 * @author zhangyujin
 * @date 2023/6/25$  12:05$
 */
public final class MybatisBatchUtils {



    /**
     * queryAll
     *
     * @param executorService 线程池
     * @param function        分页函数
     * @param query           查询条件
     * @param minMax          minMax 最小Id和最大Id
     * @param coverFunction   coverFunction 对象转化
     */
    public static <Q, R, T> List<Future<List<T>>> queryAllByPoolIdSub(CompletionService<List<T>> executorService,
                                                                      BiFunction<IdQueryBO, Q, List<R>> function,
                                                                      Q query,
                                                                      IdQueryBO minMax,
                                                                      Function<List<R>, List<T>> coverFunction) {
        Long minId = minMax.getMinId();
        Long maxId = minMax.getMaxId();
        Long size = minMax.getSize();
        List<Future<List<T>>> result = new ArrayList<>();
        for (long i = minId; i <= maxId; i = i + size) {
            long endId = Math.min(i + size, maxId);
            boolean maxEqualFlag = endId == maxId;
            IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
            Future<List<T>> future = executorService.submit(() -> {
                List<R> list = function.apply(idQueryBO, query);
                return coverFunction.apply(list);
            });
            result.add(future);
            if (maxEqualFlag) {
                break;
            }
        }
        return result;
    }

}

2)Manager

/**
 * 根据查询条件获取最大Id和最小Id
 *
 * @param query query
 * @return ImmutablePair<Long, Long>
 */
@Override
public ImmutablePair<Long, Long> queryMinAndMaxId(UserDemoQueryBO query) {
    QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
    queryWrapper.select("min(id) as \"minId\"", "max(id) as \"maxId\"");
    LambdaQueryWrapper<UserDemo> lambdaQueryWrapper = queryWrapper.lambda()
            .eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
            .eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
            .like(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
            .like(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
            .like(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
            .like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
            .like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
            .le(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
            .ge(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());
    Map<String, Object> map = userDemoDao.getMap(lambdaQueryWrapper);
    return ImmutablePair.of(MapUtils.getLong(map, "minId"), MapUtils.getLong(map, "maxId"));
}

/**
 * 根据id区间查询数据
 *
 * @param idQueryBO idQueryBO
 * @return {@link List< UserDemo>}
 */
@Override
public List<UserDemo> queryUserDemoByIdSub(IdQueryBO idQueryBO, UserDemoQueryBO query) {
    QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
    if (!CollectionUtils.isEmpty(query.getSelectFields())) {
        queryWrapper.select(StringUtils.join(query.getSelectFields(), ","));
    }
    LambdaQueryWrapper<UserDemo> lambdaQueryWrapper = queryWrapper.lambda()
            .eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
            .eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
            .eq(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
            .eq(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
            .eq(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
            .like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
            .like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
            .lt(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
            .gt(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());

    if (idQueryBO.getMinEqualFlag()) {
        lambdaQueryWrapper.ge(UserDemo::getId, idQueryBO.getMinId());
    } else {
        lambdaQueryWrapper.gt(UserDemo::getId, idQueryBO.getMinId());
    }

    if (idQueryBO.getMaxEqualFlag()) {
        lambdaQueryWrapper.le(UserDemo::getId, idQueryBO.getMaxId());
    } else {
        lambdaQueryWrapper.lt(UserDemo::getId, idQueryBO.getMaxId());
    }
    return userDemoDao.list(lambdaQueryWrapper);
}

3)BO 对象 IdQueryBO

package com.healerjean.proj.utils.db;

import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
 * @author zhangyujin
 * @date 2023/7/5$  15:45$
 */
@Accessors(chain = true)
@Data
public class IdQueryBO implements Serializable {

    /**
     * serialVersionUID
     */
    private static final long serialVersionUID = 3145985803712258405L;

    /**
     * 是否等于最小
     */
    private Boolean minEqualFlag;
    /**
     * 最小Id
     */
    private Long minId;

    /**
     * 是否等于最大
     */
    private Boolean maxEqualFlag;
    /**
     * 最大Id
     */
    private Long maxId;

    /**
     * 每次查询多少个
     */
    private Long size;

    public IdQueryBO(Long minId, Long maxId, Long size) {
        this.minId = minId;
        this.maxId = maxId;
        this.size = size;
    }

    public IdQueryBO(Boolean minEqualFlag, Long minId, Boolean maxEqualFlag, Long maxId, Long size) {
        this.minEqualFlag = minEqualFlag;
        this.minId = minId;
        this.maxEqualFlag = maxEqualFlag;
        this.maxId = maxId;
        this.size = size;
    }
}

4)Service

    /**
     * 根据
     *
     * @param completionService completionService
     * @param query             queryBO
     * @return List<Future < List < UserDemoExcel>>>
     */
    @Override
    public List<Future<List<UserDemoExcel>>> queryAllUserDemoByPoolIdSub(CompletionService<List<UserDemoExcel>> completionService, UserDemoQueryBO query) {
        ImmutablePair<Long, Long> minAndMaxId = userDemoManager.queryMinAndMaxId(query);
        Long minId = minAndMaxId.getLeft();
        Long maxId = minAndMaxId.getRight();
        IdQueryBO idQueryBO = new IdQueryBO(minId, maxId, 2L);
        return MybatisBatchUtils.queryAllByPoolIdSub(completionService,
                (p, q) -> userDemoManager.queryUserDemoByIdSub(p, q),
                query,
                idQueryBO,
                UserConverter.INSTANCE::covertUserDemoPoToExcelList);
    }

5)单测

@DisplayName("testQueryMinAndMaxId")
@Test
public void testQueryMinAndMaxId() {
    CompletionService<List<UserDemoExcel>> completionService = new ExecutorCompletionService(ThreadPoolUtils.DEFAULT_THREAD_POOL_TASK_EXECUTOR);

    UserDemoQueryBO queryBO = new UserDemoQueryBO();
    List<Future<List<UserDemoExcel>>> futures = userDemoService.queryAllUserDemoByPoolIdSub(completionService, 
                                                                                            queryBO);
    List<UserDemoExcel> all = Lists.newArrayList();
    for (int i = 0; i < futures.size(); i++) {
        try {
            Future<List<UserDemoExcel>> future = completionService.take();
            List<UserDemoExcel> userDemos = future.get();
            if (CollectionUtils.isEmpty(userDemos)) {
                continue;
            }
            all.addAll(future.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    log.info("[testQueryMinAndMaxId] res:{}", JsonUtils.toString(all));
}

2、线程池 limit 分页查询

1)线程池分页读工具

package com.healerjean.proj.utils.db;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * MybatisPlusQueryUtils
 *
 * @author zhangyujin
 * @date 2023/6/25$  12:05$
 */
public final class MybatisBatchUtils {

    /**
     * queryAll
     *
     * @param executorService 线程池
     * @param function        分页函数
     * @param queryWrapper    查询条件
     * @param pageSize        pageSize 分页大小
     * @param coverFunction   coverFunction 对象转化
     * @return {@link List< Future< List<T>>>}
     */
    public static <Q, R, T> List<Future<List<T>>> queryAllByPoolLimit(CompletionService<List<T>> executorService,
                                                                      BiFunction<Page<R>, Q, IPage<R>> function,
                                                                      Q queryWrapper,
                                                                      int pageSize,
                                                                      Function<List<R>, List<T>> coverFunction) {

        IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);

        List<Future<List<T>>> result = new ArrayList<>();
        for (int i = 1; i <= initPage.getPages(); i++) {
            int finalI = i;
            Future<List<T>> future = executorService.submit(() -> {
                IPage<R> page = function.apply(new Page<>(finalI, pageSize, false), queryWrapper);
                return coverFunction.apply(page.getRecords());
            });
            result.add(future);
        }
        return result;
    }


}

2)Manager

/**
 * 获取查询条件
 *
 * @param query query
 * @return LambdaQueryWrapper<UserDemo>
 */
@Override
public LambdaQueryWrapper<UserDemo> queryBuilderQueryWrapper(UserDemoQueryBO query) {
    QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
    if (!CollectionUtils.isEmpty(query.getSelectFields())) {
        queryWrapper.select(StringUtils.join(query.getSelectFields(), ","));
    }
    if (!CollectionUtils.isEmpty(query.getOrderByList())) {
        query.getOrderByList().forEach(item -> queryWrapper.orderBy(Boolean.TRUE, item.getDirection(), 
                                                                    item.getProperty()));
    }
    return queryWrapper.lambda()
            .eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
            .eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
            .eq(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
            .eq(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
            .eq(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
            .like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
            .like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
            .lt(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
            .gt(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());
}

3)Service

/**
 * queryFutureAll
 *
 * @param completionService completionService
 * @param query             query
 * @return List<Future < List < UserDemoExcel>>>
 */
@Override
public List<Future<List<UserDemoExcel>>> queryAllUserDemoByPoolLimit(CompletionService<List<UserDemoExcel>> completionService, UserDemoQueryBO query) {
    QueryWrapper<UserDemo> queryWrapper = userDemoManager.queryBuilderQueryWrapper(query);
    return MybatisBatchUtils.queryAllByPoolLimit(completionService,
            (p, q) -> userDemoDao.page(p, q),
            queryWrapper,
            1,
            UserConverter.INSTANCE::covertUserDemoPoToExcelList);
}

4)单测

@DisplayName("testQueryUserDemoByLimit")
@Test
public void testQueryUserDemoByLimit() {
    CompletionService<List<UserDemoExcel>> completionService = new ExecutorCompletionService(ThreadPoolUtils.DEFAULT_THREAD_POOL_TASK_EXECUTOR);

    UserDemoQueryBO queryBO = new UserDemoQueryBO();
    List<Future<List<UserDemoExcel>>> futures = userDemoService.queryAllUserDemoByPoolLimit(completionService, 
                                                                                            queryBO);
    List<UserDemoExcel> all = Lists.newArrayList();
    for (int i = 0; i < futures.size(); i++) {
        try {
            Future<List<UserDemoExcel>> future = completionService.take();
            List<UserDemoExcel> userDemos = future.get();
            if (CollectionUtils.isEmpty(userDemos)) {
                continue;
            }
            all.addAll(future.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    log.info("[testQueryUserDemoByLimit] res:{}", JsonUtils.toString(all));
}

ContactAuthor