Skip to content

Spring Batch ItemReader 与 ItemWriter 实战指南

📖 核心概念

IMPORTANT

批量处理本质:所有批处理都可以简化为三个核心步骤:读取数据处理/转换写入结果

Spring Batch 提供了三个关键接口实现批量处理:

kotlin
interface ItemReader<T> {
    fun read(): T?  // 读取单个数据项
}

interface ItemProcessor<I, O> {
    fun process(item: I): O?  // 处理/转换数据
}

interface ItemWriter<T> {
    fun write(items: MutableList<out T>)  // 批量写入数据
}

工作流程示意图

🔍 ItemReader 详解

基本功能

  • 从数据源读取单条记录
  • 当数据读取完毕时返回 null
  • 支持 重启能力(从失败点继续)

常见实现类

数据源类型实现类说明
平面文件FlatFileItemReader读取CSV/TXT文件
数据库JdbcCursorItemReader基于JDBC游标读取
JdbcPagingItemReader分页读取大数据集
XML文件StaxEventItemReader基于StAX解析XML
JSON文件JsonItemReader流式读取JSON文件
多文件输入MultiResourceItemReader合并多个文件输入源

文件读取示例 (Kotlin DSL)

kotlin
@Configuration
class BatchConfig {

    @Bean
    fun flatFileItemReader(): FlatFileItemReader<Customer> {
        return FlatFileItemReaderBuilder<Customer>()
            .name("customerReader")
            .resource(ClassPathResource("customers.csv"))
            .delimited()
            .names("firstName", "lastName", "email")
            .fieldSetMapper { fieldSet ->
                Customer(
                    firstName = fieldSet.readString("firstName"),
                    lastName = fieldSet.readString("lastName"),
                    email = fieldSet.readString("email")
                )
            }
            .build()
    }
    
    // [!code tip] 使用LineMapper处理复杂文件结构
    // 大文件建议使用游标而非分页方式
}

✍️ ItemWriter 详解

核心功能

  • 批量写入处理后的数据
  • 支持事务管理
  • 实现数据持久化

常见实现类

目标系统实现类说明
数据库JdbcBatchItemWriterJDBC批量写入
HibernateItemWriterHibernate ORM写入
文件FlatFileItemWriter写入CSV/TXT文件
XML文件StaxEventItemWriter生成XML文件
JSON文件JsonFileItemWriter写入JSON文件
消息队列AmqpItemWriter写入RabbitMQ等消息队列

数据库写入示例 (注解配置)

kotlin
@Bean
fun jdbcItemWriter(dataSource: DataSource): JdbcBatchItemWriter<Customer> {
    return JdbcBatchItemWriterBuilder<Customer>()
        .dataSource(dataSource)
        .sql("INSERT INTO customers (first_name, last_name, email) VALUES (:firstName, :lastName, :email)")
        .beanMapped()  // 自动映射对象属性到SQL参数
        .build()
}

@Bean
fun step(writer: JdbcBatchItemWriter<Customer>): Step {
    return stepBuilderFactory.get("processCustomers")
        .chunk<Customer, Customer>(10)  // 每10条记录提交一次
        .reader(flatFileItemReader())
        .writer(writer)
        .build()
}

🔄 ItemStream 接口

NOTE

ItemStream 作用:管理批处理组件的状态(如文件位置、数据库游标),实现重启能力

kotlin
interface ItemStream {
    fun open(executionContext: ExecutionContext) // 初始化资源
    fun update(executionContext: ExecutionContext) // 更新状态
    fun close() // 释放资源
}

重要实践

实现自定义 ItemReader/ItemWriter 时,应同时实现 ItemStream 接口以确保:

  • 故障恢复能力
  • 状态持久化
  • 资源正确释放

🧩 委托模式实战

场景:重用现有服务

kotlin
class ServiceItemWriter(private val customerService: CustomerService) : ItemWriter<Customer> {
    
    override fun write(items: MutableList<out Customer>) {
        items.forEach { customerService.process(it) }  
    }
}

@Bean
fun serviceDelegateWriter(customerService: CustomerService): ItemWriter<Customer> {
    return ServiceItemWriter(customerService)
}

注册委托到Step

kotlin
@Bean
fun processStep(writer: ItemWriter<Customer>): Step {
    return stepBuilderFactory.get("serviceDelegationStep")
        .chunk<Customer, Customer>(50)
        .reader(fileReader())
        .writer(writer)  // 注入委托的ItemWriter
        .build()
}

🗃️ 多文件输入处理

kotlin
@Bean
fun multiResourceReader(): MultiResourceItemReader<Customer> {
    val reader = MultiResourceItemReader()
    reader.setDelegate(fileItemReader())
    reader.setResources(Resource[] {
        ClassPathResource("customers-1.csv"),
        ClassPathResource("customers-2.csv"),
        ClassPathResource("customers-3.csv")
    })  // 设置多个文件资源
    return reader
}

⚠️ 防止状态持久化

对于无状态组件,需要显式禁用状态保存:

kotlin
@Bean
fun statelessReader(): ItemReader<Customer> {
    return object : ItemReader<Customer> {
        override fun read(): Customer? {
            // 无状态读取逻辑
        }
    }.apply {
        // 标记为无状态,不保存执行上下文
        setSaveState(false)  
    }
}

🔧 创建自定义ItemReader

自定义数据库读取器实现
kotlin
class CustomJdbcReader(
    private val jdbcTemplate: JdbcTemplate,
    private val query: String
) : ItemReader<Customer>, ItemStream {

    private var cursor: Cursor<Customer>? = null

    override fun open(executionContext: ExecutionContext) {
        cursor = jdbcTemplate.queryForCursor(query) { rs, _ ->
            Customer(
                rs.getString("first_name"),
                rs.getString("last_name"),
                rs.getString("email")
            )
        }
    }

    override fun read(): Customer? {
        return cursor?.next() ?: null
    }

    override fun close() {
        cursor?.close()
    }

    override fun update(context: ExecutionContext) {
        // 更新游标位置等状态信息
    }
}

🚀 最佳实践总结

  1. ✅ 优先使用内置实现:Spring Batch 提供了丰富的内置 Reader/Writer
  2. ✅ 合理设置chunk大小:平衡内存使用和I/O效率
  3. ✅ 实现ItemStream接口:确保组件支持重启
  4. ❌ 避免在Reader中执行业务逻辑:保持单一职责
  5. ⚠️ 大文件处理:使用游标而非分页读取超大文件

CAUTION

关键注意事项

  • 确保 ItemReader 实现是线程安全的(如果使用并行处理)
  • 在写入数据库时使用 JdbcBatchItemWriter 以获得最佳性能
  • 测试重启场景确保状态正确恢复

通过掌握这些核心组件,您可以构建高效可靠的批处理应用!💪