Skip to content

创建自定义 ItemReader 和 ItemWriter 教程

引言

在批处理应用中,Spring Batch 提供了多种开箱即用的 ItemReaderItemWriter 实现。但当这些通用实现无法满足特定业务需求时,我们就需要创建自定义实现。本教程将详细讲解如何创建自定义的 ItemReaderItemWriter,并实现可重启特性,确保批处理作业在中断后能从中断点继续执行。

核心概念

  • ItemReader:负责从数据源读取数据(如数据库、文件、API等)
  • ItemWriter:负责处理并写入数据(如数据库、文件、消息队列等)
  • 可重启性:作业中断后能从中断点继续处理的关键特性
  • ItemStream:提供状态保存和恢复机制的核心接口

自定义 ItemReader 实现

基础 ItemReader 示例

下面是一个简单的 ItemReader 实现,它从提供的列表中读取数据:

kotlin
class CustomItemReader<T>(private val items: MutableList<T>) : ItemReader<T> {
    
    override fun read(): T? {
        return if (items.isNotEmpty()) {
            items.removeAt(0)
        } else {
            null // 返回 null 表示读取完成
        }
    }
}
代码说明
  1. 构造函数接收一个可变列表作为数据源
  2. read() 方法每次移除并返回列表的第一个元素
  3. 当列表为空时返回 null,符合 ItemReader 的完成约定

使用示例:

kotlin
// 测试代码
fun testCustomItemReader() {
    val items = mutableListOf("1", "2", "3")
    val reader = CustomItemReader(items)
    
    assertEquals("1", reader.read())
    assertEquals("2", reader.read())
    assertEquals("3", reader.read())
    assertNull(reader.read()) // 返回 null 表示读取完成
}

实现可重启的 ItemReader

基础实现不支持重启功能,作业中断后只能重新开始。要实现可重启功能,我们需要实现 ItemStream 接口:

kotlin
class RestartableItemReader<T>(
    private val items: List<T> // 注意:这里使用不可变列表
) : ItemReader<T>, ItemStream {
    
    private var currentIndex = 0
    private val CURRENT_INDEX_KEY = "current.index"
    
    override fun read(): T? {
        return if (currentIndex < items.size) {
            items[currentIndex++] 
        } else {
            null
        }
    }
    
    // 从执行上下文中恢复状态
    override fun open(executionContext: ExecutionContext) {
        currentIndex = executionContext.getInt(CURRENT_INDEX_KEY, 0) 
    }
    
    // 保存当前状态到执行上下文
    override fun update(executionContext: ExecutionContext) {
        executionContext.putInt(CURRENT_INDEX_KEY, currentIndex) 
    }
    
    override fun close() = Unit // 无资源需要关闭
}

状态保存机制

  • open():作业启动时调用,从 ExecutionContext 恢复上次保存的状态
  • update():处理过程中定期调用,保存当前状态到 ExecutionContext
  • close():作业结束时调用,释放资源

测试重启功能:

kotlin
fun testRestartableReader() {
    // 第一次执行(处理到索引1)
    val items = listOf("1", "2", "3")
    val reader = RestartableItemReader(items)
    
    val context = ExecutionContext()
    (reader as ItemStream).open(context)
    
    assertEquals("1", reader.read())
    (reader as ItemStream).update(context) // 保存状态
    
    // 模拟作业中断后重启
    val restartedReader = RestartableItemReader(items)
    (restartedReader as ItemStream).open(context) // 恢复状态
    
    assertEquals("2", restartedReader.read()) // 从中断点继续
}

状态键名注意事项

ExecutionContext 中存储状态时,键名必须唯一

kotlin
// 推荐做法:使用类名作为前缀
private val CURRENT_INDEX_KEY = "${this::class.simpleName}.current.index"

避免多个同类型组件在同一 Step 中使用时发生键名冲突

自定义 ItemWriter 实现

基础 ItemWriter 示例

下面是一个简单的 ItemWriter 实现,它将数据写入内存列表:

kotlin
class CustomItemWriter<T> : ItemWriter<T> {
    
    private val output = mutableListOf<T>()
    
    override fun write(items: List<T>) {
        output.addAll(items) 
    }
    
    fun getOutput(): List<T> = output
}

使用示例:

kotlin
fun testCustomItemWriter() {
    val writer = CustomItemWriter<String>()
    writer.write(listOf("A", "B", "C"))
    
    assertEquals(listOf("A", "B", "C"), writer.getOutput())
}

实现可重启的 ItemWriter

实现可重启的 ItemWriterItemReader 类似,都需要实现 ItemStream 接口:

kotlin
class RestartableItemWriter<T> : ItemWriter<T>, ItemStream {
    
    private val output = mutableListOf<T>()
    private var itemCount = 0
    private val ITEM_COUNT_KEY = "item.count"
    
    override fun write(items: List<T>) {
        output.addAll(items)
        itemCount += items.size 
    }
    
    override fun open(executionContext: ExecutionContext) {
        itemCount = executionContext.getInt(ITEM_COUNT_KEY, 0) 
    }
    
    override fun update(executionContext: ExecutionContext) {
        executionContext.putInt(ITEM_COUNT_KEY, itemCount) 
    }
    
    override fun close() = Unit
    
    fun getOutput(): List<T> = output
}

实际应用场景

在真实项目中,ItemWriter 通常:

  1. 委托给其他可重启的写入器(如文件写入器)
  2. 写入事务性资源(数据库),本身不需要额外状态
  3. 需要维护状态时(如写入数量统计),才需实现 ItemStream

最佳实践总结

kotlin
// 1. 优先使用不可变数据源
class SafeItemReader<T>(private val items: List<T>) : ItemReader<T> 

// 2. 实现ItemStream接口支持重启
class RestartableReader<T> : ItemReader<T>, ItemStream

// 3. 使用唯一键名保存状态
private val STATE_KEY = "${this::class.simpleName}.state"
kotlin
// 1. 批量写入提高性能
override fun write(items: List<T>) {
    repository.saveAll(items)
}

// 2. 仅在需要时维护状态
class StatsWriter : ItemWriter<T>, ItemStream {
    private var writeCount = 0
    // ...实现状态保存/恢复
}

// 3. 利用事务保证数据一致性
@Transactional
override fun write(items: List<T>) {
    // 写入操作
}

常见错误

  1. 状态键冲突:多个组件使用相同状态键名
  2. 非线程安全:在并发环境下共享可变状态
  3. 状态过大:在ExecutionContext中存储大型对象
  4. 忽略事务:写入操作未放在事务中执行

结语

掌握自定义 ItemReaderItemWriter 的实现是构建灵活批处理系统的关键。通过实现 ItemStream 接口,我们可以轻松添加重启功能,确保作业的容错性数据一致性

在实际项目中,建议:

  1. ✅ 优先使用 Spring Batch 内置实现
  2. ✅ 仅在必要时创建自定义组件
  3. ✅ 充分测试重启场景
  4. ✅ 监控状态存储大小

下一步学习

  • 深入了解 ItemProcessor 的实现
  • 学习批处理作业的监控与管理
  • 探索 Spring Batch 与 Spring Cloud Task 的集成