Appearance
Spring Batch ItemReader 详解:数据读取的艺术
本文将带你深入理解 Spring Batch 的核心组件 ItemReader,掌握高效处理不同数据源的技巧
🧩 ItemReader 核心概念
ItemReader
是 Spring Batch 中负责数据读取的核心接口。它就像一个智能的数据管道,能够从各种数据源(文件、数据库、XML等)读取数据,并将其转换为 Spring Batch 可处理的格式。
类比理解
想象 ItemReader 就像一个多功能的阅读器:
- 对文件:它是高效的扫描仪
- 对数据库:它是智能的 SQL 执行器
- 对 XML:它是精密的解析器
ItemReader 接口定义
kotlin
interface ItemReader<T> {
fun read(): T?
throws Exception,
UnexpectedInputException,
ParseException,
NonTransientResourceException
}
IMPORTANT
read()
方法是 ItemReader 的核心契约:
- 每次调用返回一个数据项
- 没有更多数据时返回
null
- 支持多种异常处理场景
📂 三种主要数据源处理方式
1. 平面文件处理 (Flat File)
适用于 CSV、固定宽度文本等格式文件
kotlin
@Configuration
open class FlatFileReaderConfig {
@Bean
open fun customerItemReader(): FlatFileItemReader<Customer> {
return FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.resource(ClassPathResource("data/customers.csv"))
.delimited()
.names("firstName", "lastName", "email")
.fieldSetMapper(BeanWrapperFieldSetMapper<Customer>().apply {
setTargetType(Customer::class.java)
})
.build()
}
}
注意事项
- 确保文件路径正确,否则会抛出
NonTransientResourceException
- 列名定义必须与 CSV 头部匹配
- 大型文件处理时考虑分块读取(chunk processing)
2. XML 文件处理
使用 StAX 解析器高效处理 XML 数据
kotlin
@Configuration
open class XmlReaderConfig {
@Bean
open fun tradeItemReader(): StaxEventItemReader<Trade> {
return StaxEventItemReaderBuilder<Trade>()
.name("tradeItemReader")
.resource(ClassPathResource("data/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(Jaxb2Marshaller().apply {
setClassesToBeBound(Trade::class.java)
})
.build()
}
}
最佳实践
- 使用 XSD 验证 XML 结构有效性
- 复杂 XML 考虑使用 XPath 表达式定位元素
- 优先使用 StAX 而非 DOM 解析器(内存效率更高)
3. 数据库读取
通过 JDBC 或 ORM 框架访问关系型数据库
kotlin
@Configuration
open class DatabaseReaderConfig {
@Bean
open fun productItemReader(dataSource: DataSource): JdbcCursorItemReader<Product> {
return JdbcCursorItemReaderBuilder<Product>()
.name("productItemReader")
.dataSource(dataSource)
.sql("SELECT id, name, price FROM products WHERE status = 'ACTIVE'")
.rowMapper(BeanPropertyRowMapper(Product::class.java))
.build()
}
}
性能警告
WARNING
处理百万级数据时:
- 避免使用
JdbcCursorItemReader
(内存消耗大) - 改用
JdbcPagingItemReader
分页查询 - 确保 SQL 有合适的索引 :::
🔄 ItemReader 工作流程
NOTE
关键流程说明:
read()
方法被重复调用直到返回null
- 每个数据项独立处理(支持事务回滚)
- 无数据时正常结束(不抛出异常)
⚙️ 高级特性与最佳实践
重启机制
当作业意外中断后重启时:
kotlin
@Bean
open fun restartableReader(): JdbcCursorItemReader<Order> {
return JdbcCursorItemReaderBuilder<Order>()
.name("orderReader")
.dataSource(dataSource)
.sql("SELECT id, amount FROM orders ORDER BY id")
.rowMapper(BeanPropertyRowMapper(Order::class.java))
.currentItemCount(100) // [!code highlight] // 从第100条记录开始
.saveState(true) // [!code highlight] // 启用状态保存
.build()
}
重启要点
- 设置
saveState=true
保存读取位置 - 使用
currentItemCount
指定重启位置 - 确保数据源支持随机访问(数据库可以,文件流不行)
多数据源读取
kotlin
@Bean
open fun compositeReader(): CompositeItemReader<Data> {
val readers = listOf(
fileReader(),
dbReader(),
apiReader()
)
return CompositeItemReaderBuilder<Data>()
.delegates(readers)
.build()
}
kotlin
@Bean
open fun parallelReader(): SynchronizedItemStreamReader<Data> {
return SynchronizedItemStreamReaderBuilder<Data>()
.delegate(fileReader())
.build()
}
@Bean
open fun parallelStep(): Step {
return stepBuilderFactory.get("parallelStep")
.<Data, Data>chunk(100)
.reader(parallelReader())
.taskExecutor(taskExecutor()) // [!code highlight] // 启用多线程
.build()
}
🚫 常见错误与解决方案
kotlin
fun read(): Customer? {
try {
// 读取逻辑...
} catch (ex: Exception) {
when (ex) {
is FlatFileParseException -> {
// 文件解析错误处理
logger.error("行 ${ex.lineNumber} 解析失败: ${ex.input}")
}
is SQLException -> {
// 数据库错误处理
throw NonTransientResourceException("数据库访问错误", ex)
}
else -> throw ex
}
}
return null
}
错误类型 | 原因 | 解决方案 |
---|---|---|
UnexpectedInputException | 输入格式不符 | 验证输入源格式一致性 |
ParseException | 数据转换失败 | 检查字段映射规则 |
NonTransientResourceException | 资源不可用 | 检查文件/DB连接可用性 |
📚 扩展学习资源
- 官方 ItemReader 实现列表
- 自定义 ItemReader 模板:
kotlin
class CustomItemReader : ItemReader<Data> {
private var items = listOf(...)
private var index = 0
override fun read(): Data? {
return if (index < items.size) {
items[index++]
} else {
null // 返回null表示读取完成
}
}
}
- 性能优化技巧:
- 批处理大小 = 事务块大小
- 数据库读取使用分页(Paging)
- 文件处理使用缓冲流
::: success 总结 掌握 ItemReader
是高效批处理的关键!通过本文您已学会: ✅ 三种主要数据源的读取方法
✅ 重启机制与状态管理
✅ 异常处理最佳实践
✅ 性能优化核心技巧
现在可以尝试在您的批处理作业中应用这些知识了! :::