睿阳知识库 睿阳知识库
首页
  • npm库配置
  • PC端

    • npm常用命令
    • vue问题记录
    • SEO基础知识及优化
    • 禁止别人调试我的前端页面代码
  • 移动端

    • 小程序
    • Risun.js使用说明
  • Java

    • Maven库配置
    • RSP开发框架
    • RSP框架插件
  • .NET

    • Nuget库配置
  • Python

    • Pypi库配置
  • 常见问题

    • Word转Pdf字体错乱
    • 使用Jacob进行Word导出PDF
  • 自动部署
  • 前端带路径
  • Linux

    • 应用部署
  • Windows

    • 应用部署
  • 视频监控
  • MySQL系列~
  • 应用高可用
  • 静态代码扫描
  • OpenSSH版本升级
  • 区块链~
  • 软件过程文档目录
  • 著作权申请须知及申报示例
  • 项目申报技巧
  • 项目竣工资料清单
  • 科技项目申报流程及注意事项
  • 初级职称申报
  • 产品需求文档基础知识
  • 产品经理需了解的技术知识
  • 墨刀原型设计指南
  • 文档规范
  • 文档规范
  • 投标工作总结(一)
  • 科技项目申报心得体会
  • 数字孪生
  • AI
  • RSP基础平台
  • RMCloud
  • 区块链
  • 网络态势感知
  • 国产化
  • 数据湖
  • 贡献度
  • 文档编写说明
  • Markdown教程
工作链接
首页
  • npm库配置
  • PC端

    • npm常用命令
    • vue问题记录
    • SEO基础知识及优化
    • 禁止别人调试我的前端页面代码
  • 移动端

    • 小程序
    • Risun.js使用说明
  • Java

    • Maven库配置
    • RSP开发框架
    • RSP框架插件
  • .NET

    • Nuget库配置
  • Python

    • Pypi库配置
  • 常见问题

    • Word转Pdf字体错乱
    • 使用Jacob进行Word导出PDF
  • 自动部署
  • 前端带路径
  • Linux

    • 应用部署
  • Windows

    • 应用部署
  • 视频监控
  • MySQL系列~
  • 应用高可用
  • 静态代码扫描
  • OpenSSH版本升级
  • 区块链~
  • 软件过程文档目录
  • 著作权申请须知及申报示例
  • 项目申报技巧
  • 项目竣工资料清单
  • 科技项目申报流程及注意事项
  • 初级职称申报
  • 产品需求文档基础知识
  • 产品经理需了解的技术知识
  • 墨刀原型设计指南
  • 文档规范
  • 文档规范
  • 投标工作总结(一)
  • 科技项目申报心得体会
  • 数字孪生
  • AI
  • RSP基础平台
  • RMCloud
  • 区块链
  • 网络态势感知
  • 国产化
  • 数据湖
  • 贡献度
  • 文档编写说明
  • Markdown教程
工作链接
  • 视频监控
  • MySQL系列

    • 批量插入MySQL
      • 一. 概述
      • 二. 测试代码
      • 三. 测试结果
      • 五. 结果分析
      • 六. 更多方案
    • 分库分表
    • MySQL5.7 主从复制、热备
    • MySQL5升级到8
    • MySQL5.7配置调优
  • 应用高可用
  • 静态代码扫描
  • OpenSSH升级
  • 区块链

  • 技术
  • MySQL系列
孙超
2023-03-20
目录

批量插入MySQL

# 一. 概述

日常的业务需求中,经常遇到批量插入数据库的场景。很多时候,开发人员会使用循环单条插入的方式,数据量小时(<500)还好,当数据量大时,这样处理会极大的增加数据库的压力,严重时导致数据库连接不可用,进而导致系统宕机。下面就批量插入数据库,给出了几种解决方案,并对测试结果进行了分析,请大家仔细阅读,根据实际需求,选择适配自己业务场景的方案。

# 二. 测试代码

先创建一个Springboot的工程,添加mysql、mybatis、hutool依赖

单元测试代码
@Slf4j
public class PersonMapperTests extends SpringbootMultiInsertApplicationTests {

	@Autowired
	private PersonMapper personMapper;
	
	// 百万数据,单次批量10000
	private static int dataSize = 1000000;		// 数据集数量
	private static int batchSize = 10000;	    // 单次批处理数量
	
	// 千万数据,单次批量100000
//	private static int dataSize = 10000000;		// 数据集数量
//	private static int batchSize = 100000;	    // 单次批处理数量
	
	
	private static List<PersonBO> list = new LinkedList<>();

	@BeforeEach
	public void init() {
		for (int i = 0; i < dataSize; i++) {
			PersonBO po = new PersonBO();
			po.setName("测试数据【" + i + "】");
			po.setAddress("测试地址【" + i + "】");
			po.setAge(30);
			po.setUpdateBy("系统批量插入");
			list.add(po);
		}
		log.info("*******************************数据初始化完成*******************************");
	}
	
  /**
	 * 循环单条插入,数据量小于500时,可以使用但不推荐
	 */
	@Test
	public void circulationInsert() {
		if(dataSize > 500) {
			log.info("MyBatis循环导入,数据不能超过500条!");
			return;
		}
		StopWatch stopWatch = new StopWatch("MyBatis循环导入【" + dataSize + "】条数据");
		stopWatch.start();
		for (PersonBO person : list) {
			personMapper.insertPerson(person);
		}
		list = null;
		stopWatch.stop();
		Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
	}
	
  /**
	 * MyBatis批量插入
	 */
	@Test
	public void batchInsert() {
		StopWatch stopWatch = new StopWatch("MyBatis批量导入【" + dataSize + "】条数据");
		stopWatch.start();
		if(dataSize < batchSize) {
			personMapper.insertPersons(list);
			list = null;
			stopWatch.stop();
			Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
			return;
		} 
		int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
		for (int i = 0; i < count; i++) {
			int fromIndex = i * batchSize; //计算每批的起始索引
		    int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
		    List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
			personMapper.insertPersons(groupList);
			groupList = null;
		}
		list = null;
		stopWatch.stop();
		Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
	}
	
  /**
	 * 多线程(Future)批量插入
	 */
	@Test
	public void batchInsert1() {
		StopWatch stopWatch = new StopWatch("多线程1批量导入【" + dataSize + "】条数据");
		stopWatch.start();
		
		if(dataSize < batchSize) {
			personMapper.insertPersons(list);
			list = null;
			stopWatch.stop();
			Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
			return;
		} 
		
		int nThreads = Runtime.getRuntime().availableProcessors();
		
		ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
		List<Future<Integer>> futures = new ArrayList<Future<Integer>>(nThreads);
		
		int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
		for (int i = 0; i < count; i++) {
			int fromIndex = i * batchSize; //计算每批的起始索引
		    int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
		    final List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
			Callable<Integer> task = () -> {
				personMapper.insertPersons(groupList);
				return 1;
			};
			futures.add(executorService.submit(task));
		}
		executorService.shutdown();
		if (!CollectionUtils.isEmpty(futures)) {  
    		futures.forEach(f -> {
    			for(;;) { 
    				try {
	    				if(f.isDone()) {
	    					f.get();
	    					break; 
	    				}
	    				TimeUnit.MILLISECONDS.sleep(2L);
    				} catch (InterruptedException | ExecutionException e) {
						e.printStackTrace();
					}
    			}
    		});
	    }
		list = null;
		stopWatch.stop();
		Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
	}
	
  /**
	 * 多线程(CountDownLatch)批量插入
	 */
	@Test
	public void batchInsert2() {
		StopWatch stopWatch = new StopWatch("多线程2批量导入【" + dataSize + "】条数据");
		stopWatch.start();
		if(dataSize < batchSize) {
			personMapper.insertPersons(list);
			list = null;
			stopWatch.stop();
			Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
			return;
		} 
		
		int nThreads = Runtime.getRuntime().availableProcessors();
		ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
		int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
		CountDownLatch latch = new CountDownLatch(count);
		for (int i = 0; i < count; i++) {
			int fromIndex = i * batchSize; //计算每批的起始索引
		    int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
		    final List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
			executorService.submit(() -> {
				try {
					personMapper.insertPersons(groupList);
				} finally {
					latch.countDown();
				}
			});
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			executorService.shutdown();
		}
		list = null;
		stopWatch.stop();
		Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
	}
	
  /**
	 * 多线程(CompletableFuture)批量插入
	 */
	@Test
	public void batchInsert3() {
		StopWatch stopWatch = new StopWatch("多线程3批量导入【" + dataSize + "】条数据");
		stopWatch.start();
		if(dataSize < batchSize) {
			personMapper.insertPersons(list);
			list = null;
			stopWatch.stop();
			Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
			return;
		} 
		
		List<CompletableFuture<Integer>> futures = Lists.newArrayList();
		int nThreads = Runtime.getRuntime().availableProcessors();
		ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
		int count = (dataSize + batchSize - 1) / batchSize; //计算需要分多少批
		for (int i = 0; i < count; i++) {
			int fromIndex = i * batchSize; //计算每批的起始索引
		    int toIndex = Math.min(fromIndex + batchSize, dataSize); //计算每批的结束索引,注意不要越界
		    List<PersonBO> groupList = list.subList(fromIndex, toIndex); //获取子列表
			futures.add(CompletableFuture.supplyAsync(() -> groupList).thenApplyAsync(s -> personMapper.insertPersons(s), executorService));
		}
		CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
		// 调用join方法等待完成
		allOf.join();
		list = null;
		stopWatch.stop();
		Console.log(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
	}
}

# 三. 测试结果

  • 测试环境

    • 操作系统:MacOS 13.2.1
    • CPU: 2.3 GHz 八核Intel Core i9
    • 内存:32 GB 2667 MHz DDR4
    • 硬盘:APPLE SSD AP1024N
  • 测试结果

  1. 5000条数据
StopWatch 'MyBatis批量导入【5000】条数据': running time = 1386 ms
StopWatch '多线程1批量导入【5000】条数据': running time = 1210 ms	单次2000
StopWatch '多线程2批量导入【5000】条数据': running time = 1247 ms	单次2000
StopWatch '多线程3批量导入【5000】条数据': running time = 1233 ms	单次2000
  1. 1万条数据
StopWatch 'MyBatis批量导入【10000】条数据': running time = 1493 ms
StopWatch '多线程1批量导入【10000】条数据': running time = 1572 ms
StopWatch '多线程2批量导入【10000】条数据': running time = 1534 ms
StopWatch '多线程3批量导入【10000】条数据': running time = 1566 ms
  1. 10万条数据
StopWatch 'MyBatis批量导入【100000】条数据': running time = 4347 ms
StopWatch '多线程1批量导入【100000】条数据': running time = 2717 ms
StopWatch '多线程2批量导入【100000】条数据': running time = 2888 ms
StopWatch '多线程3批量导入【100000】条数据': running time = 2950 ms
  1. 100万条数据
StopWatch 'MyBatis批量导入【1000000】条数据': running time = 35 s
StopWatch '多线程1批量导入【1000000】条数据': running time = 12 s
StopWatch '多线程2批量导入【1000000】条数据': running time = 10 s
StopWatch '多线程3批量导入【1000000】条数据': running time = 10 s
  1. 1000万条数据
StopWatch 'MyBatis批量导入【10000000】条数据': running time = 340 s
StopWatch '多线程1批量导入【10000000】条数据': running time = 183 s
StopWatch '多线程2批量导入【10000000】条数据': running time = 176 s
StopWatch '多线程3批量导入【10000000】条数据': running time = 176 s

# 五. 结果分析

  • 1万条数据以下,使用mybatis批量插入的方式
  • 1万条 ~ 1000万条数据,采用多线程 + mybatis批量插入的方式
  • 亿万级别,采用多线程 + JDBC批处理和事务混合的方式

说明:

  • 数据库服务器的配置和性能:如果数据库服务器的配置较低,或者存在性能瓶颈,那么插入数据的速度就会受到限制。

  • 数据库表结构和索引设计:如果表结构和索引设计不合理,也会影响数据插入的速度。

  • 数据的大小和类型:如果每条数据的大小比较大,或者包含较多的复杂数据类型,也会降低数据插入的速度。

  • 当采用多线程时,线程不是越多越好,当线程过多时,CPU 切换上下文的时间会增加,从而导致系统的性能下降。建议线程池数不要超过系统线程数。

# 六. 更多方案

当数据级为千万级别及以上时,就不推荐在使用mybatis批量插入的方式了,可以采用JDBC批处理和事务混合。

判断:https://blog.csdn.net/qq_40093255/article/details/124603488?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-124603488-blog-103546087.235^v28^pc_relevant_t0_download&spm=1001.2101.3001.4242.1&utm_relevant_index=3

视频监控
分库分表

← 视频监控 分库分表→

最近更新
01
vue问题记录
10-11
02
RSP进度
10-09
03
贡献度
09-28
更多文章>
Copyright © 2014-2025 甘肃睿阳科技有限公司 陇ICP备15001783-1号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式