Appearance
创建自定义 ItemReader 和 ItemWriter 教程
引言
在批处理应用中,Spring Batch 提供了多种开箱即用的 ItemReader
和 ItemWriter
实现。但当这些通用实现无法满足特定业务需求时,我们就需要创建自定义实现。本教程将详细讲解如何创建自定义的 ItemReader
和 ItemWriter
,并实现可重启特性,确保批处理作业在中断后能从中断点继续执行。
核心概念
- ItemReader:负责从数据源读取数据(如数据库、文件、API等)
- ItemWriter:负责处理并写入数据(如数据库、文件、消息队列等)
- 可重启性:作业中断后能从中断点继续处理的关键特性
- ItemStream:提供状态保存和恢复机制的核心接口
自定义 ItemReader 实现
基础 ItemReader 示例
下面是一个简单的 ItemReader
实现,它从提供的列表中读取数据:
kotlin
class CustomItemReader<T>(private val items: MutableList<T>) : ItemReader<T> {
override fun read(): T? {
return if (items.isNotEmpty()) {
items.removeAt(0)
} else {
null // 返回 null 表示读取完成
}
}
}
代码说明
- 构造函数接收一个可变列表作为数据源
read()
方法每次移除并返回列表的第一个元素- 当列表为空时返回
null
,符合ItemReader
的完成约定
使用示例:
kotlin
// 测试代码
fun testCustomItemReader() {
val items = mutableListOf("1", "2", "3")
val reader = CustomItemReader(items)
assertEquals("1", reader.read())
assertEquals("2", reader.read())
assertEquals("3", reader.read())
assertNull(reader.read()) // 返回 null 表示读取完成
}
实现可重启的 ItemReader
基础实现不支持重启功能,作业中断后只能重新开始。要实现可重启功能,我们需要实现 ItemStream
接口:
kotlin
class RestartableItemReader<T>(
private val items: List<T> // 注意:这里使用不可变列表
) : ItemReader<T>, ItemStream {
private var currentIndex = 0
private val CURRENT_INDEX_KEY = "current.index"
override fun read(): T? {
return if (currentIndex < items.size) {
items[currentIndex++]
} else {
null
}
}
// 从执行上下文中恢复状态
override fun open(executionContext: ExecutionContext) {
currentIndex = executionContext.getInt(CURRENT_INDEX_KEY, 0)
}
// 保存当前状态到执行上下文
override fun update(executionContext: ExecutionContext) {
executionContext.putInt(CURRENT_INDEX_KEY, currentIndex)
}
override fun close() = Unit // 无资源需要关闭
}
状态保存机制
open()
:作业启动时调用,从ExecutionContext
恢复上次保存的状态update()
:处理过程中定期调用,保存当前状态到ExecutionContext
close()
:作业结束时调用,释放资源
测试重启功能:
kotlin
fun testRestartableReader() {
// 第一次执行(处理到索引1)
val items = listOf("1", "2", "3")
val reader = RestartableItemReader(items)
val context = ExecutionContext()
(reader as ItemStream).open(context)
assertEquals("1", reader.read())
(reader as ItemStream).update(context) // 保存状态
// 模拟作业中断后重启
val restartedReader = RestartableItemReader(items)
(restartedReader as ItemStream).open(context) // 恢复状态
assertEquals("2", restartedReader.read()) // 从中断点继续
}
状态键名注意事项
在 ExecutionContext
中存储状态时,键名必须唯一:
kotlin
// 推荐做法:使用类名作为前缀
private val CURRENT_INDEX_KEY = "${this::class.simpleName}.current.index"
避免多个同类型组件在同一 Step 中使用时发生键名冲突
自定义 ItemWriter 实现
基础 ItemWriter 示例
下面是一个简单的 ItemWriter
实现,它将数据写入内存列表:
kotlin
class CustomItemWriter<T> : ItemWriter<T> {
private val output = mutableListOf<T>()
override fun write(items: List<T>) {
output.addAll(items)
}
fun getOutput(): List<T> = output
}
使用示例:
kotlin
fun testCustomItemWriter() {
val writer = CustomItemWriter<String>()
writer.write(listOf("A", "B", "C"))
assertEquals(listOf("A", "B", "C"), writer.getOutput())
}
实现可重启的 ItemWriter
实现可重启的 ItemWriter
与 ItemReader
类似,都需要实现 ItemStream
接口:
kotlin
class RestartableItemWriter<T> : ItemWriter<T>, ItemStream {
private val output = mutableListOf<T>()
private var itemCount = 0
private val ITEM_COUNT_KEY = "item.count"
override fun write(items: List<T>) {
output.addAll(items)
itemCount += items.size
}
override fun open(executionContext: ExecutionContext) {
itemCount = executionContext.getInt(ITEM_COUNT_KEY, 0)
}
override fun update(executionContext: ExecutionContext) {
executionContext.putInt(ITEM_COUNT_KEY, itemCount)
}
override fun close() = Unit
fun getOutput(): List<T> = output
}
实际应用场景
在真实项目中,ItemWriter
通常:
- 委托给其他可重启的写入器(如文件写入器)
- 写入事务性资源(数据库),本身不需要额外状态
- 需要维护状态时(如写入数量统计),才需实现
ItemStream
最佳实践总结
kotlin
// 1. 优先使用不可变数据源
class SafeItemReader<T>(private val items: List<T>) : ItemReader<T>
// 2. 实现ItemStream接口支持重启
class RestartableReader<T> : ItemReader<T>, ItemStream
// 3. 使用唯一键名保存状态
private val STATE_KEY = "${this::class.simpleName}.state"
kotlin
// 1. 批量写入提高性能
override fun write(items: List<T>) {
repository.saveAll(items)
}
// 2. 仅在需要时维护状态
class StatsWriter : ItemWriter<T>, ItemStream {
private var writeCount = 0
// ...实现状态保存/恢复
}
// 3. 利用事务保证数据一致性
@Transactional
override fun write(items: List<T>) {
// 写入操作
}
常见错误
- 状态键冲突:多个组件使用相同状态键名
- 非线程安全:在并发环境下共享可变状态
- 状态过大:在ExecutionContext中存储大型对象
- 忽略事务:写入操作未放在事务中执行
结语
掌握自定义 ItemReader
和 ItemWriter
的实现是构建灵活批处理系统的关键。通过实现 ItemStream
接口,我们可以轻松添加重启功能,确保作业的容错性和数据一致性。
在实际项目中,建议:
- ✅ 优先使用 Spring Batch 内置实现
- ✅ 仅在必要时创建自定义组件
- ✅ 充分测试重启场景
- ✅ 监控状态存储大小
下一步学习
- 深入了解
ItemProcessor
的实现 - 学习批处理作业的监控与管理
- 探索 Spring Batch 与 Spring Cloud Task 的集成