背景
最近公司需要实现一个报表功能,需要导出近一段时间内的数据,时间段由用户在页面填写,作为 SQL 参数查询过滤,因为涉及到查询的信息详细,需要关联的表过多和字段过多,以及数据量过大,可能会有性能方面的问题
方案一: 多线程分页多 sheet+ 队列(发布订阅)
网上有很多的方案说了,用多线程分批处理,自己也写了个 demo 实现,多线程分批分页查询 + 发布订阅队列 + 分 sheet 写入,虽然功能实现了,最终测试 500 万数据,结果需要将近 8 分钟了,感觉不太满意这种方式,后面想到我们公司的服务器有大数据方面环境,想着能不能用大数据解决,最终决定用 spark
方案二:spark+cxv
经过查找资料和自己学习,基本掌握了一点 spark 的知识和整合 Java +springboot,发现 spark 天生支持这种大数据量的整合处理
基本思路就是,通过 spark 大数据引擎分布式连接 JDBC 读取数据库的数据,然后写入到 CSV 文件到 hdfs 或者到本地文件系统,最终从 hdfs 或者本地写入搭配 response 响应流给前端下载
@GetMapping("/download-csv")
public ResponseEntity<InputStreamResource> downloadCsv(HttpServletRequest request, HttpServletResponse response) throws IOException {
return indexService.exportBySpark(request,response);
}
public ResponseEntity<InputStreamResource> exportBySpark(HttpServletRequest request, HttpServletResponse response) {
SparkSQLToCSV sparkSQLToCSV = new SparkSQLToCSV();
String path = sparkSQLToCSV.readToLocalFileBySpark();
Path csvDirectory = Paths.get(path);
// 确保目录存在
if (!Files.isDirectory(csvDirectory)) {
return ResponseEntity.notFound().build();
}
// 找到目录下的part-文件
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(csvDirectory, "part-*")) {
for (Path entry : dirStream) {
// 仅获取第一个匹配的文件
String filename = entry.getFileName().toString();
Path file = csvDirectory.resolve(filename);
InputStreamResource resource = new InputStreamResource(Files.newInputStream(file));
// 设置Content-Disposition响应头,用户会看到的下载文件名
String contentDisposition = String.format("attachment; filename=\"%s\"", "data-sql.csv");
deleteDirectory(csvDirectory);
// 返回文件流
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, contentDisposition)
.contentType(MediaType.parseMediaType("text/csv"))
.body(resource);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
// 如果没有找到文件,返回404
return ResponseEntity.notFound().build();
}
public class SparkSQLToCSV {
public String readToLocalFileBySpark() {
long start = System.currentTimeMillis();
SparkSession spark = SparkSession.builder()
.appName("Spark SQL to CSV Example")
.master("local[*]") // Use local[*] for development/testing
.getOrCreate();
String jdbcUrl = "jdbc:mysql://localhost:3306/xiaohu_test";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "root");
connectionProperties.put("driver", "com.mysql.jdbc.Driver");
//模拟sql
String sql = "( SELECT * FROM employees1 ) as employees ";
String path = "/xxxx/localfile/spark";
spark.read()
.jdbc(jdbcUrl, sql, connectionProperties)
.withColumn(
"hireTime",
functions.date_format(functions.col("hireTime"), "yyyy-MM-dd HH🇲🇲ss"))
.repartition(1)
.write()
.format("csv")
.option("header", true)
.mode(SaveMode.Overwrite)
.csv(path);
spark.stop();
long end = System.currentTimeMillis();
System.out.println("spark花费 " + (end -start) /1000 + "秒");
return path;
}
}
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.0</version>
</dependency>
测试
最终结果测试 100 万数据导出花费时间:
仅仅只需要 34s 就能导出 500w 的数据