批量插入MySQL
# 一. 概述
日常的业务需求中,经常遇到批量插入数据库的场景。很多时候,开发人员会使用循环单条插入的方式,数据量小时(<500)还好,当数据量大时,这样处理会极大的增加数据库的压力,严重时导致数据库连接不可用,进而导致系统宕机。下面就批量插入数据库,给出了几种解决方案,并对测试结果进行了分析,请大家仔细阅读,根据实际需求,选择适配自己业务场景的方案。
# 二. 测试代码
先创建一个Springboot的工程,添加mysql、mybatis、hutool依赖
单元测试代码
@Slf4j
public class PersonMapperTests extends SpringbootMultiInsertApplicationTests {
@Autowired
private PersonMapper personMapper;
// 百万数据,单次批量10000
private static int dataSize = 1000000; // 数据集数量
private static int batchSize = 10000; // 单次批处理数量
// 千万数据,单次批量100000
// private static int dataSize = 10000000; // 数据集数量
// private static int batchSize = 100000; // 单次批处理数量
private static List<PersonBO> list = new LinkedList<>();
@BeforeEach
public void init() {
for (int i = 0; i < dataSize; i++) {
PersonBO po = new PersonBO();
po.setName("测试数据【" + i + "】");
po.setAddress("测试地址【" + i + "】");
po.setAge(30);
po.setUpdateBy("系统批量插入");
list.add(po);
}
log.info("*******************************数据初始化完成*******************************");
}
/**
* 循环单条插入,数据量小于500时,可以使用但不推荐
*/
@Test
public void circulationInsert() {
if(dataSize > 500) {
log.info("MyBatis循环导入,数据不能超过500条!");
return;
}
StopWatch stopWatch = new StopWatch("MyBatis循环导入【" + dataSize + "】条数据");
stopWatch.start();
for (PersonBO person : list) {
personMapper.insertPerson(person);
}
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
/**
* MyBatis批量插入
*/
@Test
public void batchInsert() {
StopWatch stopWatch = new StopWatch("MyBatis批量导入【" + dataSize + "】条数据");
stopWatch.start();
if(dataSize < batchSize) {
personMapper.insertPersons(list);
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
return;
}
int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
for (int i = 0; i < count; i++) {
int fromIndex = i * batchSize; //计算每批的起始索引
int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
personMapper.insertPersons(groupList);
groupList = null;
}
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
/**
* 多线程(Future)批量插入
*/
@Test
public void batchInsert1() {
StopWatch stopWatch = new StopWatch("多线程1批量导入【" + dataSize + "】条数据");
stopWatch.start();
if(dataSize < batchSize) {
personMapper.insertPersons(list);
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
return;
}
int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<Future<Integer>> futures = new ArrayList<Future<Integer>>(nThreads);
int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
for (int i = 0; i < count; i++) {
int fromIndex = i * batchSize; //计算每批的起始索引
int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
final List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
Callable<Integer> task = () -> {
personMapper.insertPersons(groupList);
return 1;
};
futures.add(executorService.submit(task));
}
executorService.shutdown();
if (!CollectionUtils.isEmpty(futures)) {
futures.forEach(f -> {
for(;;) {
try {
if(f.isDone()) {
f.get();
break;
}
TimeUnit.MILLISECONDS.sleep(2L);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
});
}
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
/**
* 多线程(CountDownLatch)批量插入
*/
@Test
public void batchInsert2() {
StopWatch stopWatch = new StopWatch("多线程2批量导入【" + dataSize + "】条数据");
stopWatch.start();
if(dataSize < batchSize) {
personMapper.insertPersons(list);
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
return;
}
int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
int fromIndex = i * batchSize; //计算每批的起始索引
int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
final List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
executorService.submit(() -> {
try {
personMapper.insertPersons(groupList);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
/**
* 多线程(CompletableFuture)批量插入
*/
@Test
public void batchInsert3() {
StopWatch stopWatch = new StopWatch("多线程3批量导入【" + dataSize + "】条数据");
stopWatch.start();
if(dataSize < batchSize) {
personMapper.insertPersons(list);
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
return;
}
List<CompletableFuture<Integer>> futures = Lists.newArrayList();
int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
for (int i = 0; i < count; i++) {
int fromIndex = i * batchSize; //计算每批的起始索引
int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
futures.add(CompletableFuture.supplyAsync(() -> groupList).thenApplyAsync(s -> personMapper.insertPersons(s), executorService));
}
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
// 调用join方法等待完成
allOf.join();
list = null;
stopWatch.stop();
Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
}
# 三. 测试结果
测试环境
- 操作系统:MacOS 13.2.1
- CPU: 2.3 GHz 八核Intel Core i9
- 内存:32 GB 2667 MHz DDR4
- 硬盘:APPLE SSD AP1024N
测试结果
- 5000条数据
StopWatch 'MyBatis批量导入【5000】条数据': running time = 1386 ms
StopWatch '多线程1批量导入【5000】条数据': running time = 1210 ms 单次2000
StopWatch '多线程2批量导入【5000】条数据': running time = 1247 ms 单次2000
StopWatch '多线程3批量导入【5000】条数据': running time = 1233 ms 单次2000
- 1万条数据
StopWatch 'MyBatis批量导入【10000】条数据': running time = 1493 ms
StopWatch '多线程1批量导入【10000】条数据': running time = 1572 ms
StopWatch '多线程2批量导入【10000】条数据': running time = 1534 ms
StopWatch '多线程3批量导入【10000】条数据': running time = 1566 ms
- 10万条数据
StopWatch 'MyBatis批量导入【100000】条数据': running time = 4347 ms
StopWatch '多线程1批量导入【100000】条数据': running time = 2717 ms
StopWatch '多线程2批量导入【100000】条数据': running time = 2888 ms
StopWatch '多线程3批量导入【100000】条数据': running time = 2950 ms
- 100万条数据
StopWatch 'MyBatis批量导入【1000000】条数据': running time = 35 s
StopWatch '多线程1批量导入【1000000】条数据': running time = 12 s
StopWatch '多线程2批量导入【1000000】条数据': running time = 10 s
StopWatch '多线程3批量导入【1000000】条数据': running time = 10 s
- 1000万条数据
StopWatch 'MyBatis批量导入【10000000】条数据': running time = 340 s
StopWatch '多线程1批量导入【10000000】条数据': running time = 183 s
StopWatch '多线程2批量导入【10000000】条数据': running time = 176 s
StopWatch '多线程3批量导入【10000000】条数据': running time = 176 s
# 五. 结果分析
- 1万条数据以下,使用
mybatis批量插入的方式 - 1万条 ~ 1000万条数据,采用多线程 +
mybatis批量插入的方式 - 亿万级别,采用多线程 + JDBC批处理和事务混合的方式
说明:
数据库服务器的配置和性能:如果数据库服务器的配置较低,或者存在性能瓶颈,那么插入数据的速度就会受到限制。
数据库表结构和索引设计:如果表结构和索引设计不合理,也会影响数据插入的速度。
数据的大小和类型:如果每条数据的大小比较大,或者包含较多的复杂数据类型,也会降低数据插入的速度。
当采用多线程时,线程不是越多越好,当线程过多时,CPU 切换上下文的时间会增加,从而导致系统的性能下降。建议线程池数不要超过系统线程数。
# 六. 更多方案
当数据级为千万级别及以上时,就不推荐在使用mybatis批量插入的方式了,可以采用JDBC批处理和事务混合。
判断:https://blog.csdn.net/qq_40093255/article/details/124603488?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-124603488-blog-103546087.235^v28^pc_relevant_t0_download&spm=1001.2101.3001.4242.1&utm_relevant_index=3