Skip to content

Spring Batch ItemReader 详解:数据读取的艺术

本文将带你深入理解 Spring Batch 的核心组件 ItemReader,掌握高效处理不同数据源的技巧

🧩 ItemReader 核心概念

ItemReader 是 Spring Batch 中负责数据读取的核心接口。它就像一个智能的数据管道,能够从各种数据源(文件、数据库、XML等)读取数据,并将其转换为 Spring Batch 可处理的格式。

类比理解

想象 ItemReader 就像一个多功能的阅读器

  • 对文件:它是高效的扫描仪
  • 对数据库:它是智能的 SQL 执行器
  • 对 XML:它是精密的解析器

ItemReader 接口定义

kotlin
interface ItemReader<T> {
    fun read(): T?
        throws Exception,
        UnexpectedInputException,
        ParseException,
        NonTransientResourceException
}

IMPORTANT

read() 方法是 ItemReader 的核心契约:

  • 每次调用返回一个数据项
  • 没有更多数据时返回 null
  • 支持多种异常处理场景

📂 三种主要数据源处理方式

1. 平面文件处理 (Flat File)

适用于 CSV、固定宽度文本等格式文件

kotlin
@Configuration
open class FlatFileReaderConfig {

    @Bean
    open fun customerItemReader(): FlatFileItemReader<Customer> {
        return FlatFileItemReaderBuilder<Customer>()
            .name("customerItemReader")
            .resource(ClassPathResource("data/customers.csv")) 
            .delimited()
            .names("firstName", "lastName", "email") 
            .fieldSetMapper(BeanWrapperFieldSetMapper<Customer>().apply {
                setTargetType(Customer::class.java)
            })
            .build()
    }
}

注意事项

  1. 确保文件路径正确,否则会抛出 NonTransientResourceException
  2. 列名定义必须与 CSV 头部匹配
  3. 大型文件处理时考虑分块读取(chunk processing)

2. XML 文件处理

使用 StAX 解析器高效处理 XML 数据

kotlin
@Configuration
open class XmlReaderConfig {

    @Bean
    open fun tradeItemReader(): StaxEventItemReader<Trade> {
        return StaxEventItemReaderBuilder<Trade>()
            .name("tradeItemReader")
            .resource(ClassPathResource("data/trades.xml")) 
            .addFragmentRootElements("trade") 
            .unmarshaller(Jaxb2Marshaller().apply {
                setClassesToBeBound(Trade::class.java)
            })
            .build()
    }
}

最佳实践

  1. 使用 XSD 验证 XML 结构有效性
  2. 复杂 XML 考虑使用 XPath 表达式定位元素
  3. 优先使用 StAX 而非 DOM 解析器(内存效率更高)

3. 数据库读取

通过 JDBC 或 ORM 框架访问关系型数据库

kotlin
@Configuration
open class DatabaseReaderConfig {

    @Bean
    open fun productItemReader(dataSource: DataSource): JdbcCursorItemReader<Product> {
        return JdbcCursorItemReaderBuilder<Product>()
            .name("productItemReader")
            .dataSource(dataSource) 
            .sql("SELECT id, name, price FROM products WHERE status = 'ACTIVE'") 
            .rowMapper(BeanPropertyRowMapper(Product::class.java))
            .build()
    }
}

性能警告

WARNING

处理百万级数据时:

  • 避免使用 JdbcCursorItemReader(内存消耗大)
  • 改用 JdbcPagingItemReader 分页查询
  • 确保 SQL 有合适的索引 :::

🔄 ItemReader 工作流程

NOTE

关键流程说明

  1. read() 方法被重复调用直到返回 null
  2. 每个数据项独立处理(支持事务回滚)
  3. 无数据时正常结束(不抛出异常)

⚙️ 高级特性与最佳实践

重启机制

当作业意外中断后重启时:

kotlin
@Bean
open fun restartableReader(): JdbcCursorItemReader<Order> {
    return JdbcCursorItemReaderBuilder<Order>()
        .name("orderReader")
        .dataSource(dataSource)
        .sql("SELECT id, amount FROM orders ORDER BY id")
        .rowMapper(BeanPropertyRowMapper(Order::class.java))
        .currentItemCount(100) // [!code highlight] // 从第100条记录开始
        .saveState(true) // [!code highlight] // 启用状态保存
        .build()
}

重启要点

  1. 设置 saveState=true 保存读取位置
  2. 使用 currentItemCount 指定重启位置
  3. 确保数据源支持随机访问(数据库可以,文件流不行)

多数据源读取

kotlin
@Bean
open fun compositeReader(): CompositeItemReader<Data> {
    val readers = listOf(
        fileReader(),
        dbReader(),
        apiReader()
    )

    return CompositeItemReaderBuilder<Data>()
        .delegates(readers) 
        .build()
}
kotlin
@Bean
open fun parallelReader(): SynchronizedItemStreamReader<Data> {
    return SynchronizedItemStreamReaderBuilder<Data>()
        .delegate(fileReader())
        .build()
}

@Bean
open fun parallelStep(): Step {
    return stepBuilderFactory.get("parallelStep")
        .<Data, Data>chunk(100)
        .reader(parallelReader())
        .taskExecutor(taskExecutor()) // [!code highlight] // 启用多线程
        .build()
}

🚫 常见错误与解决方案

kotlin
fun read(): Customer? {
    try {
        // 读取逻辑...
    } catch (ex: Exception) {
        when (ex) {
            is FlatFileParseException -> { 
                // 文件解析错误处理
                logger.error("行 ${ex.lineNumber} 解析失败: ${ex.input}")
            }
            is SQLException -> { 
                // 数据库错误处理
                throw NonTransientResourceException("数据库访问错误", ex)
            }
            else -> throw ex
        }
    }
    return null
}
错误类型原因解决方案
UnexpectedInputException输入格式不符验证输入源格式一致性
ParseException数据转换失败检查字段映射规则
NonTransientResourceException资源不可用检查文件/DB连接可用性

📚 扩展学习资源

  1. 官方 ItemReader 实现列表
  2. 自定义 ItemReader 模板:
kotlin
class CustomItemReader : ItemReader<Data> {
    private var items = listOf(...)
    private var index = 0

    override fun read(): Data? {
        return if (index < items.size) {
            items[index++] 
        } else {
            null // 返回null表示读取完成
        }
    }
}
  1. 性能优化技巧
    • 批处理大小 = 事务块大小
    • 数据库读取使用分页(Paging)
    • 文件处理使用缓冲流

::: success 总结 掌握 ItemReader 是高效批处理的关键!通过本文您已学会: ✅ 三种主要数据源的读取方法
✅ 重启机制与状态管理
✅ 异常处理最佳实践
✅ 性能优化核心技巧

现在可以尝试在您的批处理作业中应用这些知识了! :::