Skip to content

Spring Batch 多文件输入处理指南

概述

在批处理任务中,经常需要同时处理多个格式相同的文件。Spring Batch 提供了 MultiResourceItemReader 组件,可以高效地处理这种需求。本教程将详细介绍如何配置和使用该组件,并通过 Kotlin 代码示例展示最佳实践。

多文件处理场景

假设我们有以下文件目录结构:

file-1.txt  file-2.txt  ignored.txt

其中 file-1.txtfile-2.txt 格式相同,需要作为一个整体处理,而 ignored.txt 不需要处理。

核心挑战

  • 如何批量读取多个文件
  • 如何保证文件处理顺序一致性
  • 如何处理任务重启时的状态恢复

解决方案:MultiResourceItemReader

MultiResourceItemReader 通过委托模式工作:

  1. 使用通配符匹配多个文件资源
  2. 将每个文件的读取操作委托给实际的文件读取器
  3. 自动处理文件切换逻辑

Kotlin 配置示例

1. 配置基础文件读取器

kotlin
// 创建处理单个文件的ItemReader
@Bean
fun flatFileItemReader(): FlatFileItemReader<YourDataClass> {
    return FlatFileItemReaderBuilder<YourDataClass>()
        .name("flatFileReader")
        .resource(Resource()) // 此处仅为占位符,实际由MultiResource管理
        .delimited()
        .delimiter(",")
        .names("field1", "field2", "field3")
        .fieldSetMapper { fieldSet ->
            YourDataClass(
                field1 = fieldSet.readString("field1"),
                field2 = fieldSet.readInt("field2"),
                field3 = fieldSet.readDate("field3", "yyyy-MM-dd")
            )
        }
        .build()
}

2. 配置多文件读取器

kotlin
@Bean
fun multiResourceReader(
    @Value("classpath:data/input/file-*.txt") 
    resources: Array<Resource> // 使用通配符匹配多个文件
): MultiResourceItemReader<YourDataClass> {
    return MultiResourceItemReaderBuilder<YourDataClass>()
        .name("multiFileReader")
        .delegate(flatFileItemReader()) // 委托给基础读取器
        .resources(*resources)
        .setComparator(ResourceComparator()) // 关键:确保文件顺序一致性
        .build()
}

IMPORTANT

资源排序的重要性 在重启场景中,必须保证文件处理顺序一致。通过 setComparator(ResourceComparator()) 确保每次运行时文件处理顺序相同

3. 完整Step配置

kotlin
@Bean
fun fileProcessingStep(
    reader: MultiResourceItemReader<YourDataClass>,
    processor: ItemProcessor<YourDataClass, YourOutputClass>,
    writer: ItemWriter<YourOutputClass>
): Step {
    return stepBuilderFactory.get("multiFileProcessingStep")
        .chunk<YourDataClass, YourOutputClass>(100) // 每100条记录提交一次
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .faultTolerant()
        .skipPolicy { _, _ -> true } // 根据需要配置跳过策略
        .build()
}

关键注意事项

1. 文件目录管理策略

WARNING

避免重启问题的最佳实践 添加新文件到正在处理的目录可能导致重启问题。推荐策略:

  • 每个批处理任务使用独立工作目录
  • 任务完成后才将文件移入处理目录
  • 使用 FileSystemResource 而非 ClassPathResource 以便管理文件状态

2. 错误处理机制

kotlin
// 在Step配置中添加容错机制
faultTolerant()
    .skip(ParseException::class.java) // 跳过解析错误
    .skipLimit(100) // 最多允许跳过100条错误记录
    .retryLimit(3)  // 最多重试3次
    .retry(DeadlockLoserDataAccessException::class.java)

3. 性能优化技巧

性能优化建议

  1. 对于大文件,启用 异步处理
    kotlin
    .taskExecutor(SimpleAsyncTaskExecutor())
    .throttleLimit(10) // 并发线程数
  2. 使用 saveState=false 避免小文件状态保存开销:
    kotlin
    .saveState(false) // 小文件处理时可禁用状态保存
  3. 文件排序策略应匹配物理存储顺序以减少磁盘寻道时间

常见问题解决

Q1: 如何处理不同格式的文件?

CAUTION

MultiResourceItemReader 要求所有文件格式相同。如果需要处理不同格式:

  1. 使用 ClassifierCompositeItemReader
  2. 根据文件名路由到不同的解析器
  3. 实现自定义文件分类逻辑

Q2: 任务重启后如何避免重复处理?

kotlin
@Bean
fun jobExplorer(): JobExplorer {
    return JobExplorerFactoryBean().apply {
        setDataSource(dataSource)
        setTablePrefix("BATCH_") // 确保表前缀与配置一致
    }.getObject()
}

NOTE

重启机制依赖项

  • 正确配置 JobRepository
  • 使用持久化数据库存储任务状态
  • 确保 JobParameters 中包含唯一标识符

总结与最佳实践

正确做法错误做法
✅ 为每个Job使用独立输入目录❌ 多个Job共享同一输入目录
✅ 设置资源排序比较器❌ 依赖文件系统默认排序
✅ 处理完成后归档文件❌ 保留已处理文件在原目录
✅ 监控文件系统变化❌ 假设输入目录静态不变

最终配置建议

  1. 生产环境使用绝对路径而非 classpath:
    kotlin
    @Value("file:/data/inputs/batch-job-${jobId}/*.csv")
  2. 添加文件校验前置步骤:
    kotlin
    @BeforeStep
    fun validateFiles(files: Array<Resource>) {
        if (files.isEmpty()) throw EmptyInputException()
    }
  3. 实现文件处理进度监控

通过本教程,您应该能够高效实现Spring Batch的多文件处理功能。关键点是确保文件顺序一致性和正确处理重启场景,这将保证批处理任务的可靠性和可重复性。