Appearance
Spring Batch ItemReader 与 ItemWriter 实战指南
📖 核心概念
IMPORTANT
批量处理本质:所有批处理都可以简化为三个核心步骤:读取数据 → 处理/转换 → 写入结果
Spring Batch 提供了三个关键接口实现批量处理:
kotlin
interface ItemReader<T> {
fun read(): T? // 读取单个数据项
}
interface ItemProcessor<I, O> {
fun process(item: I): O? // 处理/转换数据
}
interface ItemWriter<T> {
fun write(items: MutableList<out T>) // 批量写入数据
}
工作流程示意图
🔍 ItemReader 详解
基本功能
- 从数据源读取单条记录
- 当数据读取完毕时返回
null
- 支持 重启能力(从失败点继续)
常见实现类
数据源类型 | 实现类 | 说明 |
---|---|---|
平面文件 | FlatFileItemReader | 读取CSV/TXT文件 |
数据库 | JdbcCursorItemReader | 基于JDBC游标读取 |
JdbcPagingItemReader | 分页读取大数据集 | |
XML文件 | StaxEventItemReader | 基于StAX解析XML |
JSON文件 | JsonItemReader | 流式读取JSON文件 |
多文件输入 | MultiResourceItemReader | 合并多个文件输入源 |
文件读取示例 (Kotlin DSL)
kotlin
@Configuration
class BatchConfig {
@Bean
fun flatFileItemReader(): FlatFileItemReader<Customer> {
return FlatFileItemReaderBuilder<Customer>()
.name("customerReader")
.resource(ClassPathResource("customers.csv"))
.delimited()
.names("firstName", "lastName", "email")
.fieldSetMapper { fieldSet ->
Customer(
firstName = fieldSet.readString("firstName"),
lastName = fieldSet.readString("lastName"),
email = fieldSet.readString("email")
)
}
.build()
}
// [!code tip] 使用LineMapper处理复杂文件结构
// 大文件建议使用游标而非分页方式
}
✍️ ItemWriter 详解
核心功能
- 批量写入处理后的数据
- 支持事务管理
- 实现数据持久化
常见实现类
目标系统 | 实现类 | 说明 |
---|---|---|
数据库 | JdbcBatchItemWriter | JDBC批量写入 |
HibernateItemWriter | Hibernate ORM写入 | |
文件 | FlatFileItemWriter | 写入CSV/TXT文件 |
XML文件 | StaxEventItemWriter | 生成XML文件 |
JSON文件 | JsonFileItemWriter | 写入JSON文件 |
消息队列 | AmqpItemWriter | 写入RabbitMQ等消息队列 |
数据库写入示例 (注解配置)
kotlin
@Bean
fun jdbcItemWriter(dataSource: DataSource): JdbcBatchItemWriter<Customer> {
return JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customers (first_name, last_name, email) VALUES (:firstName, :lastName, :email)")
.beanMapped() // 自动映射对象属性到SQL参数
.build()
}
@Bean
fun step(writer: JdbcBatchItemWriter<Customer>): Step {
return stepBuilderFactory.get("processCustomers")
.chunk<Customer, Customer>(10) // 每10条记录提交一次
.reader(flatFileItemReader())
.writer(writer)
.build()
}
🔄 ItemStream 接口
NOTE
ItemStream 作用:管理批处理组件的状态(如文件位置、数据库游标),实现重启能力
kotlin
interface ItemStream {
fun open(executionContext: ExecutionContext) // 初始化资源
fun update(executionContext: ExecutionContext) // 更新状态
fun close() // 释放资源
}
重要实践
实现自定义 ItemReader/ItemWriter
时,应同时实现 ItemStream
接口以确保:
- 故障恢复能力
- 状态持久化
- 资源正确释放
🧩 委托模式实战
场景:重用现有服务
kotlin
class ServiceItemWriter(private val customerService: CustomerService) : ItemWriter<Customer> {
override fun write(items: MutableList<out Customer>) {
items.forEach { customerService.process(it) }
}
}
@Bean
fun serviceDelegateWriter(customerService: CustomerService): ItemWriter<Customer> {
return ServiceItemWriter(customerService)
}
注册委托到Step
kotlin
@Bean
fun processStep(writer: ItemWriter<Customer>): Step {
return stepBuilderFactory.get("serviceDelegationStep")
.chunk<Customer, Customer>(50)
.reader(fileReader())
.writer(writer) // 注入委托的ItemWriter
.build()
}
🗃️ 多文件输入处理
kotlin
@Bean
fun multiResourceReader(): MultiResourceItemReader<Customer> {
val reader = MultiResourceItemReader()
reader.setDelegate(fileItemReader())
reader.setResources(Resource[] {
ClassPathResource("customers-1.csv"),
ClassPathResource("customers-2.csv"),
ClassPathResource("customers-3.csv")
}) // 设置多个文件资源
return reader
}
⚠️ 防止状态持久化
对于无状态组件,需要显式禁用状态保存:
kotlin
@Bean
fun statelessReader(): ItemReader<Customer> {
return object : ItemReader<Customer> {
override fun read(): Customer? {
// 无状态读取逻辑
}
}.apply {
// 标记为无状态,不保存执行上下文
setSaveState(false)
}
}
🔧 创建自定义ItemReader
自定义数据库读取器实现
kotlin
class CustomJdbcReader(
private val jdbcTemplate: JdbcTemplate,
private val query: String
) : ItemReader<Customer>, ItemStream {
private var cursor: Cursor<Customer>? = null
override fun open(executionContext: ExecutionContext) {
cursor = jdbcTemplate.queryForCursor(query) { rs, _ ->
Customer(
rs.getString("first_name"),
rs.getString("last_name"),
rs.getString("email")
)
}
}
override fun read(): Customer? {
return cursor?.next() ?: null
}
override fun close() {
cursor?.close()
}
override fun update(context: ExecutionContext) {
// 更新游标位置等状态信息
}
}
🚀 最佳实践总结
- ✅ 优先使用内置实现:Spring Batch 提供了丰富的内置 Reader/Writer
- ✅ 合理设置chunk大小:平衡内存使用和I/O效率
- ✅ 实现ItemStream接口:确保组件支持重启
- ❌ 避免在Reader中执行业务逻辑:保持单一职责
- ⚠️ 大文件处理:使用游标而非分页读取超大文件
CAUTION
关键注意事项:
- 确保
ItemReader
实现是线程安全的(如果使用并行处理) - 在写入数据库时使用
JdbcBatchItemWriter
以获得最佳性能 - 测试重启场景确保状态正确恢复
通过掌握这些核心组件,您可以构建高效可靠的批处理应用!💪