Msql千万级优化
前言
Github:https://github.com/HealerJean
一、分页查询优化
问题 SQL
解释:表中的字段越多下面的优化越明显,否则即使使用了下面的优化,也可能没有那么明显
通过下面的可以观察到 当达到
1000万的时候,查询时间到了37s,太可怕了
select * from tb_ams_inf_repay_stat limit 0,10 ;
# 0.003s
select * from tb_ams_inf_repay_stat limit 10000,10 ;
# 1万 0.023s
select * from tb_ams_inf_repay_stat limit 100000,10 ;
# 10万 0.191s
select * from tb_ams_inf_repay_stat limit 1000000,10 ;
# 100万 1.942s
select * from tb_ams_inf_repay_stat limit 10000000,10 ;
# 1000万 37.323s
1、简单优化
1)子查询
0.23s 简直要飞起来了
1、先使用覆盖索引
index查询 ,我们只查询id索引这一个字段,比select *或者多个字段快多了,因为只要我们写上这些字段,我们只需要10个,但是从第一条开始到1000万条其实是都要去扫描的2、然后再进行索引范围内
range查询
0.23s
select *
from tb_ams_inf_repay_stat
where id > (select id from tb_ams_inf_repay_stat limit 1000000, 1)
limit 0,10 ;
--推荐使用
select * from table where id > 243800 order by id limit 10;
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
|---|---|---|---|---|---|---|---|---|---|
| 1 | PRIMARY | tb_ams_inf_repay_stat | range | PRIMARY | PRIMARY | 8 | NULL | 3258410 | Using where |
| 2 | SUBQUERY | tb_ams_inf_repay_stat | index | NULL | idx_orgcd_loannum | 216 | NULL | 19753500 | Using index |
2)Join 连接
-- 0.31
SELECT *
FROM tb_ams_inf_repay_stat a
JOIN (select id from tb_ams_inf_repay_stat limit 1000010, 10) b ON a.ID = b.id
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
|---|---|---|---|---|---|---|---|---|---|
| 1 | PRIMARY | <derived2> | ALL | NULL | NULL | NULL | NULL | 1000020 | NULL |
| 1 | PRIMARY | a | eq_ref | PRIMARY | PRIMARY | 8 | b.id | 1 | NULL |
| 2 | DERIVED | tb_ams_inf_repay_stat | index | NULL | idx_orgcd_loannum | 216 | NULL | 19753500 | Using index |
2、其他优化
1)带有条件的,id 连续的查询(between)
0.03s
select * from tb_ams_inf_repay_stat where id between 1000000 and 1000010 ;
2)带有条件,id 不连续的查询,考虑建立索引
20s 慢死了
select * from tb_ams_inf_repay_stat where org_cd = 'xmsd' limit 1000000,10 ;
-- org_cd建立索引
select *
from tb_ams_inf_repay_stat
where org_cd = 'xmsd'
and id > (select id from tb_ams_inf_repay_stat where org_cd = 'xmsd' limit 1000000,1)
limit 0,10 ;
0.2s 可以说相当的快了
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
|---|---|---|---|---|---|---|---|---|---|
| 1 | PRIMARY | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ~~~~ |
| 2 | SUBQUERY | tb_ams_inf_repay_stat | ref | idx_orgcd_loannum | idx_orgcd_loannum | 93 | const | 1 | Using where; Using index |
二、千万测试数据生成
1、存储过程
1)表
create table if not exists `user_demo`
(
`id` bigint unsigned not null auto_increment comment '主键',
`name` varchar(32) not null default '' comment '姓名',
`age` int not null default 0 comment '年龄',
`phone` varchar(32) not null default '' comment '电话',
`email` varchar(64) not null default '' comment '邮箱',
`start_time` datetime default null comment '开始时间',
`end_time` datetime default null comment '结束时间',
`valid_flag` int not null default 1 comment '1有效 0 废弃',
`create_time` datetime not null default current_timestamp comment '创建时间',
`update_time` datetime not null default current_timestamp on update current_timestamp comment '更新时间',
primary key (`id`)
) engine = innodb
default charset = utf8;
2)存储过程
drop procedure if exists test_batch_create;
create procedure test_batch_create(in loop_counts int, in date varchar(50))
begin
declare i int;
set i = 0;
set autocommit = 0; -- 关闭自动提交事务,提高插入效率
while i < loop_counts
do
insert into `user_demo` (name, age, phone, email, start_time, end_time, valid_flag)
values (concat('张', floor(rand() * 2 * i)), floor(rand() * i), floor(rand() * 3 * i),
concat( floor(rand() * 2 * i), '@gmail.com'), date_add(date ,interval i day), date_add(date ,interval i*2 day), 1);
set i = i + 1;
end while;
commit;
end;
3)执行
CALL test_batch_create(10, '2023-07-01');
三、三种大数据量查询
1、三种大数据查询方式对比
| 对比维度 | 流式查询(Streaming) |
LIMIT 分页 |
id > last_id LIMIT size |
|---|---|---|---|
| 原理 | 数据库逐行返回,客户端边读边处理 | 跳过 offset 行,取 size 条 |
基于主键,从上次结束位置继续 |
| 性能(大数据量) | 最优 | 差,offset 越大越慢) |
好,但依赖索引) |
| 内存占用(客户端) | 低,逐条处理 | 中等,每页加载 | 中等,每页加载 |
| 数据库连接占用 | 高,需长时间保持 | 低,短连接 | 低,短连接 |
| 实现复杂度 | 高 | 低 | 中 |
| 是否支持随机跳页 | 不支持 | 支持(如第100页) | 不支持(只能下一页 |
| 适用场景 | 导出、ETL、实时处理 |
前端分页展示 | 后台任务、消息队列拉取 |
2、三种查询介绍
1)流式查询
- 流式查询 = 服务器端游标 + 客户端逐行读取
- 优点:内存低、响应快(首条数据快)、快照固定
- 缺点:连接长、不能跳页、实现复杂
@GetMapping(value = "selectList", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public ResponseBean selectList() {
List<User> users = Lists.newArrayList();
try(SqlSession sqlSession = sqlSessionFactory.openSession()){
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
mapper.getStreamAll(new ResultHandler<User>() {
private int batchCount = 0;
private List<User> batch = new ArrayList<>(1000);
@Override
public void handleResult(ResultContext<? extends User> resultContext) {
User user = resultContext.getResultObject();
batch.add(user);
batchCount++;
// 每 1000 条处理一次(如批量入库、写文件)
if (batchCount % 1000 == 0) {
processBatch(batch);
batch.clear();
sqlSession.commit();
}
}
private void processBatch(List<User> batch) {
// 例如:批量插入、导出文件、发送消息等
}
});
sqlSession.commit();
}
log.info("用户管理--------selectLis:【{}】", JsonUtils.toJsonString(users));
return ResponseBean.buildSuccess(users);
}
a、工作原理
- 建立连接后,发送 SQL 查询:
SELECT * FROM user JDBC驱动设置流式模式:- 调用
Statement.setFetchSize(Integer.MIN_VALUE)(但每次读取的块大小由驱动决定)Integer.MIN_VALUE是MySQL JDBC驱动的“魔法值”,表示“请用流式”- 告诉数据库:“我要流式读取,不要一次性把所有结果加载到内存。
- 调用
- 数据库开始返回数据块(
Chunk):- 数据库不会一条一条发,而是按网络包大小或驱动缓冲区(如 4KB、8KB)分批发送。
- 例如:一次发
100条,下一次再发100条……
JDBC驱动在本地维护一个小型缓冲区:- 当缓冲区的数据被程序消费完,驱动会自动从网络读取下一批。
MyBatis将每条记录取出,回调handleResult():- 对你来说,就是“一条一条”地处理。
b、是“一条一条”读取吗?
| 问题 | 回答 |
|---|---|
| 是“一条一条”读取吗? | 从应用处理角度看:是的,每条记录都单独回调。 |
| 底层真的只传一条吗? | 不是,是“小批量分块传输”,但对开发者透明。 |
| 好处是什么? | 内存占用低,适合处理百万级数据。 |
| 风险? | 连接长时间不释放,需谨慎管理资源。 |
c、百万数据流式查询大概耗时范围
| 场景 | 预估耗时 | 说明 |
|---|---|---|
| 理想情况(本地数据库、SSD、简单查询、无处理逻辑) | 10 ~ 30 秒 | 数据库快,网络快,处理快 |
| 一般情况(内网、普通硬盘、字段较多) | 30 秒 ~ 2 分钟 | 常见企业环境 |
| 复杂情况(远程数据库、慢查询、大字段、复杂处理) | 2 ~ 10 分钟甚至更长 | 含 BLOB、JSON、计算等 |
| 问题 | 回答 |
|---|---|
| 百万数据流式查询耗时? | 通常 30 秒 ~ 2 分钟 |
| 最快能到多少? | 10+ 秒(理想环境) |
| 最慢会多久? | 10 分钟+(网络差、处理慢) |
d、fetchSize 设置解释
Integer.MIN_VALUE是“开关
| 设置 | 是否启用服务器端游标 | 是否流式 | 是否 OOM 风险 |
|---|---|---|---|
fetchSize=1000 |
❌ 否(默认客户端游标) | ❌ 不是真流式 | ✅ 有(全量加载) |
fetchSize=Integer.MIN_VALUE |
✅ 是(服务器端游标) | ✅ 真流式 | ❌ 无(分批拉取) |
fetchSize(1000)-
一次性把所有数据从服务器拉到客户端内存
-
然后在本地“假装”是分批返回(每次返回 1000 条)
- 但内存已经占满了
2)LIMIT 分页
-
LIMIT 1000000, 10为什么慢-
数据库仍需扫描前 1,000,000 条记录,即使不返回。
-
尤其是
ORDER BY存在时,可能无法利用索引覆盖。 -
这叫“偏移量陷阱(
OffsetPaginationProblem)”。
-
-
何时不要用?
-
offset > 10,000时基本不可用 -
高并发分页接口(如
/api/users?page=100)
-
-
何时可用?
-
前端展示,页数少(如 ≤ 100)
-
数据量小(如 < 10万)
-
3)id > last_id LIMIT size
优势:
- 性能稳定:
WHERE id > ? LIMIT ?走主键索引,O(log n) - 无偏移量问题,翻页飞快
- 适合“下一页”场景(如微博、消息流)
缺点:
- 不能跳页(如直接跳第10页)
- 依赖单调递增字段(如
id,create_time) - 如果
id有删除或不连续,可能导致“漏数据”或“重复”
3、常见问题答疑
1)普通查询 VS 流式
| 方式 | 普通查询 List<User> |
流式查询 ResultHandler |
|---|---|---|
| 内存使用 | 一次性加载所有数据到内存 | 每次只处理一条,内存占用小 |
| 适用场景 | 数据量小(几千、几万) | 大数据量(几十万、上百万) |
| 风险 | 可能 OOM(内存溢出) | 避免 OOM |
| 处理方式 | 先查完再处理 | 边查边处理(实时) |
2)流式 vs IdSize
| 对比项 | 流式查询(Streaming) | 自定义 ID 分页(WHERE id BETWEEN ? AND ?) |
|---|---|---|
| 场景 | 批量导出、ETL |
API 分页、消息拉取 |
| SQL 执行次数 | 1 次 | N 次(如 100 万 / 1000 = 1000 次) |
| 连接 | 长连接(风险) | 短连接(安全) |
| 内存占用 | 低(逐条处理) | 低(每页处理完就释放) |
| 数据库压力 | 低(一次连接,流式读) | 高(频繁查询,索引扫描) |
| 网络开销 | 低(一次连接,持续传输) | 高(多次连接或长连接) |
是否需要 ID 连续 |
不需要 | 需要(否则分页不准) |
| 能否并行处理 | 不能(顺序流) | 能(不同 ID 范围可多线程) |
| 能否中断/恢复 | 难(流中断就失败) | 能(记录最后 ID 即可) |
| 首次响应时间 | 稍慢(建立流需要时间) | 快(第一页很快返回) |
| 总耗时 | 通常更短 | 通常更长(N 次查询 + 间隙) |
| 实现复杂度 | 简单 | 稍复杂(需管理分页逻辑) |
3)性能对比实验
| 查询方式 | 查询第 10 页(每页 100) | 查询第 100,000 页 | 全量导出 |
|---|---|---|---|
LIMIT offset, size |
50ms | 12s | 不可行 |
WHERE id > ? LIMIT ? |
5ms | 5ms | 需自己循环 |
| 流式查询 | 不适用 | 不适用 | 8s (内存稳定) |
4)选型建议
| 你要做什么? | 选哪种? |
|---|---|
做 Excel 导出,数据量 > 10万 |
流式查询 |
| 做后台管理分页,最多翻 100 页 | LIMIT |
做 App 信息流,无限下拉 |
Id > last_id |
| 做定时任务,处理未同步数据 | Id > last_max_id |
| 做实时计算,监听数据变化 | 以上都不行 → 用 CDC(如 Debezium) |
5)流式查询如何避免占满连接
场景:安全地流式查询 1000 万条订单数据,用于导出 Excel,不阻塞主业务连接池,不引发 OOM,支持中断后继续。
- 专用数据源:为流式查询配置独立的连接池(如 HikariCP 单独配置)
- 限制并发:同一时间只允许 1~2 个流式任务(加锁)
- 超时控制:设置
queryTimeout和socketTimeout - 分批流式:每 1 万条
commit一次,避免事务过长 - 异步处理:使用
@Async或线程池处理流数据
| 问题 | 解决方案 |
|---|---|
| 占满主连接池 | 独立数据源,最大3连接 |
| 并发太多 | 加锁 |
| 连接太久 | 设置 max-lifetime + 分批 commit |
| 事务过长 | 每 1万条 commit 一次 |
| 卡住不响应 | 设置 queryTimeout + connection-timeout |
| 失败重来 | Redis 记录 lastId,支持续传 |
a、专用数据源
application.yml
spring:
datasource:
# 主数据源(业务用)
master:
jdbc-url: jdbc:mysql://localhost:3306/app_db?useSSL=false&useUnicode=true
username: root
password: root
hikari:
maximum-pool-size: 20
minimum-idle: 5
# 专用数据源(流式查询用)
streaming:
jdbc-url: jdbc:mysql://localhost:3306/app_db?useSSL=false&useUnicode=true&defaultFetchSize=1000
username: root
password: root
hikari:
pool-name: StreamingHikariPool
maximum-pool-size: 3 # 严格限制:最多3个流式任务
minimum-idle: 0
max-lifetime: 1800000 # 30分钟,避免长连接僵死
idle-timeout: 600000 # 10分钟空闲超时
connection-timeout: 30000 # 30秒获取连接超时
StreamingDataSourceConfig.java
关键参数说明:
maximum-pool-size: 3:最多3个流式任务并行defaultFetchSize=1000:JDBC默认 fetch 大小,启用流式max-lifetime:防止连接太久被防火墙断开
@Configuration
@ConditionalOnProperty(prefix = "spring.datasource.streaming", name = "jdbc-url")
public class StreamingDataSourceConfig {
@Bean("streamingDataSource")
@ConfigurationProperties("spring.datasource.streaming")
public DataSource streamingDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
@Bean("streamingSqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("streamingDataSource") DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
// 可设置插件、typeHandler等
return bean.getObject();
}
}
b、Mapper 层启用流式查询
@Options(fetchSize = Integer.MIN_VALUE)是 MyBatis 启用流式的关键!
@Mapper
public interface OrderMapper {
@Select("SELECT id, user_id, amount, create_time FROM orders WHERE id > #{lastId} ORDER BY id")
@Options(fetchSize = Integer.MIN_VALUE) // 启用服务器端游标
void streamOrders(@Param("lastId") Long lastId, ResultHandler<Order> handler);
}
c、服务层实现分批处理 + 自动提交
- 核心逻辑:
SqlSession手动管理事务- 每处理
BATCH_SIZE条commit()一次 - lastId` 记录位置,可用于断点续传
- 解决的四大问题
| 问题 | 后果 | 分批提交如何解决 |
|---|---|---|
| 1. 事务过长(Long-running Transaction) | 锁表、回滚段膨胀、主从延迟 | 主动提交,拆分为多个短事务 |
| 2. 内存溢出(OOM) | JVM 堆内存耗尽,服务崩溃 |
释放 MyBatis 一级缓存 |
| 3. 连接被长时间占用 | 连接池耗尽,影响其他业务 | 减少单次连接持有时间 |
| 4. 故障后需重头再来 | 任务失败,进度丢失 | 结合 lastId 可断点续传 |
问题1:事务过长有啥问题?最致命?
-
极端情况:导出 1000 万条数据,事务持续 30 分钟 → 整个
orders表几乎不可写! MyBatis默认开启事务(SqlSession级别)- 如果你不
commit(),整个流式查询会在一个事务中执行到底 - 数据库会:
- 长时间持有锁(如
InnoDB的next-key lock) - 不断增长回滚段(
undolog) - 阻塞其他
DML操作 - 导致主从复制延迟(
binlog写入延迟)
- 长时间持有锁(如
问题2:内存溢出(OOM)——MyBatis 隐藏陷阱
答案:很多人以为“流式查询 = 不占内存”,但 MyBatis 的一级缓存(Local Cache)会缓存所有已查询的对象引用!即使你用 ResultHandler 逐条处理MyBatis 仍会把每一条结果缓存在 SqlSession 内部。1000 万条订单,每条 1KB → 至少占用 10GB 内存!
@Service
public class OrderExportService {
@Autowired
@Qualifier("streamingSqlSessionFactory")
private SqlSessionFactory streamingSqlSessionFactory;
private static final int BATCH_SIZE = 10_000; // 每处理1万条 commit 一次
public void exportAllOrders() {
try (SqlSession sqlSession = streamingSqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
OrderMapper mapper = sqlSession.getMapper(OrderMapper.class);
AtomicLong counter = new AtomicLong(0);
AtomicLong lastId = new AtomicLong(0);
ResultHandler<Order> handler = resultContext -> {
Order order = resultContext.getResultObject();
lastId.set(order.getId());
// 业务处理:写入 Excel、发 MQ、写文件等
processOrder(order);
long count = counter.incrementAndGet();
// 每 10000 条提交一次,避免事务过长
if (count % BATCH_SIZE == 0) {
log.info("Committing batch at order id: {}, total: {}", lastId.get(), count);
sqlSession.commit();
}
};
// 开始流式查询
mapper.streamOrders(lastId.get(), handler);
sqlSession.commit(); // 最后一批
log.info("Export completed. Total records: {}", counter.get());
} catch (Exception e) {
log.error("Stream export failed", e);
throw new RuntimeException("Export failed", e);
}
}
private void processOrder(Order order) {
// 模拟处理:写入文件、发MQ等
// ExcelWriter.write(order);
// rabbitTemplate.convertAndSend("export.queue", order);
}
}
d、超时控制
JDBC层超时(application.yml)
spring:
datasource:
streaming:
hikari:
# 获取连接超时
connection-timeout: 30000
# SQL 执行超时(HikariCP 1.4+)
leak-detection-threshold: 600000 # 10分钟未归还连接报警
MyBatis层设置queryTimeout
@Options(fetchSize = Integer.MIN_VALUE, timeout = 300) // 300秒超时
@Select("SELECT ...")
void streamOrders(...);
e Spring 层设置异步任务超时
@Async
@Timeout(3600) // 1小时超时
public void export() { ... }
f、断点续传(避免失败后从头再来)
// 可持久化 lastId
@Scheduled(fixedDelay = 30_000)
void saveProgress() {
// 将 lastId 写入 DB 或 Redis
redisTemplate.opsForValue().set("export:lastId", lastId.get());
}
// 启动时读取
Long getLastIdFromRedis() {
return redisTemplate.opsForValue().get("export:lastId");
}
6)MyBatis 流式查询的“真相
“流式”只发生在数据库到应用的传输过程,MyBatis 内部仍是“全量缓存”。
数据库
↓ (流式传输,每次几KB)
JDBC Driver → ResultSet
↓ (逐行读取)
MyBatis Core
├──→ 调用你的 ResultHandler.onResult() ✅ 内存及时释放
└──→ 缓存到 LocalCache.list 🔴 引用一直持有,不释放!
↓
SqlSession 持有这个 list
↓
整个 JVM 堆内存持续增长 → OOM
a、 为什么还会 OOM?——MyBatis 一级缓存的“副作用
答案:MyBatis 为了支持 ExecutorType.REUSE 和 结果引用去重,默认开启了 本地缓存(Local Cache)。即使你用 ResultHandler,MyBatis 仍然会:
🔴 把每一条查询结果缓存在
SqlSession内部的一个List中!
源码证据在 DefaultResultHandler 和 CachingExecutor 中:
// DefaultResultHandler.java
private final List<Object> list;
@Override
public void handleResult(ResultContext<? extends Object> context) {
list.add(context.getResultObject()); // 看这里!每条结果都 add 进去了!
}
b、如何真正解决 OOM?——commit() 是关键
答案:定期 sqlSession.commit()当你调用 commit() 时,MyBatis 会:
- 提交事务
- 清空一级缓存(Local Cache)
- 内部那个
list被清空,所有对象引用释放 GC可以回收内存
/**
* sqlSessionFactory
*/
@Resource
private SqlSessionFactory sqlSessionFactory;
@GetMapping(value = "selectList", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public ResponseBean selectList() {
List<User> users = Lists.newArrayList();
try(SqlSession sqlSession = sqlSessionFactory.openSession()){
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
mapper.getStreamAll(new ResultHandler<User>() {
private int batchCount = 0;
private List<User> batch = new ArrayList<>(1000);
@Override
public void handleResult(ResultContext<? extends User> resultContext) {
User user = resultContext.getResultObject();
batch.add(user);
batchCount++;
// 每 1000 条处理一次(如批量入库、写文件)
if (batchCount % 1000 == 0) {
processBatch(batch);
batch.clear();
sqlSession.commit();
}
}
private void processBatch(List<User> batch) {
// 例如:批量插入、导出文件、发送消息等
}
});
sqlSession.commit();
}
log.info("用户管理--------selectLis:【{}】", JsonUtils.toJsonString(users));
return ResponseBean.buildSuccess(users);
}
}
四、工具类
package com.healerjean.proj.utils.db;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* MybatisPlusQueryUtils
*
* @author zhangyujin
* @date 2023/6/25$ 12:05$
*/
public final class MybatisBatchUtils {
/**
* queryAll
*
* @param function function
* @param queryWrapper queryWrapper
* @param pageSize pageSize
* @return {@link List<R>}
*/
public static <Q, R> List<R> queryAllByLimit(BiFunction<Page<R>, Q, IPage<R>> function,
Q queryWrapper,
long pageSize) {
IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);
List<R> result = new ArrayList<>();
for (int i = 1; i <= initPage.getPages(); i++) {
IPage<R> page = function.apply(new Page<>(i, pageSize, false), queryWrapper);
if (CollectionUtils.isEmpty(page.getRecords())) {
break;
}
result.addAll(page.getRecords());
}
return result;
}
/**
* queryAll
*
* @param function function
* @param query query
* @param minMax minMax
* @return {@link List<R>}
*/
public static <Q, R> List<R> queryAllByIdSize(BiFunction<IdQueryBO, Q, List<R>> function,
Q query,
IdQueryBO minMax) {
List<R> result = Lists.newArrayList();
Long minId = minMax.getMinId();
Long size = minMax.getSize();
while (minId != null) {
IdQueryBO idQueryBO = new IdQueryBO(minId, size);
List<R> dbList = function.apply(idQueryBO, query);
if (CollectionUtils.isEmpty(dbList)) {
break;
}
minId = idQueryBO.getMaxId();
result.addAll(dbList);
}
return result;
}
/**
* queryAll
*
* @param function function
* @param query query
* @param minMax minMax
* @return {@link List<R>}
*/
public static <Q, R> List<R> queryAllByIdSub(BiFunction<IdQueryBO, Q, List<R>> function,
Q query,
IdQueryBO minMax) {
List<R> result = Lists.newArrayList();
Long minId = minMax.getMinId();
Long maxId = minMax.getMaxId();
Long size = minMax.getSize();
for (long i = minId; i <= maxId; i = i + size) {
long endId = Math.min(i + size, maxId);
boolean maxEqualFlag = endId == maxId;
IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
List<R> dbList = function.apply(idQueryBO, query);
if (CollectionUtils.isEmpty(dbList)) {
break;
}
result.addAll(dbList);
if (maxEqualFlag) {
break;
}
}
return result;
}
/**
* queryAll
*
* @param executorService 线程池
* @param function 分页函数
* @param queryWrapper 查询条件
* @param pageSize pageSize 分页大小
* @param coverFunction coverFunction 对象转化
* @return {@link List< Future< List<T>>>}
*/
public static <Q, R, T> List<Future<List<T>>> queryAllByPoolLimit(CompletionService<List<T>> executorService,
BiFunction<Page<R>, Q, IPage<R>> function,
Q queryWrapper,
int pageSize,
Function<List<R>, List<T>> coverFunction) {
IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);
List<Future<List<T>>> result = new ArrayList<>();
for (int i = 1; i <= initPage.getPages(); i++) {
int finalI = i;
Future<List<T>> future = executorService.submit(() -> {
IPage<R> page = function.apply(new Page<>(finalI, pageSize, false), queryWrapper);
return coverFunction.apply(page.getRecords());
});
result.add(future);
}
return result;
}
/**
* queryAll
*
* @param executorService 线程池
* @param function 分页函数
* @param query 查询条件
* @param minMax minMax 最小Id和最大Id
* @param coverFunction coverFunction 对象转化
*/
public static <Q, R, T> List<Future<List<T>>> queryAllByPoolIdSub(CompletionService<List<T>> executorService,
BiFunction<IdQueryBO, Q, List<R>> function,
Q query,
IdQueryBO minMax,
Function<List<R>, List<T>> coverFunction) {
Long minId = minMax.getMinId();
Long maxId = minMax.getMaxId();
Long size = minMax.getSize();
List<Future<List<T>>> result = new ArrayList<>();
for (long i = minId; i <= maxId; i = i + size) {
long endId = Math.min(i + size, maxId);
boolean maxEqualFlag = endId == maxId;
IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
Future<List<T>> future = executorService.submit(() -> {
List<R> list = function.apply(idQueryBO, query);
return coverFunction.apply(list);
});
result.add(future);
if (maxEqualFlag) {
break;
}
}
return result;
}
}
1、线程池 Id 分段查询
1)线程池 Id 分段工具
package com.healerjean.proj.utils.db;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* MybatisPlusQueryUtils
*
* @author zhangyujin
* @date 2023/6/25$ 12:05$
*/
public final class MybatisBatchUtils {
/**
* queryAll
*
* @param executorService 线程池
* @param function 分页函数
* @param query 查询条件
* @param minMax minMax 最小Id和最大Id
* @param coverFunction coverFunction 对象转化
*/
public static <Q, R, T> List<Future<List<T>>> queryAllByPoolIdSub(CompletionService<List<T>> executorService,
BiFunction<IdQueryBO, Q, List<R>> function,
Q query,
IdQueryBO minMax,
Function<List<R>, List<T>> coverFunction) {
Long minId = minMax.getMinId();
Long maxId = minMax.getMaxId();
Long size = minMax.getSize();
List<Future<List<T>>> result = new ArrayList<>();
for (long i = minId; i <= maxId; i = i + size) {
long endId = Math.min(i + size, maxId);
boolean maxEqualFlag = endId == maxId;
IdQueryBO idQueryBO = new IdQueryBO(true, i, maxEqualFlag, endId, size);
Future<List<T>> future = executorService.submit(() -> {
List<R> list = function.apply(idQueryBO, query);
return coverFunction.apply(list);
});
result.add(future);
if (maxEqualFlag) {
break;
}
}
return result;
}
}
2)Manager 层
/**
* 根据查询条件获取最大Id和最小Id
*
* @param query query
* @return ImmutablePair<Long, Long>
*/
@Override
public ImmutablePair<Long, Long> queryMinAndMaxId(UserDemoQueryBO query) {
QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
queryWrapper.select("min(id) as \"minId\"", "max(id) as \"maxId\"");
LambdaQueryWrapper<UserDemo> lambdaQueryWrapper = queryWrapper.lambda()
.eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
.eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
.like(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
.like(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
.like(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
.like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
.like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
.le(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
.ge(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());
Map<String, Object> map = userDemoDao.getMap(lambdaQueryWrapper);
return ImmutablePair.of(MapUtils.getLong(map, "minId"), MapUtils.getLong(map, "maxId"));
}
/**
* 根据id区间查询数据
*
* @param idQueryBO idQueryBO
* @return {@link List< UserDemo>}
*/
@Override
public List<UserDemo> queryUserDemoByIdSub(IdQueryBO idQueryBO, UserDemoQueryBO query) {
QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
if (!CollectionUtils.isEmpty(query.getSelectFields())) {
queryWrapper.select(StringUtils.join(query.getSelectFields(), ","));
}
LambdaQueryWrapper<UserDemo> lambdaQueryWrapper = queryWrapper.lambda()
.eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
.eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
.eq(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
.eq(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
.eq(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
.like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
.like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
.lt(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
.gt(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());
if (idQueryBO.getMinEqualFlag()) {
lambdaQueryWrapper.ge(UserDemo::getId, idQueryBO.getMinId());
} else {
lambdaQueryWrapper.gt(UserDemo::getId, idQueryBO.getMinId());
}
if (idQueryBO.getMaxEqualFlag()) {
lambdaQueryWrapper.le(UserDemo::getId, idQueryBO.getMaxId());
} else {
lambdaQueryWrapper.lt(UserDemo::getId, idQueryBO.getMaxId());
}
return userDemoDao.list(lambdaQueryWrapper);
}
3)BO 对象 IdQueryBO
package com.healerjean.proj.utils.db;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* @author zhangyujin
* @date 2023/7/5$ 15:45$
*/
@Accessors(chain = true)
@Data
public class IdQueryBO implements Serializable {
/**
* serialVersionUID
*/
private static final long serialVersionUID = 3145985803712258405L;
/**
* 是否等于最小
*/
private Boolean minEqualFlag;
/**
* 最小Id
*/
private Long minId;
/**
* 是否等于最大
*/
private Boolean maxEqualFlag;
/**
* 最大Id
*/
private Long maxId;
/**
* 每次查询多少个
*/
private Long size;
public IdQueryBO(Long minId, Long maxId, Long size) {
this.minId = minId;
this.maxId = maxId;
this.size = size;
}
public IdQueryBO(Boolean minEqualFlag, Long minId, Boolean maxEqualFlag, Long maxId, Long size) {
this.minEqualFlag = minEqualFlag;
this.minId = minId;
this.maxEqualFlag = maxEqualFlag;
this.maxId = maxId;
this.size = size;
}
}
4)Service
/**
* 根据
*
* @param completionService completionService
* @param query queryBO
* @return List<Future < List < UserDemoExcel>>>
*/
@Override
public List<Future<List<UserDemoExcel>>> queryAllUserDemoByPoolIdSub(CompletionService<List<UserDemoExcel>> completionService, UserDemoQueryBO query) {
ImmutablePair<Long, Long> minAndMaxId = userDemoManager.queryMinAndMaxId(query);
Long minId = minAndMaxId.getLeft();
Long maxId = minAndMaxId.getRight();
IdQueryBO idQueryBO = new IdQueryBO(minId, maxId, 2L);
return MybatisBatchUtils.queryAllByPoolIdSub(completionService,
(p, q) -> userDemoManager.queryUserDemoByIdSub(p, q),
query,
idQueryBO,
UserConverter.INSTANCE::covertUserDemoPoToExcelList);
}
5)单测
@DisplayName("testQueryMinAndMaxId")
@Test
public void testQueryMinAndMaxId() {
CompletionService<List<UserDemoExcel>> completionService = new ExecutorCompletionService(ThreadPoolUtils.DEFAULT_THREAD_POOL_TASK_EXECUTOR);
UserDemoQueryBO queryBO = new UserDemoQueryBO();
List<Future<List<UserDemoExcel>>> futures = userDemoService.queryAllUserDemoByPoolIdSub(completionService,
queryBO);
List<UserDemoExcel> all = Lists.newArrayList();
for (int i = 0; i < futures.size(); i++) {
try {
Future<List<UserDemoExcel>> future = completionService.take();
List<UserDemoExcel> userDemos = future.get();
if (CollectionUtils.isEmpty(userDemos)) {
continue;
}
all.addAll(future.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
log.info("[testQueryMinAndMaxId] res:{}", JsonUtils.toString(all));
}
2、线程池 limit 分页查询
1)线程池分页读工具
package com.healerjean.proj.utils.db;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* MybatisPlusQueryUtils
*
* @author zhangyujin
* @date 2023/6/25$ 12:05$
*/
public final class MybatisBatchUtils {
/**
* queryAll
*
* @param executorService 线程池
* @param function 分页函数
* @param queryWrapper 查询条件
* @param pageSize pageSize 分页大小
* @param coverFunction coverFunction 对象转化
* @return {@link List< Future< List<T>>>}
*/
public static <Q, R, T> List<Future<List<T>>> queryAllByPoolLimit(CompletionService<List<T>> executorService,
BiFunction<Page<R>, Q, IPage<R>> function,
Q queryWrapper,
int pageSize,
Function<List<R>, List<T>> coverFunction) {
IPage<R> initPage = function.apply(new Page<>(1, 0, true), queryWrapper).setSize(pageSize);
List<Future<List<T>>> result = new ArrayList<>();
for (int i = 1; i <= initPage.getPages(); i++) {
int finalI = i;
Future<List<T>> future = executorService.submit(() -> {
IPage<R> page = function.apply(new Page<>(finalI, pageSize, false), queryWrapper);
return coverFunction.apply(page.getRecords());
});
result.add(future);
}
return result;
}
}
2)Manager 层
/**
* 获取查询条件
*
* @param query query
* @return LambdaQueryWrapper<UserDemo>
*/
@Override
public LambdaQueryWrapper<UserDemo> queryBuilderQueryWrapper(UserDemoQueryBO query) {
QueryWrapper<UserDemo> queryWrapper = new QueryWrapper<>();
if (!CollectionUtils.isEmpty(query.getSelectFields())) {
queryWrapper.select(StringUtils.join(query.getSelectFields(), ","));
}
if (!CollectionUtils.isEmpty(query.getOrderByList())) {
query.getOrderByList().forEach(item -> queryWrapper.orderBy(Boolean.TRUE, item.getDirection(),
item.getProperty()));
}
return queryWrapper.lambda()
.eq(Objects.nonNull(query.getId()), UserDemo::getId, query.getId())
.eq(Objects.nonNull(query.getValidFlag()), UserDemo::getValidFlag, query.getValidFlag())
.eq(StringUtils.isNotBlank(query.getName()), UserDemo::getName, query.getName())
.eq(StringUtils.isNotBlank(query.getPhone()), UserDemo::getPhone, query.getPhone())
.eq(StringUtils.isNotBlank(query.getEmail()), UserDemo::getEmail, query.getEmail())
.like(StringUtils.isNotBlank(query.getLikeName()), UserDemo::getName, query.getLikeName())
.like(StringUtils.isNotBlank(query.getLikePhone()), UserDemo::getPhone, query.getLikePhone())
.lt(Objects.nonNull(query.getQueryTime()), UserDemo::getStartTime, query.getQueryTime())
.gt(Objects.nonNull(query.getQueryTime()), UserDemo::getEndTime, query.getQueryTime());
}
3)Service
/**
* queryFutureAll
*
* @param completionService completionService
* @param query query
* @return List<Future < List < UserDemoExcel>>>
*/
@Override
public List<Future<List<UserDemoExcel>>> queryAllUserDemoByPoolLimit(CompletionService<List<UserDemoExcel>> completionService, UserDemoQueryBO query) {
QueryWrapper<UserDemo> queryWrapper = userDemoManager.queryBuilderQueryWrapper(query);
return MybatisBatchUtils.queryAllByPoolLimit(completionService,
(p, q) -> userDemoDao.page(p, q),
queryWrapper,
1,
UserConverter.INSTANCE::covertUserDemoPoToExcelList);
}
4)单测
@DisplayName("testQueryUserDemoByLimit")
@Test
public void testQueryUserDemoByLimit() {
CompletionService<List<UserDemoExcel>> completionService = new ExecutorCompletionService(ThreadPoolUtils.DEFAULT_THREAD_POOL_TASK_EXECUTOR);
UserDemoQueryBO queryBO = new UserDemoQueryBO();
List<Future<List<UserDemoExcel>>> futures = userDemoService.queryAllUserDemoByPoolLimit(completionService,
queryBO);
List<UserDemoExcel> all = Lists.newArrayList();
for (int i = 0; i < futures.size(); i++) {
try {
Future<List<UserDemoExcel>> future = completionService.take();
List<UserDemoExcel> userDemos = future.get();
if (CollectionUtils.isEmpty(userDemos)) {
continue;
}
all.addAll(future.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
log.info("[testQueryUserDemoByLimit] res:{}", JsonUtils.toString(all));
}


