Appearance
Spring Batch ItemReader 与 ItemWriter 完全指南 🚀
核心作用:ItemReader 负责数据读取,ItemWriter 负责数据写入,二者共同构成 Spring Batch 的数据处理核心组件 ⚙️
一、核心概念与设计原理
1.1 Spring Batch 处理模型
1.2 核心接口定义
kotlin
// ItemReader 基础接口
interface ItemReader<T> {
fun read(): T? // 返回单个数据项或null(结束)
}
// ItemWriter 基础接口
interface ItemWriter<T> {
fun write(items: List<T>) // 批量写入数据
}
二、ItemReader 详解
2.1 文件数据读取器
FlatFileItemReader(CSV/TXT文件)
kotlin
@Bean
fun flatFileItemReader(): FlatFileItemReader<Person> {
return FlatFileItemReaderBuilder<Person>()
.name("personReader")
.resource(ClassPathResource("data.csv"))
.delimited()
.names("firstName", "lastName", "age")
.fieldSetMapper { fieldSet ->
Person(
firstName = fieldSet.readString("firstName"),
lastName = fieldSet.readString("lastName"),
age = fieldSet.readInt("age")
)
}
.build()
}
文件读取注意事项
- 大文件处理需配置
linesToSkip
跳过标题行 - 设置
saveState=true
支持断点续读 - 非线程安全,禁止多线程并发访问
JSON 文件读取(JsonItemReader)
kotlin
@Bean
fun jsonItemReader(): JsonItemReader<Person> {
val mapper = jacksonObjectMapper()
return JsonItemReaderBuilder<Person>()
.name("jsonReader")
.resource(ClassPathResource("data.json"))
.jsonObjectReader(JacksonJsonObjectReader(mapper))
.build()
}
2.2 数据库读取器
JDBC 游标读取(JdbcCursorItemReader)
kotlin
@Bean
fun jdbcCursorReader(dataSource: DataSource): JdbcCursorItemReader<Person> {
return JdbcCursorItemReaderBuilder<Person>()
.name("cursorReader")
.dataSource(dataSource)
.sql("SELECT first_name, last_name, age FROM people")
.rowMapper { rs, _ ->
Person(
firstName = rs.getString("first_name"),
lastName = rs.getString("last_name"),
age = rs.getInt("age")
)
}
.build()
}
CAUTION
游标读取风险:长时间保持数据库连接,可能导致连接池耗尽
JPA 分页读取(JpaPagingItemReader)
kotlin
@Bean
fun jpaPagingReader(entityManagerFactory: EntityManagerFactory): JpaPagingItemReader<Person> {
return JpaPagingItemReaderBuilder<Person>()
.name("pagingReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Person p WHERE p.age > :minAge")
.parameterValues(mapOf("minAge" to 18))
.pageSize(100) // 每页100条
.build()
}
2.3 消息队列读取器
Kafka 消息读取(KafkaItemReader)
kotlin
@Bean
fun kafkaItemReader(): KafkaItemReader<String, Person> {
val props = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "batch-group"
)
return KafkaItemReaderBuilder<String, Person>()
.name("kafkaReader")
.consumerProperties(props)
.topic("person-topic")
.partitions(0,1,2) // 监听的分区
.keyValueMapper { _, value ->
objectMapper.readValue(value, Person::class.java)
}
.build()
}
2.4 完整 ItemReader 对比表
读取器类型 | 实现类 | 适用场景 | 线程安全 |
---|---|---|---|
文件读取 | FlatFileItemReader | CSV/TXT 文件 | ❌ |
JsonItemReader | JSON 文件 | ❌ | |
StaxEventItemReader | XML 文件 | ❌ | |
数据库读取 | JdbcCursorItemReader | 小数据集 | ❌ |
JdbcPagingItemReader | 大数据集 | ✅ | |
JpaPagingItemReader | JPA 分页查询 | ✅ | |
消息队列 | KafkaItemReader | Kafka 消费 | ❌ |
AmqpItemReader | RabbitMQ 消费 | ✅ | |
NoSQL | MongoItemReader | MongoDB 查询 | ✅ |
Neo4jItemReader | Neo4j 图数据库 | ✅ |
三、ItemWriter 详解
3.1 数据库写入器
JDBC 批量写入(JdbcBatchItemWriter)
kotlin
@Bean
fun jdbcBatchWriter(dataSource: DataSource): JdbcBatchItemWriter<Person> {
return JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource)
.sql("INSERT INTO people (first_name, last_name, age) VALUES (:firstName, :lastName, :age)")
.beanMapped() // 自动映射Bean属性
.build()
}
JPA 实体写入(JpaItemWriter)
kotlin
@Bean
fun jpaItemWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<Person> {
val writer = JpaItemWriter<Person>()
writer.entityManagerFactory = entityManagerFactory
return writer
}
3.2 文件写入器
CSV 文件写入(FlatFileItemWriter)
kotlin
@Bean
fun csvFileWriter(): FlatFileItemWriter<Person> {
return FlatFileItemWriterBuilder<Person>()
.name("csvWriter")
.resource(FileSystemResource("output.csv"))
.delimited()
.delimiter(",")
.names("firstName", "lastName", "age")
.build()
}
JSON 文件写入(JsonFileItemWriter)
kotlin
@Bean
fun jsonFileWriter(): JsonFileItemWriter<Person> {
return JsonFileItemWriterBuilder<Person>()
.name("jsonWriter")
.resource(FileSystemResource("output.json"))
.jsonObjectMarshaller(JacksonJsonObjectMarshaller())
.build()
}
3.3 消息队列写入器
Kafka 消息写入(KafkaItemWriter)
kotlin
@Bean
fun kafkaItemWriter(): KafkaItemWriter<String, Person> {
val props = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"
)
return KafkaItemWriterBuilder<String, Person>()
.kafkaTemplate(KafkaTemplate(DefaultKafkaProducerFactory(props)))
.itemKeyMapper(Person::id)
.topic("processed-persons")
.build()
}
3.4 高级写入模式
组合写入器(CompositeItemWriter)
kotlin
@Bean
fun compositeWriter(
jdbcWriter: JdbcBatchItemWriter<Person>,
kafkaWriter: KafkaItemWriter<String, Person>
): CompositeItemWriter<Person> {
return CompositeItemWriterBuilder<Person>()
.delegates(jdbcWriter, kafkaWriter)
.build()
}
条件写入(ClassifierCompositeItemWriter)
kotlin
@Bean
fun classifierWriter(
fileWriter: FlatFileItemWriter<Person>,
jdbcWriter: JdbcBatchItemWriter<Person>
): ClassifierCompositeItemWriter<Person> {
val classifier = Classifier<Person, ItemWriter<out Person>> { person ->
if (person.age > 100) fileWriter else jdbcWriter
}
val writer = ClassifierCompositeItemWriter<Person>()
writer.setClassifier(classifier)
return writer
}
3.5 完整 ItemWriter 对比表
写入器类型 | 实现类 | 适用场景 | 线程安全 |
---|---|---|---|
数据库写入 | JdbcBatchItemWriter | JDBC 批量操作 | ✅ |
JpaItemWriter | JPA 实体管理 | ✅ | |
文件写入 | FlatFileItemWriter | CSV/TXT 文件 | ❌ |
JsonFileItemWriter | JSON 文件 | ❌ | |
StaxEventItemWriter | XML 文件 | ❌ | |
消息队列 | KafkaItemWriter | Kafka 生产 | ❌ |
AmqpItemWriter | RabbitMQ 生产 | ✅ | |
NoSQL | MongoItemWriter | MongoDB 保存 | ✅ |
Neo4jItemWriter | Neo4j 图数据库 | ✅ |
四、最佳实践与性能优化 🚀
4.1 读写优化策略
kotlin
@Bean
fun optimizedReader(): JpaPagingItemReader<Person> {
return JpaPagingItemReaderBuilder<Person>()
.pageSize(500) // [!code ++] // 增大分页尺寸
.queryString("SELECT p FROM Person p")
.build()
}
kotlin
@Bean
fun optimizedWriter(): JdbcBatchItemWriter<Person> {
return JdbcBatchItemWriterBuilder<Person>()
.assertUpdates(false) // [!code ++] // 关闭更新检查
.itemSqlParameterSourceProvider(
BeanPropertyItemSqlParameterSourceProvider()
)
.build()
}
4.2 错误处理机制
kotlin
@Bean
fun faultTolerantStep(
reader: ItemReader<Person>,
writer: ItemWriter<Person>
): Step {
return stepBuilderFactory.get("safeStep")
.chunk<Person, Person>(100)
.reader(reader)
.writer(writer)
.faultTolerant()
.skipLimit(10) // 最大跳过10条错误记录
.skip(Exception::class.java)
.retryLimit(3) // 最大重试3次
.retry(DeadlockLoserDataAccessException::class.java)
.build()
}
IMPORTANT
关键配置建议:
- 使用
@EnableBatchProcessing
开启批处理功能 - 大数据集优先使用分页读取器
- 数据库写入开启批处理模式
- 生产环境必须配置错误处理机制
五、总结与应用场景
组件类型 | 推荐使用场景 | 典型实现 |
---|---|---|
ItemReader | 文件数据导入 | FlatFileItemReader |
数据库批量导出 | JdbcPagingItemReader | |
消息队列消费 | KafkaItemReader | |
ItemWriter | 数据库批量写入 | JdbcBatchItemWriter |
数据文件导出 | FlatFileItemWriter | |
消息队列生产 | KafkaItemWriter |
选择原则
- 数据来源决定Reader:数据库选JDBC/JPA,文件选FlatFile/JSON
- 写入目标决定Writer:关系数据库用JdbcBatch,NoSQL用对应实现
- 数据量决定策略:大数据集用分页+批处理
- 容错需求:配置跳过/重试策略 ::>
通过合理组合 ItemReader 和 ItemWriter,您可以构建出高效可靠的批处理应用 ✅。在实际项目中,建议根据数据源特性和业务需求灵活选择合适的实现方案。