Background
Many business systems eventually need data import and export, and it often involves datasets on the order of millions of rows. Without a well-designed approach it is easy to trigger an OOM and bring the server down, so this article sketches the core pseudo-code as a starting point for discussion.
Design
Technology stack: Spring Boot + MySQL + thread pool + MyBatis-Plus + AOP + JUC tools + EasyExcel
Data export involves IO: without a thread pool the work runs serially and takes a long time, and because the volume is large, failing to paginate the queries can lead to an out-of-memory error.
Thread pool configuration
The work mixes IO-bound operations with CPU loops, so the thread count is set to 2 * N + 1. This is an idealized setting; the right value for a real production workload depends on the business. For more background, see JD's blog post on calculating thread counts from CPU cores.
private static final int CPU;
private static final int N_THREAD;
private static final ExecutorService EXECUTOR_SERVICE;

static {
    //size the pool at 2 * cores + 1 for the IO-heavy workload
    CPU = Runtime.getRuntime().availableProcessors();
    N_THREAD = 2 * CPU + 1;
    EXECUTOR_SERVICE = Executors.newFixedThreadPool(N_THREAD);
}
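One caveat worth hedging: `Executors.newFixedThreadPool` backs the pool with an unbounded queue, so a flood of tasks can itself cause the OOM this article is trying to avoid. A minimal sketch of a bounded alternative (the class name, queue size, and rejection policy below are assumptions, not the author's configuration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class ExportThreadPool {

    private static final int N_THREAD = 2 * Runtime.getRuntime().availableProcessors() + 1;

    // bounded queue + CallerRunsPolicy apply back-pressure instead of queueing without limit
    public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            N_THREAD, N_THREAD,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    private ExportThreadPool() {
    }
}
```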
Data generation
- Table creation DDL
CREATE TABLE `employees1` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(24) NOT NULL DEFAULT '' COMMENT '姓名',
`age` int NOT NULL DEFAULT '0' COMMENT '年龄',
`position` varchar(20) NOT NULL DEFAULT '' COMMENT '职位',
`hire_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '入职时间',
PRIMARY KEY (`id`),
KEY `idx_name_age_position` (`name`,`age`,`position`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1800001 DEFAULT CHARSET=utf8mb3 COMMENT='员工记录表';
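The Employees entity itself is not shown in the article; a minimal sketch of what it could look like under MyBatis-Plus conventions (the field set follows the DDL above, and the static create factory matches how the generation code below calls it; everything else is an assumption):

```java
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

@TableName("employees1")
public class Employees {

    @TableId(type = IdType.AUTO)
    private Integer id;
    private String name;
    private Integer age;
    private String position;
    private Date hireTime;

    // factory used by the data-generation code: Employees.create("wang" + j, j, j + "")
    public static Employees create(String name, Integer age, String position) {
        Employees e = new Employees();
        e.name = name;
        e.age = age;
        e.position = position;
        e.hireTime = new Date();
        return e;
    }
    // getters and setters omitted for brevity; EasyExcel uses the field names as column headers
}
```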
You could also batch-generate 1,000,000 random rows directly in a MySQL client.
- Generating the data in code (finishes in seconds)
@LogExecutionTime
public void generateData(Integer number) throws InterruptedException {
    //anything below 100,000 defaults to 100,000
    number = number < 100000 ? 100000 : number;
    //each worker thread produces 10,000 records
    int group = number / 10000;
    CountDownLatch countDownLatch = new CountDownLatch(group);
    otherThreadBuildData(group, countDownLatch);
    countDownLatch.await();
}
private void otherThreadBuildData(int group, CountDownLatch countDownLatch) {
    for (int i = 0; i < group; i++) {
        EXECUTOR_SERVICE.execute(() -> {
            try {
                List<Employees> read = new ArrayList<>();
                for (int j = 0; j < 10000; j++) {
                    Employees employees = Employees.create("wang" + j, j, j + "");
                    read.add(employees);
                    if (read.size() >= 2000) {
                        //reader/writer split: the reader thread keeps producing while a writer thread flushes to the database
                        final List[] writers = new List[]{new ArrayList<>(read)};
                        EXECUTOR_SERVICE.execute(() -> {
                            //todo the transaction does not take effect here; saveBatch must be called through the proxy class
                            saveBatch(writers[0], writers[0].size());
                            //help gc
                            writers[0] = null;
                        });
                        //10,000 is a multiple of 2,000, so nothing is left unflushed after the loop
                        read.clear();
                    }
                }
            } finally {
                //count down once per task, only after this thread has produced all of its records
                countDownLatch.countDown();
            }
        });
    }
}
The key idea is separating reads from writes to improve read/write concurrency. After the writer thread flushes to the database, writers is explicitly set to null, mimicking what the HashMap source code does to help GC.
Note: saveBatch must be called through the proxy object, not through this, otherwise the transaction will not take effect; the call here is only a simulation.
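As an illustration only (the service and method names are assumptions, not the author's code), one common way to keep the transaction effective is to expose the AOP proxy and call saveBatch through it:

```java
import java.util.List;
import org.springframework.aop.framework.AopContext;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

@Service
public class EmployeesServiceImpl extends ServiceImpl<EmployeesMapper, Employees> implements EmployeesService {

    // requires @EnableAspectJAutoProxy(exposeProxy = true) on a configuration class
    public void flushBatch(List<Employees> batch) {
        // grab the AOP proxy so the @Transactional on saveBatch still applies;
        // a plain this.saveBatch(...) self-invocation would bypass the proxy
        EmployeesService proxy = (EmployeesService) AopContext.currentProxy();
        proxy.saveBatch(batch, batch.size());
    }
}
```

Injecting the bean into itself, or moving saveBatch into a separate @Service, would achieve the same effect.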
The countDownLatch coordinates the async threads with the main thread: the main thread blocks until every worker has finished, then gets woken up.
On a machine with reasonable hardware, this generally completes within seconds.
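The @LogExecutionTime annotation seen on these methods (the AOP item in the stack) is not shown in the article; a minimal sketch of what such a timing aspect could look like, purely as an assumption:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface LogExecutionTime {
}

@Aspect
@Component
class LogExecutionTimeAspect {

    private static final Logger logger = LoggerFactory.getLogger(LogExecutionTimeAspect.class);

    // wraps any method annotated with @LogExecutionTime and logs its wall-clock time
    @Around("@annotation(logExecutionTime)")
    public Object around(ProceedingJoinPoint joinPoint, LogExecutionTime logExecutionTime) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return joinPoint.proceed();
        } finally {
            logger.info("{} took {} ms", joinPoint.getSignature(), System.currentTimeMillis() - start);
        }
    }
}
```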
Export
Approach 1
- Use a ConcurrentHashMap so results can be collected safely from multiple threads
- Async threads query the data page by page and build the result set
- 20 sheets are created by default
- Each async thread handles one sheet and queries its page of data
  When the offset gets large, this kind of pagination itself becomes a performance problem; use a MyBatis streaming (cursor) query or custom SQL, e.g. keyset pagination or an inner join over a covering index (see the mapper sketch below this list):
  <select>
      select id, name
      from user
      where id > #{param1}
      limit 0, #{param2}
  </select>
- CompletableFuture is used to block the main thread until all pages are fetched
- Once the map is populated, the main thread writes the Excel file single-threaded
EasyExcel does not support multi-threaded writes to sheets; see the official issue: easyExcel issue
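As a hedged sketch of the "inner join over a covering index" idea mentioned above (the mapper name, method, and SQL are assumptions, not part of the article's source):

```java
import java.util.List;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

public interface EmployeesDeepPageMapper {

    // the subquery pages over the primary key only (covered by the index),
    // then the join fetches the full rows for just that page
    @Select("select e.* from employees1 e " +
            "inner join (select id from employees1 order by id limit #{offset}, #{size}) t " +
            "on e.id = t.id")
    List<Employees> selectPageByCoveringIndex(@Param("offset") long offset, @Param("size") long size);
}
```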
@Override
@LogExecutionTime
@SneakyThrows
public void exportData(HttpServletRequest request, HttpServletResponse response) {
    //set the response headers for the Excel download
    generateExcelHead(response);
    Map<Integer, List<Employees>> map = new ConcurrentHashMap<>();
    //async threads query the data page by page
    List<CompletableFuture<Void>> futures = otherThreadBuildData(map);
    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    //the main thread writes everything to excel in one pass
    masterThreadExportExcel(response, map);
}
private List<CompletableFuture<Void>> otherThreadBuildData(Map<Integer, List<Employees>> map) {
    //count the rows in the table
    Long count = employeesMapper.selectCount(null);
    //split into 20 sheets by default
    int page = 20;
    //round up so the last partial page is not silently dropped
    long size = (count + page - 1) / page;
    List<CompletableFuture<Void>> futures = new ArrayList<>(page);
    for (int i = 0; i < page; i++) {
        int finalI = i;
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
            IPage<Employees> selectedPage = employeesMapper.selectPage(employeesPage, null);
            map.put(finalI, selectedPage.getRecords());
        }, EXECUTOR_SERVICE);
        futures.add(future);
    }
    return futures;
}
@SneakyThrows
private static void masterThreadExportExcel(HttpServletResponse response, Map<Integer, List<Employees>> map) {
    try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
        //writing to excel has to stay single-threaded, see the easyexcel issue
        for (Map.Entry<Integer, List<Employees>> entry : map.entrySet()) {
            Integer sheetNo = entry.getKey();
            List<Employees> employees = entry.getValue();
            WriteSheet writeSheet = EasyExcelFactory
                    .writerSheet(sheetNo, "sheet" + sheetNo)
                    .build();
            excelWriter.write(employees, writeSheet);
            logger.info("writing sheet {} to excel", sheetNo);
        }
    }
}
Exporting 2,000,000 records takes about 10-15s on average, and the data is exported to Excel correctly.
Approach 2
- Use a blocking queue to form a producer-consumer (publish-subscribe) pattern: producers enqueue pages as they are fetched and the consumer drains them immediately
- This decouples the two sides, so there is no need to wait for every future to finish first
@LogExecutionTime
@SneakyThrows
public void exportData(HttpServletRequest request, HttpServletResponse response) {
    //set the response headers for the Excel download
    generateExcelHead(response);
    BlockingQueue<List<Employees>> queue = new ArrayBlockingQueue<>(20);
    //async threads query the data page by page and enqueue it
    otherThreadBuildData(queue);
    //the main thread consumes the queue and writes to excel
    masterThreadExportExcel(response, queue);
}
private void otherThreadBuildData(BlockingQueue<List<Employees>> queue) {
    //count the rows in the table
    Long count = employeesMapper.selectCount(null);
    //split into 20 sheets by default
    int page = 20;
    //round up so the last partial page is not silently dropped
    long size = (count + page - 1) / page;
    for (int i = 0; i < page; i++) {
        int finalI = i;
        CompletableFuture.runAsync(() -> {
            Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
            IPage<Employees> selectedPage = employeesMapper.selectPage(employeesPage, null);
            try {
                queue.put(selectedPage.getRecords());
            } catch (InterruptedException e) {
                logger.error("failed to enqueue the page: " + e.getMessage());
                //restore the interrupt flag
                Thread.currentThread().interrupt();
            }
        }, EXECUTOR_SERVICE);
    }
}
@SneakyThrows
private void masterThreadExportExcel(HttpServletResponse response, BlockingQueue<List<Employees>> queue) {
    try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
        AtomicInteger atomicInteger = new AtomicInteger();
        //consume exactly as many pages as the producers enqueue (20 by default);
        //checking queue.isEmpty() alone could end the loop early if the consumer outpaces the producers
        while (atomicInteger.get() < 20) {
            List<Employees> employeesList = queue.take();
            int sheetNo = atomicInteger.getAndIncrement();
            if (!CollectionUtils.isEmpty(employeesList)) {
                //writing to excel has to stay single-threaded, see the easyexcel issue
                WriteSheet writeSheet = EasyExcelFactory
                        .writerSheet(sheetNo, "sheet" + sheetNo)
                        .build();
                excelWriter.write(employeesList, writeSheet);
                logger.info("writing sheet {} to excel", sheetNo);
            }
        }
    }
}
Import
Multi-threaded import across sheets, which requires the Excel file to contain multiple sheets.
- Get the number of sheets in the Excel file
- Each thread processes the data of one sheet
- Rows are received through EasyExcel's listener mechanism
- Each sheet read is wrapped as a Callable
- The thread pool schedules all the tasks
- The listener buffers the rows it receives; since a single list would be a shared variable, each thread keeps its own list via ThreadLocal (a sketch of such a listener follows the import code below)
@LogExecutionTime
@SneakyThrows
public void importExcel(MultipartFile file) {
    //total number of sheets in the workbook
    int size = listSheet(file.getInputStream());
    importExcelAsync(file, size);
}
public void importExcelAsync(MultipartFile file, int number) {
List<Callable<Object>> tasks = new ArrayList<>();
for (int i = 0; i < number; i++) {
int num = i;
tasks.add(() -> {
EasyExcelFactory.read(file.getInputStream(), Employees.class, indexListener)
.sheet(num).doRead();
return null;
});
}
try {
EXECUTOR_SERVICE.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
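The indexListener passed to EasyExcelFactory.read above is not shown in the article; a minimal sketch of a ThreadLocal-based listener along the lines the bullet list describes (the batch size, EmployeesService, and the saveBatch flush are assumptions):

```java
import java.util.ArrayList;
import java.util.List;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class IndexListener extends AnalysisEventListener<Employees> {

    private static final int BATCH_SIZE = 2000;
    // each importing thread gets its own buffer, so one shared listener bean stays thread-safe
    private final ThreadLocal<List<Employees>> buffer = ThreadLocal.withInitial(ArrayList::new);

    @Autowired
    private EmployeesService employeesService;

    @Override
    public void invoke(Employees data, AnalysisContext context) {
        List<Employees> list = buffer.get();
        list.add(data);
        if (list.size() >= BATCH_SIZE) {
            employeesService.saveBatch(list, list.size());
            list.clear();
        }
    }

    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        // flush whatever is left for this thread's sheet, then drop the ThreadLocal
        List<Employees> list = buffer.get();
        if (!list.isEmpty()) {
            employeesService.saveBatch(list, list.size());
        }
        buffer.remove();
    }
}
```

Because EasyExcel invokes the listener on the thread that called doRead(), each Callable works with its own buffer even though the listener instance is shared.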
private void generateExcelHead(HttpServletResponse response) throws UnsupportedEncodingException {
response.setContentType("application/vnd.ms-excel");
response.setCharacterEncoding("utf-8");
String fileName = URLEncoder.encode("multi-thread-sheet-export-test", "UTF-8") + "_" + System.currentTimeMillis();
response.setHeader("Content-disposition", "attachment;filename=" + fileName + ".xlsx");
}
public int listSheet(InputStream inputStream) {
if (inputStream == null) {
throw new IllegalArgumentException("inputStream is null");
}
try (ExcelReader build = EasyExcelFactory.read(inputStream).build()) {
return build.excelExecutor().sheetList().size();
}
}
Importing 1,000,000 rows takes roughly 50s.
Conclusion
The code above is fairly simple; real scenarios certainly are not, and involve cleaning, transformation and other ETL steps. Technology always serves the business. This is only meant as a starting point and one possible approach; better suggestions and discussion are welcome~~
Full source code: https://gitee.com/xiaohu88/export