Importing and Exporting Millions of Rows: Here's How I Do It

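Background

Most business systems involve a good deal of data import and export, often at the scale of millions of rows. Without a well-performing architecture it is easy to hit an OOM and bring the server down. This article sketches the core pseudocode as a starting point for discussion.

Feature Design

Tech stack: Spring Boot + MySQL + thread pool + MyBatis-Plus + AOP + JUC tools + EasyExcel

Data export involves I/O. Without a thread pool the work runs serially and takes a long time, and because the data volume is large, loading it without pagination can cause an out-of-memory error.

Thread Pool Configuration

The workload mixes I/O-bound work with CPU-bound loops, so the thread count is set to 2 * n + 1, where n is the number of CPU cores. This value assumes ideal conditions; the right number for a production system depends on the actual workload. If you are interested, JD.com has a blog post explaining how to calculate thread counts from CPU cores.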

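    private static final int CPU;
    private static final int N_THREAD;
    private static final ExecutorService EXECUTOR_SERVICE;
    static {
        CPU = Runtime.getRuntime().availableProcessors();
        // 2 * n + 1 threads for the mixed I/O and CPU workload
        N_THREAD = 2 * CPU + 1;
        EXECUTOR_SERVICE = Executors.newFixedThreadPool(N_THREAD);
    }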
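
One thing worth knowing about the pool above: `Executors.newFixedThreadPool` uses an unbounded work queue, so with millions of rows the pending tasks themselves can pile up in memory. The following is a minimal sketch of a bounded alternative using the same 2 * n + 1 sizing. The class name `ExportPools`, the method names, and the queue capacity are assumptions made for illustration, not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExportPools {
    private ExportPools() {}

    /** The 2 * n + 1 heuristic used in this article (n = CPU cores). */
    static int ioHeavyThreads(int cores) {
        return 2 * cores + 1;
    }

    /**
     * Fixed-size pool with a bounded queue. When the queue is full, the
     * submitting thread runs the task itself, which slows producers down
     * instead of letting pending tasks grow without limit.
     */
    static ThreadPoolExecutor newBoundedPool(int cores, int queueCapacity) {
        int threads = ioHeavyThreads(cores);
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = newBoundedPool(cores, 1_000); // capacity is an arbitrary example value
        System.out.println("pool size = " + pool.getCorePoolSize());
        pool.shutdown();
    }
}
```

Whether back-pressure through `CallerRunsPolicy` suits a given service depends on which thread submits the work, so treat the rejection policy as a tunable choice rather than a recommendation.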

Data Generation

  • Table DDL
CREATE TABLE `employees1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(24) NOT NULL DEFAULT '' COMMENT 'Name',
  `age` int NOT NULL DEFAULT '0' COMMENT 'Age',
  `position` varchar(20) NOT NULL DEFAULT '' COMMENT 'Position',
  `hire_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Hire time',
  PRIMARY KEY (`id`),
  KEY `idx_name_age_position` (`name`,`age`,`position`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1800001 DEFAULT CHARSET=utf8mb3 COMMENT='Employee records';

You can also bulk-generate 1 million random rows directly from a MySQL client.

  • Data generation in code (completes within seconds)
@LogExecutionTime
    public void generateData(Integer number) throws InterruptedException {
        // Fewer than 100,000 requested: default to 100,000
        number = number < 100000 ? 100000 : number;
        // Each producer task builds 10,000 rows
        int group = number / 10000;
        // Every producer hands off 10,000 / 2,000 = 5 write batches, and the latch counts batches
        CountDownLatch countDownLatch = new CountDownLatch(group * 5);
        otherThreadBuildData(group, countDownLatch);
        countDownLatch.await();
    }

    private void otherThreadBuildData(int group, CountDownLatch countDownLatch) {
        for (int i = 0; i < group; i++) {
            EXECUTOR_SERVICE.execute(() -> {
                List<Employees> read = new ArrayList<>();
                for (int j = 0; j < 10000; j++) {
                    Employees employees = Employees.create("wang" + j, j, j + "");
                    read.add(employees);
                    if (read.size() >= 2000) {
                        // Read/write separation: this thread keeps building rows, a writer task flushes the batch
                        final List[] writers = new List[]{new ArrayList<>(read)};
                        EXECUTOR_SERVICE.execute(() -> {
                            try {
                                // TODO: the transaction does not take effect here; call through the proxy instead
                                saveBatch(writers[0], writers[0].size());
                                // Drop the reference to help GC
                                writers[0] = null;
                            } finally {
                                // Count down once per batch, after the write attempt, not once per row
                                countDownLatch.countDown();
                            }
                        });
                        read.clear();
                    }
                }
            });
        }
    }

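The main idea is read/write separation, which improves concurrency: the producing thread keeps building rows while writer tasks flush batches to the database. After flushing, the writer explicitly sets writers to null, imitating the HashMap source code, to help GC.

Note: saveBatch must be called through the Spring proxy, not through this. A self-invocation bypasses the proxy, so the transaction does not take effect. The code here is only a simulation.

The CountDownLatch coordinates the asynchronous threads and blocks the main thread until all of the asynchronous work has finished, at which point the main thread is woken up.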
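
To make the latch's role concrete, here is a small self-contained sketch. It splits a row count into batches, runs each batch on a pool, and counts the latch down exactly once per batch in a `finally` block, so the caller wakes up only after every batch has finished. The class `LatchBatchDemo` and the `AtomicInteger` standing in for the database are illustrative assumptions, not the project's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchBatchDemo {
    private LatchBatchDemo() {}

    /** Writes `total` rows in batches of `batchSize` on the pool and returns how many rows were written. */
    static int writeInBatches(int total, int batchSize, ExecutorService pool) throws InterruptedException {
        int batches = (total + batchSize - 1) / batchSize;   // ceiling division
        CountDownLatch latch = new CountDownLatch(batches);  // one count per batch
        AtomicInteger written = new AtomicInteger();         // stand-in for the database
        for (int b = 0; b < batches; b++) {
            int from = b * batchSize;
            int to = Math.min(from + batchSize, total);
            pool.execute(() -> {
                try {
                    List<Integer> rows = new ArrayList<>(to - from);
                    for (int i = from; i < to; i++) {
                        rows.add(i);
                    }
                    written.addAndGet(rows.size());          // hypothetical stand-in for saveBatch(rows)
                } finally {
                    latch.countDown();                       // exactly once per batch, even if the write fails
                }
            });
        }
        latch.await();                                       // block until every batch has counted down
        return written.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("rows written: " + writeInBatches(100_000, 2_000, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The detail that matters is that the latch count and the number of `countDown()` calls agree. Counting down per row against a latch sized per task would release the main thread long before the writes are done.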

On a sufficiently powerful machine, generation generally completes within seconds.

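Export

Method 1

  • Collect the results thread-safely in a ConcurrentHashMap
  • Asynchronous threads query the data page by page and build the rows
      - 20 sheets are created by default
      - Each asynchronous thread handles one sheet and queries its own page of data
      - Note: with a large offset, paging itself becomes a performance problem. Use a MyBatis streaming query or custom SQL instead, for example an inner join against a covering-index subquery, or a query that seeks past the last id already read:
        <select> select id, name from user where id > #{param1} order by id limit 0, #{param2} </select>
  • CompletableFuture blocks the main thread until all pages are loaded
  • Once the map is filled, the main thread alone writes it to Excel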
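
The large-offset concern in the steps above can be illustrated with a seek-style (keyset) loop: each page is requested as "rows with id greater than the last id seen", so the database never has to skip over earlier rows. The sketch below simulates the table with an in-memory list. `KeysetPager`, `readAll`, and `inMemoryTable` are hypothetical names, and the sketch assumes ids are positive and returned in ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

final class KeysetPager {
    private KeysetPager() {}

    /**
     * Reads every id by repeatedly asking for the next page after the last id seen.
     * fetchAfter(lastId, pageSize) plays the role of
     * "select id ... where id > lastId order by id limit pageSize".
     */
    static List<Long> readAll(BiFunction<Long, Integer, List<Long>> fetchAfter, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Long> all = new ArrayList<>();
        long lastId = 0L; // assumption: ids start above 0
        while (true) {
            List<Long> page = fetchAfter.apply(lastId, pageSize);
            all.addAll(page);
            if (page.size() < pageSize) {
                return all; // a short page means the table is exhausted
            }
            lastId = page.get(page.size() - 1);
        }
    }

    /** In-memory stand-in for the table; expects ids sorted in ascending order. */
    static BiFunction<Long, Integer, List<Long>> inMemoryTable(List<Long> sortedIds) {
        return (lastId, max) -> sortedIds.stream()
                .filter(id -> id > lastId)
                .limit(max)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 5L, 8L, 13L, 21L, 34L);
        System.out.println(readAll(inMemoryTable(ids), 3));
    }
}
```

For parallel export, one option is to compute the id range of each sheet up front and let each thread seek within its own range. That is a design choice beyond what the original code does.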

EasyExcel does not support writing sheets from multiple threads; see the EasyExcel issue tracker for details.

@Override
    @SneakyThrows
    @LogExecutionTime
    public void exportData(HttpServletRequest request, HttpServletResponse response) {
        // Set the response headers
        generateExcelHead(response);

        Map<Integer, List<Employees>> map = new ConcurrentHashMap<>();

        // Asynchronous threads query the data page by page
        List<CompletableFuture<Void>> futures = otherThreadBuildData(map);

        // Block until every page has been loaded
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

        // The main thread writes to Excel on its own
        masterThreadExportExcel(response, map);
    }

    private List<CompletableFuture<Void>> otherThreadBuildData(Map<Integer, List<Employees>> map) {
        // Count the rows in the table
        Long count = employeesMapper.selectCount();
        // Split into 20 sheets by default
        int page = 20;
        // Round up so the remainder rows are not dropped when count is not a multiple of 20
        long size = (count + page - 1) / page;
        List<CompletableFuture<Void>> futures = new ArrayList<>(page);
        for (int i = 0; i < page; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
                IPage<Employees> selectedPage = employeesMapper.selectPage(employeesPage, null);
                map.put(finalI, selectedPage.getRecords());
            }, EXECUTOR_SERVICE);
            futures.add(future);
        }
        return futures;
    }

    @SneakyThrows
    private static void masterThreadExportExcel(HttpServletResponse response, Map<Integer, List<Employees>> map) {
        try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
            // Sheets cannot be written from multiple threads; see the EasyExcel issue tracker
            for (Map.Entry<Integer, List<Employees>> entry : map.entrySet()) {
                Integer sheetNo = entry.getKey();
                List<Employees> employees = entry.getValue();
                WriteSheet writeSheet = EasyExcelFactory
                        .writerSheet(sheetNo, "Template" + sheetNo)
                        .build();
                excelWriter.write(employees, writeSheet);
                logger.info("Wrote sheet {} to Excel", sheetNo);
            }
        }
    }

Exporting 2 million rows takes roughly 10-15 s on average, and the data is exported to Excel correctly.
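
The fan-out and join pattern used in Method 1 can be tried without a database. The sketch below loads "pages" of an in-memory list in parallel into a `ConcurrentHashMap`, waits with `CompletableFuture.allOf`, and then reads the pages back in order on a single thread. It also shows why the page size is best computed with ceiling division: with integer division, any remainder rows would never be queried. `FanOutDemo` and its methods are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FanOutDemo {
    private FanOutDemo() {}

    /** Rows per page such that `pages` pages cover all `total` rows (ceiling division). */
    static long pageSize(long total, int pages) {
        return (total + pages - 1) / pages;
    }

    /** Loads every page in parallel, then returns the rows in page order. */
    static List<Integer> loadAll(List<Integer> table, int pages, ExecutorService pool) {
        int size = (int) pageSize(table.size(), pages);
        Map<Integer, List<Integer>> byPage = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>(pages);
        for (int p = 0; p < pages; p++) {
            int page = p;
            futures.add(CompletableFuture.runAsync(() -> {
                // Stand-in for the paged query: copy this page's slice of the table
                int from = Math.min(page * size, table.size());
                int to = Math.min(from + size, table.size());
                byPage.put(page, new ArrayList<>(table.subList(from, to)));
            }, pool));
        }
        // Block until every page has been loaded
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Single-threaded "write" phase, in page order
        List<Integer> ordered = new ArrayList<>(table.size());
        for (int p = 0; p < pages; p++) {
            ordered.addAll(byPage.get(p));
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int i = 0; i < 103; i++) {
            table.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(loadAll(table, 20, pool).size() + " rows loaded");
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this approach holds every page in memory until the write phase starts, which is the cost that Method 2 tries to reduce.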

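Method 2

  • Use a blocking queue to form a producer-consumer pattern: producers publish pages, and the consumer writes each page as soon as it arrives
  • This decouples querying from writing, and there is no need to wait for all the futures to finish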
@SneakyThrows
    @LogExecutionTime
    public void exportData(HttpServletRequest request, HttpServletResponse response) {
        // Set the response headers
        generateExcelHead(response);

        BlockingQueue<List<Employees>> queue = new ArrayBlockingQueue<>(20);
        // Asynchronous threads query the data page by page; returns the number of pages to expect
        int pages = otherThreadBuildData(queue);

        // The main thread writes to Excel on its own
        masterThreadExportExcel(response, queue, pages);
    }

    private int otherThreadBuildData(BlockingQueue<List<Employees>> queue) {
        // Count the rows in the table
        Long count = employeesMapper.selectCount();
        // Split into 20 sheets by default
        int page = 20;
        // Round up so the remainder rows are not dropped
        long size = (count + page - 1) / page;
        for (int i = 0; i < page; i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                List<Employees> records = Collections.emptyList();
                try {
                    Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
                    records = employeesMapper.selectPage(employeesPage, null).getRecords();
                } finally {
                    // Always enqueue exactly one item per page so the consumer's count stays correct
                    try {
                        queue.put(records);
                    } catch (InterruptedException e) {
                        logger.error("Failed to enqueue page: " + e.getMessage());
                        // Restore the interrupt flag
                        Thread.currentThread().interrupt();
                    }
                }
            }, EXECUTOR_SERVICE);
        }
        return page;
    }

    @SneakyThrows
    private void masterThreadExportExcel(HttpServletResponse response, BlockingQueue<List<Employees>> queue, int pages) {
        try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
            // Take exactly one item per page. Checking queue.isEmpty() instead could end the loop
            // while producers are still querying. Sheets are numbered in arrival order.
            for (int sheetNo = 0; sheetNo < pages; sheetNo++) {
                List<Employees> employeesList = queue.take();
                if (!CollectionUtils.isEmpty(employeesList)) {
                    // Sheets cannot be written from multiple threads; see the EasyExcel issue tracker
                    WriteSheet writeSheet = EasyExcelFactory
                            .writerSheet(sheetNo, "Template" + sheetNo)
                            .build();
                    excelWriter.write(employeesList, writeSheet);
                    logger.info("Wrote sheet {} to Excel", sheetNo);
                }
            }
        }
    }
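
The termination condition is the delicate part of a queue-based export: an empty queue does not mean the producers are done, only that the consumer is currently ahead of them. A minimal sketch of one way to handle this is to have the consumer take exactly as many items as there are pages. The names below (`QueueExportDemo`, `produceAndConsume`) are assumptions, and the integer lists stand in for query results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class QueueExportDemo {
    private QueueExportDemo() {}

    /** Producers build `pages` pages on the pool; the calling thread consumes them as they arrive. */
    static List<List<Integer>> produceAndConsume(int pages, int rowsPerPage, ExecutorService pool)
            throws InterruptedException {
        // Small capacity on purpose: producers block in put() when the consumer falls behind
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(4);
        for (int p = 0; p < pages; p++) {
            int page = p;
            pool.execute(() -> {
                List<Integer> rows = new ArrayList<>(rowsPerPage);
                for (int i = 0; i < rowsPerPage; i++) {
                    rows.add(page * rowsPerPage + i); // stand-in for one queried row
                }
                try {
                    queue.put(rows);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            });
        }
        // Consume exactly `pages` items; take() waits while the queue is temporarily empty
        List<List<Integer>> written = new ArrayList<>(pages);
        for (int n = 0; n < pages; n++) {
            written.add(queue.take()); // stand-in for writing one sheet
        }
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(produceAndConsume(20, 1_000, pool).size() + " pages written");
        } finally {
            pool.shutdown();
        }
    }
}
```

A sentinel ("poison pill") item is a common alternative when the number of pages is not known in advance. Either way, the pages arrive in completion order, so sheet order is not deterministic.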

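Import

Multi-threaded, multi-sheet import requires that the Excel file contain multiple sheets.

  • Get the number of sheets in the Excel file
  • Each thread processes the data of one sheet
  • Rows are received through EasyExcel's listener mechanism
  • Each sheet read is wrapped in a Callable
  • The thread pool schedules all the Callables together
  • The listener collects rows in a list. A single list would be a shared variable, so each thread keeps its own list through a ThreadLocal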
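
The per-thread list mentioned in the last step can be sketched as follows. Each worker thread buffers rows in its own `ThreadLocal` list, flushes a batch when the buffer is full, and flushes the remainder at the end of its sheet. `PerThreadBuffer`, `onRow`, and `flush` are hypothetical names. In an EasyExcel `ReadListener` they would roughly correspond to the work done in `invoke` and `doAfterAllAnalysed`, and the listener in the original project may be organized differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class PerThreadBuffer {
    private PerThreadBuffer() {}

    // One list per worker thread, created lazily on first use
    private static final ThreadLocal<List<String>> BUFFER = ThreadLocal.withInitial(ArrayList::new);

    /** Called once per parsed row; flushes when this thread's buffer reaches flushAt rows. */
    static void onRow(String row, int flushAt, Consumer<List<String>> sink) {
        List<String> buffer = BUFFER.get();
        buffer.add(row);
        if (buffer.size() >= flushAt) {
            flush(sink);
        }
    }

    /** Called at the end of a sheet; hands over the remainder and clears this thread's slot. */
    static void flush(Consumer<List<String>> sink) {
        List<String> buffer = BUFFER.get();
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer)); // the sink must be thread-safe
        }
        BUFFER.remove(); // pooled threads are reused, so never leave stale rows behind
    }

    /** Simulates one task per sheet, each feeding rows through the per-thread buffer. */
    static void importSheets(int sheets, int rowsPerSheet, int flushAt,
                             Consumer<List<String>> sink, ExecutorService pool) throws InterruptedException {
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int s = 0; s < sheets; s++) {
            int sheet = s;
            tasks.add(() -> {
                try {
                    for (int r = 0; r < rowsPerSheet; r++) {
                        onRow("sheet" + sheet + "-row" + r, flushAt, sink);
                    }
                } finally {
                    flush(sink);
                }
                return null;
            });
        }
        pool.invokeAll(tasks); // returns once every sheet task has completed
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        AtomicInteger saved = new AtomicInteger();
        try {
            importSheets(3, 250, 100, batch -> saved.addAndGet(batch.size()), pool);
        } finally {
            pool.shutdown();
        }
        System.out.println("rows saved: " + saved.get());
    }
}
```

Calling `remove()` after each flush matters with pooled threads: a thread that finishes one sheet is handed another, and would otherwise start with the previous sheet's leftover state.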

@LogExecutionTime
    public void importExcel(MultipartFile file) throws IOException {
        // Get the total number of sheets in the Excel file
        int size = listSheet(file.getInputStream());
        importExcelAsync(file, size);
    }

    public void importExcelAsync(MultipartFile file, int number) {
        // One Callable per sheet
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < number; i++) {
            int num = i;
            tasks.add(() -> {
                // Each task opens its own stream and reads only sheet `num`
                EasyExcelFactory.read(file.getInputStream(), Employees.class, indexListener)
                        .sheet(num).doRead();
                return null;
            });
        }
        try {
            // Blocks until every sheet has been read
            EXECUTOR_SERVICE.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateExcelHead(HttpServletResponse response) throws UnsupportedEncodingException {
        response.setContentType("application/vnd.ms-excel");
        response.setCharacterEncoding("utf-8");
        // URL-encode the file name so non-ASCII characters survive in the header
        String fileName = URLEncoder.encode("multi-thread-sheet-export-test", "UTF-8") + "_" + System.currentTimeMillis();
        response.setHeader("Content-disposition", "attachment;filename=" + fileName + ".xlsx");
    }

    public int listSheet(InputStream inputStream) {
        if (inputStream == null) {
            throw new IllegalArgumentException("inputStream is null");
        }
        try (ExcelReader build = EasyExcelFactory.read(inputStream).build()) {
            return build.excelExecutor().sheetList().size();
        }
    }

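Importing 1 million rows takes roughly 50 s.

Conclusion

The code above is fairly simple. Real-world scenarios are certainly more involved and need various cleansing, transformation, and other ETL steps. Technology always serves the business; this article only offers one approach as a starting point. If you have better suggestions, feel free to discuss.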

Full source code: https://gitee.com/xiaohu88/export


Title: Importing and Exporting Millions of Rows: Here's How I Do It
Author: xiaohugg
URL: https://xiaohugg.top/articles/2023/11/03/1698978909487.html
