Appearance
Spring Batch 多文件输入处理指南
概述
在批处理任务中,经常需要同时处理多个格式相同的文件。Spring Batch 提供了 MultiResourceItemReader
组件,可以高效地处理这种需求。本教程将详细介绍如何配置和使用该组件,并通过 Kotlin 代码示例展示最佳实践。
多文件处理场景
假设我们有以下文件目录结构:
file-1.txt file-2.txt ignored.txt
其中 file-1.txt
和 file-2.txt
格式相同,需要作为一个整体处理,而 ignored.txt
不需要处理。
核心挑战
- 如何批量读取多个文件
- 如何保证文件处理顺序一致性
- 如何处理任务重启时的状态恢复
解决方案:MultiResourceItemReader
MultiResourceItemReader
通过委托模式工作:
- 使用通配符匹配多个文件资源
- 将每个文件的读取操作委托给实际的文件读取器
- 自动处理文件切换逻辑
Kotlin 配置示例
1. 配置基础文件读取器
kotlin
// 创建处理单个文件的ItemReader
@Bean
fun flatFileItemReader(): FlatFileItemReader<YourDataClass> {
return FlatFileItemReaderBuilder<YourDataClass>()
.name("flatFileReader")
.resource(Resource()) // 此处仅为占位符,实际由MultiResource管理
.delimited()
.delimiter(",")
.names("field1", "field2", "field3")
.fieldSetMapper { fieldSet ->
YourDataClass(
field1 = fieldSet.readString("field1"),
field2 = fieldSet.readInt("field2"),
field3 = fieldSet.readDate("field3", "yyyy-MM-dd")
)
}
.build()
}
2. 配置多文件读取器
kotlin
@Bean
fun multiResourceReader(
@Value("classpath:data/input/file-*.txt")
resources: Array<Resource> // 使用通配符匹配多个文件
): MultiResourceItemReader<YourDataClass> {
return MultiResourceItemReaderBuilder<YourDataClass>()
.name("multiFileReader")
.delegate(flatFileItemReader()) // 委托给基础读取器
.resources(*resources)
.setComparator(ResourceComparator()) // 关键:确保文件顺序一致性
.build()
}
IMPORTANT
资源排序的重要性 在重启场景中,必须保证文件处理顺序一致。通过 setComparator(ResourceComparator())
确保每次运行时文件处理顺序相同
3. 完整Step配置
kotlin
@Bean
fun fileProcessingStep(
reader: MultiResourceItemReader<YourDataClass>,
processor: ItemProcessor<YourDataClass, YourOutputClass>,
writer: ItemWriter<YourOutputClass>
): Step {
return stepBuilderFactory.get("multiFileProcessingStep")
.chunk<YourDataClass, YourOutputClass>(100) // 每100条记录提交一次
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy { _, _ -> true } // 根据需要配置跳过策略
.build()
}
关键注意事项
1. 文件目录管理策略
WARNING
避免重启问题的最佳实践 添加新文件到正在处理的目录可能导致重启问题。推荐策略:
- 每个批处理任务使用独立工作目录
- 任务完成后才将文件移入处理目录
- 使用
FileSystemResource
而非ClassPathResource
以便管理文件状态
2. 错误处理机制
kotlin
// 在Step配置中添加容错机制
faultTolerant()
.skip(ParseException::class.java) // 跳过解析错误
.skipLimit(100) // 最多允许跳过100条错误记录
.retryLimit(3) // 最多重试3次
.retry(DeadlockLoserDataAccessException::class.java)
3. 性能优化技巧
性能优化建议
- 对于大文件,启用 异步处理:kotlin
.taskExecutor(SimpleAsyncTaskExecutor()) .throttleLimit(10) // 并发线程数
- 使用
saveState=false
避免小文件状态保存开销:kotlin.saveState(false) // 小文件处理时可禁用状态保存
- 文件排序策略应匹配物理存储顺序以减少磁盘寻道时间
常见问题解决
Q1: 如何处理不同格式的文件?
CAUTION
MultiResourceItemReader
要求所有文件格式相同。如果需要处理不同格式:
- 使用
ClassifierCompositeItemReader
- 根据文件名路由到不同的解析器
- 实现自定义文件分类逻辑
Q2: 任务重启后如何避免重复处理?
kotlin
@Bean
fun jobExplorer(): JobExplorer {
return JobExplorerFactoryBean().apply {
setDataSource(dataSource)
setTablePrefix("BATCH_") // 确保表前缀与配置一致
}.getObject()
}
NOTE
重启机制依赖项
- 正确配置
JobRepository
- 使用持久化数据库存储任务状态
- 确保
JobParameters
中包含唯一标识符
总结与最佳实践
✅ 正确做法 | ❌ 错误做法 |
---|---|
✅ 为每个Job使用独立输入目录 | ❌ 多个Job共享同一输入目录 |
✅ 设置资源排序比较器 | ❌ 依赖文件系统默认排序 |
✅ 处理完成后归档文件 | ❌ 保留已处理文件在原目录 |
✅ 监控文件系统变化 | ❌ 假设输入目录静态不变 |
最终配置建议
- 生产环境使用绝对路径而非 classpath:kotlin
@Value("file:/data/inputs/batch-job-${jobId}/*.csv")
- 添加文件校验前置步骤:kotlin
@BeforeStep fun validateFiles(files: Array<Resource>) { if (files.isEmpty()) throw EmptyInputException() }
- 实现文件处理进度监控
通过本教程,您应该能够高效实现Spring Batch的多文件处理功能。关键点是确保文件顺序一致性和正确处理重启场景,这将保证批处理任务的可靠性和可重复性。