Skip to content

Spring Batch ItemReader 与 ItemWriter 完全指南 🚀

核心作用:ItemReader 负责数据读取,ItemWriter 负责数据写入,二者共同构成 Spring Batch 的数据处理核心组件 ⚙️

一、核心概念与设计原理

1.1 Spring Batch 处理模型

1.2 核心接口定义

kotlin
// ItemReader 基础接口
interface ItemReader<T> {
    fun read(): T?  // 返回单个数据项或null(结束)
}

// ItemWriter 基础接口
interface ItemWriter<T> {
    fun write(items: List<T>)  // 批量写入数据
}

二、ItemReader 详解

2.1 文件数据读取器

FlatFileItemReader(CSV/TXT文件)

kotlin
@Bean
fun flatFileItemReader(): FlatFileItemReader<Person> {
    return FlatFileItemReaderBuilder<Person>()
        .name("personReader")
        .resource(ClassPathResource("data.csv")) 
        .delimited()
        .names("firstName", "lastName", "age")
        .fieldSetMapper { fieldSet ->
            Person(
                firstName = fieldSet.readString("firstName"),
                lastName = fieldSet.readString("lastName"),
                age = fieldSet.readInt("age")
            )
        }
        .build()
}

文件读取注意事项

  1. 大文件处理需配置 linesToSkip 跳过标题行
  2. 设置 saveState=true 支持断点续读
  3. 非线程安全,禁止多线程并发访问

JSON 文件读取(JsonItemReader)

kotlin
@Bean
fun jsonItemReader(): JsonItemReader<Person> {
    val mapper = jacksonObjectMapper()
    return JsonItemReaderBuilder<Person>()
        .name("jsonReader")
        .resource(ClassPathResource("data.json"))
        .jsonObjectReader(JacksonJsonObjectReader(mapper)) 
        .build()
}

2.2 数据库读取器

JDBC 游标读取(JdbcCursorItemReader)

kotlin
@Bean
fun jdbcCursorReader(dataSource: DataSource): JdbcCursorItemReader<Person> {
    return JdbcCursorItemReaderBuilder<Person>()
        .name("cursorReader")
        .dataSource(dataSource)
        .sql("SELECT first_name, last_name, age FROM people") 
        .rowMapper { rs, _ ->
            Person(
                firstName = rs.getString("first_name"),
                lastName = rs.getString("last_name"),
                age = rs.getInt("age")
            )
        }
        .build()
}

CAUTION

游标读取风险:长时间保持数据库连接,可能导致连接池耗尽

JPA 分页读取(JpaPagingItemReader)

kotlin
@Bean
fun jpaPagingReader(entityManagerFactory: EntityManagerFactory): JpaPagingItemReader<Person> {
    return JpaPagingItemReaderBuilder<Person>()
        .name("pagingReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString("SELECT p FROM Person p WHERE p.age > :minAge") 
        .parameterValues(mapOf("minAge" to 18))
        .pageSize(100)  // 每页100条
        .build()
}

2.3 消息队列读取器

Kafka 消息读取(KafkaItemReader)

kotlin
@Bean
fun kafkaItemReader(): KafkaItemReader<String, Person> {
    val props = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG to "batch-group"
    )
    
    return KafkaItemReaderBuilder<String, Person>()
        .name("kafkaReader")
        .consumerProperties(props)
        .topic("person-topic") 
        .partitions(0,1,2)    // 监听的分区
        .keyValueMapper { _, value -> 
            objectMapper.readValue(value, Person::class.java)
        }
        .build()
}

2.4 完整 ItemReader 对比表

读取器类型实现类适用场景线程安全
文件读取FlatFileItemReaderCSV/TXT 文件
JsonItemReaderJSON 文件
StaxEventItemReaderXML 文件
数据库读取JdbcCursorItemReader小数据集
JdbcPagingItemReader大数据集
JpaPagingItemReaderJPA 分页查询
消息队列KafkaItemReaderKafka 消费
AmqpItemReaderRabbitMQ 消费
NoSQLMongoItemReaderMongoDB 查询
Neo4jItemReaderNeo4j 图数据库

三、ItemWriter 详解

3.1 数据库写入器

JDBC 批量写入(JdbcBatchItemWriter)

kotlin
@Bean
fun jdbcBatchWriter(dataSource: DataSource): JdbcBatchItemWriter<Person> {
    return JdbcBatchItemWriterBuilder<Person>()
        .dataSource(dataSource)
        .sql("INSERT INTO people (first_name, last_name, age) VALUES (:firstName, :lastName, :age)") 
        .beanMapped()  // 自动映射Bean属性
        .build()
}

JPA 实体写入(JpaItemWriter)

kotlin
@Bean
fun jpaItemWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<Person> {
    val writer = JpaItemWriter<Person>()
    writer.entityManagerFactory = entityManagerFactory
    return writer
}

3.2 文件写入器

CSV 文件写入(FlatFileItemWriter)

kotlin
@Bean
fun csvFileWriter(): FlatFileItemWriter<Person> {
    return FlatFileItemWriterBuilder<Person>()
        .name("csvWriter")
        .resource(FileSystemResource("output.csv")) 
        .delimited()
        .delimiter(",")
        .names("firstName", "lastName", "age")
        .build()
}

JSON 文件写入(JsonFileItemWriter)

kotlin
@Bean
fun jsonFileWriter(): JsonFileItemWriter<Person> {
    return JsonFileItemWriterBuilder<Person>()
        .name("jsonWriter")
        .resource(FileSystemResource("output.json"))
        .jsonObjectMarshaller(JacksonJsonObjectMarshaller()) 
        .build()
}

3.3 消息队列写入器

Kafka 消息写入(KafkaItemWriter)

kotlin
@Bean
fun kafkaItemWriter(): KafkaItemWriter<String, Person> {
    val props = mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"
    )
    
    return KafkaItemWriterBuilder<String, Person>()
        .kafkaTemplate(KafkaTemplate(DefaultKafkaProducerFactory(props)))
        .itemKeyMapper(Person::id) 
        .topic("processed-persons")
        .build()
}

3.4 高级写入模式

组合写入器(CompositeItemWriter)

kotlin
@Bean
fun compositeWriter(
    jdbcWriter: JdbcBatchItemWriter<Person>,
    kafkaWriter: KafkaItemWriter<String, Person>
): CompositeItemWriter<Person> {
    return CompositeItemWriterBuilder<Person>()
        .delegates(jdbcWriter, kafkaWriter) 
        .build()
}

条件写入(ClassifierCompositeItemWriter)

kotlin
@Bean
fun classifierWriter(
    fileWriter: FlatFileItemWriter<Person>,
    jdbcWriter: JdbcBatchItemWriter<Person>
): ClassifierCompositeItemWriter<Person> {
    
    val classifier = Classifier<Person, ItemWriter<out Person>> { person ->
        if (person.age > 100) fileWriter else jdbcWriter 
    }
    
    val writer = ClassifierCompositeItemWriter<Person>()
    writer.setClassifier(classifier)
    return writer
}

3.5 完整 ItemWriter 对比表

写入器类型实现类适用场景线程安全
数据库写入JdbcBatchItemWriterJDBC 批量操作
JpaItemWriterJPA 实体管理
文件写入FlatFileItemWriterCSV/TXT 文件
JsonFileItemWriterJSON 文件
StaxEventItemWriterXML 文件
消息队列KafkaItemWriterKafka 生产
AmqpItemWriterRabbitMQ 生产
NoSQLMongoItemWriterMongoDB 保存
Neo4jItemWriterNeo4j 图数据库

四、最佳实践与性能优化 🚀

4.1 读写优化策略

kotlin
@Bean
fun optimizedReader(): JpaPagingItemReader<Person> {
    return JpaPagingItemReaderBuilder<Person>()
        .pageSize(500) // [!code ++] // 增大分页尺寸
        .queryString("SELECT p FROM Person p")
        .build()
}
kotlin
@Bean
fun optimizedWriter(): JdbcBatchItemWriter<Person> {
    return JdbcBatchItemWriterBuilder<Person>()
        .assertUpdates(false) // [!code ++] // 关闭更新检查
        .itemSqlParameterSourceProvider(
            BeanPropertyItemSqlParameterSourceProvider()
        )
        .build()
}

4.2 错误处理机制

kotlin
@Bean
fun faultTolerantStep(
    reader: ItemReader<Person>,
    writer: ItemWriter<Person>
): Step {
    return stepBuilderFactory.get("safeStep")
        .chunk<Person, Person>(100)
        .reader(reader)
        .writer(writer)
        .faultTolerant()
        .skipLimit(10)  // 最大跳过10条错误记录
        .skip(Exception::class.java)
        .retryLimit(3)  // 最大重试3次
        .retry(DeadlockLoserDataAccessException::class.java)
        .build()
}

IMPORTANT

关键配置建议

  1. 使用 @EnableBatchProcessing 开启批处理功能
  2. 大数据集优先使用分页读取器
  3. 数据库写入开启批处理模式
  4. 生产环境必须配置错误处理机制

五、总结与应用场景

组件类型推荐使用场景典型实现
ItemReader文件数据导入FlatFileItemReader
数据库批量导出JdbcPagingItemReader
消息队列消费KafkaItemReader
ItemWriter数据库批量写入JdbcBatchItemWriter
数据文件导出FlatFileItemWriter
消息队列生产KafkaItemWriter

选择原则

  1. 数据来源决定Reader:数据库选JDBC/JPA,文件选FlatFile/JSON
  2. 写入目标决定Writer:关系数据库用JdbcBatch,NoSQL用对应实现
  3. 数据量决定策略:大数据集用分页+批处理
  4. 容错需求:配置跳过/重试策略 ::>

通过合理组合 ItemReader 和 ItemWriter,您可以构建出高效可靠的批处理应用 ✅。在实际项目中,建议根据数据源特性和业务需求灵活选择合适的实现方案。