文章 62
浏览 15135
我用spark导出500万数据

我用spark导出500万数据

背景

最近公司需要实现一个报表功能,需要导出近一段时间内的数据,时间段由用户在页面填写,作为 SQL 参数查询过滤,因为涉及到查询的信息详细,需要关联的表过多和字段过多,以及数据量过大,可能会有性能方面的问题

方案一: 多线程分页多 sheet+ 队列(发布订阅)

网上有很多的方案说了,用多线程分批处理,自己也写了个 demo 实现,多线程分批分页查询 + 发布订阅队列 + 分 sheet 写入,虽然功能实现了,最终测试 500 万数据,结果需要将近 8 分钟了,感觉不太满意这种方式,后面想到我们公司的服务器有大数据方面环境,想着能不能用大数据解决,最终决定用 spark

方案二:spark+cxv

经过查找资料和自己学习,基本掌握了一点 spark 的知识和整合 Java +springboot,发现 spark 天生支持这种大数据量的整合处理

基本思路就是,通过 spark 大数据引擎分布式连接 JDBC 读取数据库的数据,然后写入到 CSV 文件到 hdfs 或者到本地文件系统,最终从 hdfs 或者本地写入搭配 response 响应流给前端下载

@GetMapping("/download-csv")
    public ResponseEntity<InputStreamResource> downloadCsv(HttpServletRequest request, HttpServletResponse response) throws IOException {
        return indexService.exportBySpark(request,response);
    }
   public ResponseEntity<InputStreamResource> exportBySpark(HttpServletRequest request, HttpServletResponse response) {
        SparkSQLToCSV sparkSQLToCSV = new SparkSQLToCSV();
        String path = sparkSQLToCSV.readToLocalFileBySpark();
        Path csvDirectory = Paths.get(path);

        // 确保目录存在
        if (!Files.isDirectory(csvDirectory)) {
            return ResponseEntity.notFound().build();
        }

        // 找到目录下的part-文件
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(csvDirectory, "part-*")) {
            for (Path entry : dirStream) {
                // 仅获取第一个匹配的文件
                String filename = entry.getFileName().toString();
                Path file = csvDirectory.resolve(filename);
                InputStreamResource resource = new InputStreamResource(Files.newInputStream(file));

                // 设置Content-Disposition响应头,用户会看到的下载文件名
                String contentDisposition = String.format("attachment; filename=\"%s\"", "data-sql.csv");

               deleteDirectory(csvDirectory);

                // 返回文件流
                return ResponseEntity.ok()
                        .header(HttpHeaders.CONTENT_DISPOSITION, contentDisposition)
                        .contentType(MediaType.parseMediaType("text/csv"))
                        .body(resource);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // 如果没有找到文件,返回404
        return ResponseEntity.notFound().build();
    }
public class SparkSQLToCSV {
    public String readToLocalFileBySpark() {
        long start = System.currentTimeMillis();
        SparkSession spark = SparkSession.builder()
                .appName("Spark SQL to CSV Example")
                .master("local[*]") // Use local[*] for development/testing
                .getOrCreate();

        String jdbcUrl = "jdbc:mysql://localhost:3306/xiaohu_test";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "root");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        //模拟sql
        String sql = "( SELECT * FROM employees1 ) as employees ";
        String path = "/xxxx/localfile/spark";
        spark.read()
                .jdbc(jdbcUrl, sql, connectionProperties)
                .withColumn(
                "hireTime",
                functions.date_format(functions.col("hireTime"), "yyyy-MM-dd HH🇲🇲ss"))
                .repartition(1)
                .write()
                .format("csv")
                .option("header", true)
                .mode(SaveMode.Overwrite)
                .csv(path);
        spark.stop();
        long end = System.currentTimeMillis();
        System.out.println("spark花费 " + (end -start) /1000  + "秒");
        return path;
    }
}
 <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

测试

最终结果测试 100 万数据导出花费时间:

image.png

image.png

image.png

image.png

仅仅只需要 34s 就能导出 500w 的数据


标题:我用spark导出500万数据
作者:xiaohugg
地址:https://xiaohugg.top/articles/2024/02/25/1708863590986.html

人民有信仰 民族有希望 国家有力量