Appearance
Spring Batch ItemReader 与 ItemWriter 实现详解
目录
📚 装饰器模式
装饰器模式为已有读写器添加额外功能而不改变其核心逻辑。Spring Batch 提供多种装饰器:
🧵 同步读写器 (SynchronizedItemStreamReader/Writer
)
使非线程安全的读写器可在多线程环境下使用:
kotlin
@Bean
fun itemReader(): SynchronizedItemStreamReader<Person> {
val flatFileItemReader = FlatFileItemReaderBuilder<Person>()
.name("personReader")
.resource(ClassPathResource("persons.csv"))
.delimited()
.names("firstName", "lastName")
.targetType(Person::class.java)
.build()
return SynchronizedItemStreamReaderBuilder<Person>()
.delegate(flatFileItemReader)
.build()
}
kotlin
@Bean
fun itemWriter(): SynchronizedItemStreamWriter<Person> {
val flatFileItemWriter = FlatFileItemWriterBuilder<Person>()
.name("personWriter")
.resource(FileSystemResource("output.txt"))
.delimited()
.delimiter(",")
.names("firstName", "lastName")
.build()
return SynchronizedItemStreamWriterBuilder<Person>()
.delegate(flatFileItemWriter)
.build()
}
WARNING
使用同步装饰器会增加锁开销,可能影响性能。仅在多线程步骤中必须使用时才添加。
👀 可查看读取器 (SingleItemPeekableItemReader
)
提供peek()
方法预览下一条记录:
📂 多资源写入器 (MultiResourceItemWriter
)
当单文件过大时自动分割文件:
kotlin
@Bean
fun multiResourceWriter(): MultiResourceItemWriter<String> {
val delegate = FlatFileItemWriterBuilder<String>()
.name("delegateWriter")
.resource(FileSystemResource("output.txt"))
.lineAggregator { it }
.build()
return MultiResourceItemWriterBuilder<String>()
.name("multiResourceWriter")
.delegate(delegate)
.itemCountLimitPerResource(1000) // 每文件1000条记录
.resource(FileSystemResource("output"))
.resourceSuffixCreator { index -> "_$index.txt" }
.build()
}
🗂️ 分类复合写入器 (ClassifierCompositeItemWriter
)
根据规则将数据路由到不同写入器:
kotlin
@Bean
fun classifierWriter(): ClassifierCompositeItemWriter<Transaction> {
val cashWriter = CashTransactionWriter()
val creditWriter = CreditTransactionWriter()
val classifier = Classifier<Transaction, ItemWriter<out Transaction>> { transaction ->
when (transaction.type) {
TransactionType.CASH -> cashWriter
TransactionType.CREDIT -> creditWriter
}
}
return ClassifierCompositeItemWriterBuilder<Transaction>()
.classifier(classifier)
.build()
}
✉️ 消息系统读写器
RabbitMQ 读写器 (AmqpItemReader/Writer
)
kotlin
// 读取器
@Bean
fun amqpReader(): AmqpItemReader<Message> {
return AmqpItemReaderBuilder<Message>()
.amqpTemplate(rabbitTemplate)
.build()
}
// 写入器
@Bean
fun amqpWriter(): AmqpItemWriter<Message> {
return AmqpItemWriterBuilder<Message>()
.amqpTemplate(rabbitTemplate)
.build()
}
Kafka 读写器 (KafkaItemReader/Writer
)
kotlin
// 读取器
@Bean
fun kafkaReader(): KafkaItemReader<String, String> {
val props = Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
put(ConsumerConfig.GROUP_ID_CONFIG, "batch-group")
}
return KafkaItemReaderBuilder<String, String>()
.name("kafkaReader")
.consumerProperties(props)
.partitions(0,1,2) // 消费指定分区
.topic("batch-topic")
.build()
}
// 写入器
@Bean
fun kafkaWriter(): KafkaItemWriter<String, String> {
return KafkaItemWriterBuilder<String, String>()
.kafkaTemplate(kafkaTemplate)
.build()
}
💾 数据库读写器
MongoDB 读写器 (MongoItemReader/Writer
)
kotlin
// 读取器
@Bean
fun mongoReader(mongoTemplate: MongoTemplate): MongoItemReader<User> {
return MongoItemReaderBuilder<User>()
.name("userReader")
.template(mongoTemplate)
.targetType(User::class.java)
.jsonQuery("{ age: { \$gt: 18 } }")
.sorts(sortBy(Sort.Direction.ASC, "name"))
.build()
}
// 写入器
@Bean
fun mongoWriter(mongoTemplate: MongoTemplate): MongoItemWriter<User> {
return MongoItemWriterBuilder<User>()
.template(mongoTemplate)
.collection("users")
.build()
}
JPA 写入器 (JpaItemWriter
)
kotlin
@Bean
fun jpaWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<User> {
return JpaItemWriterBuilder<User>()
.entityManagerFactory(entityManagerFactory)
.build()
}
JDBC 批处理写入器 (JdbcBatchItemWriter
)
kotlin
@Bean
fun jdbcWriter(dataSource: DataSource): JdbcBatchItemWriter<User> {
return JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name, email) VALUES (:name, :email)")
.beanMapped()
.build()
}
🔍 专用读写器
LDAP 读取器 (LdifReader
)
kotlin
@Bean
fun ldifReader(): LdifReader {
return LdifReaderBuilder()
.resource(ClassPathResource("users.ldif"))
.name("ldifReader")
.build()
}
Avro 文件处理器 (AvroItemReader/Writer
)
kotlin
// 读取器
@Bean
fun avroReader(): AvroItemReader<User> {
return AvroItemReaderBuilder<User>()
.name("avroReader")
.resource(FileSystemResource("users.avro"))
.type(User::class.java)
.build()
}
// 写入器
@Bean
fun avroWriter(): AvroItemWriter<User> {
return AvroItemWriterBuilder<User>()
.resource(FileSystemResource("output.avro"))
.type(User::class.java)
.build()
}
邮件写入器 (SimpleMailMessageItemWriter
)
kotlin
@Bean
fun mailWriter(mailSender: JavaMailSender): SimpleMailMessageItemWriter {
return SimpleMailMessageItemWriterBuilder()
.mailSender(mailSender)
.build()
}
⚙️ 专用处理器
脚本处理器 (ScriptItemProcessor
)
使用脚本语言处理数据:
kotlin
@Bean
fun scriptProcessor(): ScriptItemProcessor<User, User> {
return ScriptItemProcessorBuilder<User, User>()
.script(FileSystemResource("transform.groovy"))
.build()
}
groovy:transform.groovy
// [!code highlight:1-3]
if (item.age > 65) {
item.status = "SENIOR"
}
return item
最佳实践
- 线程安全优先:多线程环境下优先选择同步装饰器
- 资源管理:使用
MultiResourceItemWriter
处理大数据量 - 错误处理:为消息系统读写器配置重试机制
- 性能优化:JDBC写入器使用批处理提升效率
通过合理组合这些组件,您可以构建高效、灵活的批处理作业。每种实现都针对特定场景优化,选择最适合您需求的实现方案。