Skip to content

Spring Batch ItemReader 与 ItemWriter 实现详解

目录

📚 装饰器模式

装饰器模式为已有读写器添加额外功能而不改变其核心逻辑。Spring Batch 提供多种装饰器:

🧵 同步读写器 (SynchronizedItemStreamReader/Writer)

使非线程安全的读写器可在多线程环境下使用:

kotlin
@Bean
fun itemReader(): SynchronizedItemStreamReader<Person> {
    val flatFileItemReader = FlatFileItemReaderBuilder<Person>()
        .name("personReader")
        .resource(ClassPathResource("persons.csv"))
        .delimited()
        .names("firstName", "lastName")
        .targetType(Person::class.java)
        .build()
    
    return SynchronizedItemStreamReaderBuilder<Person>()
        .delegate(flatFileItemReader)
        .build()
}
kotlin
@Bean
fun itemWriter(): SynchronizedItemStreamWriter<Person> {
    val flatFileItemWriter = FlatFileItemWriterBuilder<Person>()
        .name("personWriter")
        .resource(FileSystemResource("output.txt"))
        .delimited()
        .delimiter(",")
        .names("firstName", "lastName")
        .build()
    
    return SynchronizedItemStreamWriterBuilder<Person>()
        .delegate(flatFileItemWriter)
        .build()
}

WARNING

使用同步装饰器会增加锁开销,可能影响性能。仅在多线程步骤中必须使用时才添加。

👀 可查看读取器 (SingleItemPeekableItemReader)

提供peek()方法预览下一条记录:

📂 多资源写入器 (MultiResourceItemWriter)

当单文件过大时自动分割文件:

kotlin
@Bean
fun multiResourceWriter(): MultiResourceItemWriter<String> {
    val delegate = FlatFileItemWriterBuilder<String>()
        .name("delegateWriter")
        .resource(FileSystemResource("output.txt"))
        .lineAggregator { it }
        .build()
    
    return MultiResourceItemWriterBuilder<String>()
        .name("multiResourceWriter")
        .delegate(delegate)
        .itemCountLimitPerResource(1000) // 每文件1000条记录
        .resource(FileSystemResource("output"))
        .resourceSuffixCreator { index -> "_$index.txt" }
        .build()
}

🗂️ 分类复合写入器 (ClassifierCompositeItemWriter)

根据规则将数据路由到不同写入器:

kotlin
@Bean
fun classifierWriter(): ClassifierCompositeItemWriter<Transaction> {
    val cashWriter = CashTransactionWriter()
    val creditWriter = CreditTransactionWriter()
    
    val classifier = Classifier<Transaction, ItemWriter<out Transaction>> { transaction ->
        when (transaction.type) {
            TransactionType.CASH -> cashWriter
            TransactionType.CREDIT -> creditWriter
        }
    }
    
    return ClassifierCompositeItemWriterBuilder<Transaction>()
        .classifier(classifier)
        .build()
}

✉️ 消息系统读写器

RabbitMQ 读写器 (AmqpItemReader/Writer)

kotlin
// 读取器
@Bean
fun amqpReader(): AmqpItemReader<Message> {
    return AmqpItemReaderBuilder<Message>()
        .amqpTemplate(rabbitTemplate)
        .build()
}

// 写入器
@Bean
fun amqpWriter(): AmqpItemWriter<Message> {
    return AmqpItemWriterBuilder<Message>()
        .amqpTemplate(rabbitTemplate)
        .build()
}

Kafka 读写器 (KafkaItemReader/Writer)

kotlin
// 读取器
@Bean
fun kafkaReader(): KafkaItemReader<String, String> {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "batch-group")
    }
    
    return KafkaItemReaderBuilder<String, String>()
        .name("kafkaReader")
        .consumerProperties(props)
        .partitions(0,1,2) // 消费指定分区
        .topic("batch-topic")
        .build()
}

// 写入器
@Bean
fun kafkaWriter(): KafkaItemWriter<String, String> {
    return KafkaItemWriterBuilder<String, String>()
        .kafkaTemplate(kafkaTemplate)
        .build()
}

💾 数据库读写器

MongoDB 读写器 (MongoItemReader/Writer)

kotlin
// 读取器
@Bean
fun mongoReader(mongoTemplate: MongoTemplate): MongoItemReader<User> {
    return MongoItemReaderBuilder<User>()
        .name("userReader")
        .template(mongoTemplate)
        .targetType(User::class.java)
        .jsonQuery("{ age: { \$gt: 18 } }")
        .sorts(sortBy(Sort.Direction.ASC, "name"))
        .build()
}

// 写入器
@Bean
fun mongoWriter(mongoTemplate: MongoTemplate): MongoItemWriter<User> {
    return MongoItemWriterBuilder<User>()
        .template(mongoTemplate)
        .collection("users")
        .build()
}

JPA 写入器 (JpaItemWriter)

kotlin
@Bean
fun jpaWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<User> {
    return JpaItemWriterBuilder<User>()
        .entityManagerFactory(entityManagerFactory)
        .build()
}

JDBC 批处理写入器 (JdbcBatchItemWriter)

kotlin
@Bean
fun jdbcWriter(dataSource: DataSource): JdbcBatchItemWriter<User> {
    return JdbcBatchItemWriterBuilder<User>()
        .dataSource(dataSource)
        .sql("INSERT INTO users (name, email) VALUES (:name, :email)")
        .beanMapped()
        .build()
}

🔍 专用读写器

LDAP 读取器 (LdifReader)

kotlin
@Bean
fun ldifReader(): LdifReader {
    return LdifReaderBuilder()
        .resource(ClassPathResource("users.ldif"))
        .name("ldifReader")
        .build()
}

Avro 文件处理器 (AvroItemReader/Writer)

kotlin
// 读取器
@Bean
fun avroReader(): AvroItemReader<User> {
    return AvroItemReaderBuilder<User>()
        .name("avroReader")
        .resource(FileSystemResource("users.avro"))
        .type(User::class.java)
        .build()
}

// 写入器
@Bean
fun avroWriter(): AvroItemWriter<User> {
    return AvroItemWriterBuilder<User>()
        .resource(FileSystemResource("output.avro"))
        .type(User::class.java)
        .build()
}

邮件写入器 (SimpleMailMessageItemWriter)

kotlin
@Bean
fun mailWriter(mailSender: JavaMailSender): SimpleMailMessageItemWriter {
    return SimpleMailMessageItemWriterBuilder()
        .mailSender(mailSender)
        .build()
}

⚙️ 专用处理器

脚本处理器 (ScriptItemProcessor)

使用脚本语言处理数据:

kotlin
@Bean
fun scriptProcessor(): ScriptItemProcessor<User, User> {
    return ScriptItemProcessorBuilder<User, User>()
        .script(FileSystemResource("transform.groovy"))
        .build()
}
groovy:transform.groovy
// [!code highlight:1-3]
if (item.age > 65) {
    item.status = "SENIOR"
}
return item

最佳实践

  1. 线程安全优先:多线程环境下优先选择同步装饰器
  2. 资源管理:使用MultiResourceItemWriter处理大数据量
  3. 错误处理:为消息系统读写器配置重试机制
  4. 性能优化:JDBC写入器使用批处理提升效率

通过合理组合这些组件,您可以构建高效、灵活的批处理作业。每种实现都针对特定场景优化,选择最适合您需求的实现方案。