Skip to content

Spring Batch 数据库读写指南:游标与分页策略详解

引言:大数据集处理的挑战 🚀

在企业应用中,数据库是批处理的核心存储机制。但批处理面临独特挑战:需要处理的数据集往往非常庞大。传统SQL查询会一次性加载全部结果到内存(例如返回100万行数据),这会导致:

  • ⚠️ 内存溢出风险
  • ⛔️ 性能严重下降
  • 🔄 资源利用率低效

Spring Batch 提供了两种优雅的解决方案:

  1. 游标式读取:像水流一样逐行处理数据
  2. 分页式读取:将大数据集拆分成小块处理

下面我们将深入探讨这两种策略的实现细节。

一、基于游标的读取策略 🎯

1.1 游标工作原理

游标是数据库原生的"数据流"解决方案。Spring Batch 的游标式 ItemReader

  • 📌 初始化时打开数据库游标
  • ➡️ 每次 read() 调用向前移动游标
  • 🧩 返回映射后的对象供处理
  • 🔒 处理完成后关闭释放资源

TIP

游标 vs 传统JDBC
传统JdbcTemplate一次性加载所有数据,而游标式读取支持流式处理,内存占用恒定,适合大数据集场景。

1.2 JdbcCursorItemReader 实现

Kotlin 配置示例

kotlin
// [!code highlight:1-3] // 配置数据源和SQL
@Bean
fun customerCreditReader(dataSource: DataSource): JdbcCursorItemReader<CustomerCredit> {
    return JdbcCursorItemReaderBuilder<CustomerCredit>()
        .dataSource(dataSource)
        .name("creditReader")
        .sql("SELECT id, name, credit FROM customer")
        .rowMapper(CustomerCreditRowMapper())
        .fetchSize(100) // [!code warning] // 设置合理的fetchSize提升性能
        .build()
}

//  // 行映射器实现
class CustomerCreditRowMapper : RowMapper<CustomerCredit> {
    override fun mapRow(rs: ResultSet, rowNum: Int): CustomerCredit {
        return CustomerCredit(
            id = rs.getInt("id"),
            name = rs.getString("name"),
            credit = rs.getBigDecimal("credit")
        )
    }
}

关键配置属性

属性默认值说明
fetchSize-建议设置(如100-1000),减少数据库往返次数
maxRows-限制结果集最大行数
queryTimeout-SQL执行超时时间(秒)
verifyCursorPositionfalse设为true可防止RowMapper内意外移动游标
setUseSharedExtendedConnectionfalse为true时需配合ExtendedConnectionDataSourceProxy

CAUTION

生产环境建议
对于超过10万行的数据集,务必设置fetchSize(如500)和maxRows(如100000),避免内存溢出和长事务问题。

1.3 StoredProcedureItemReader 实现

当需要通过存储过程获取数据时使用,支持三种方式:

Kotlin 配置示例

kotlin
@Bean
fun storedProcedureReader(dataSource: DataSource): StoredProcedureItemReader<CustomerCredit> {
    val reader = StoredProcedureItemReader<CustomerCredit>()
    reader.apply {
        this.dataSource = dataSource
        procedureName = "get_customer_credits"
        parameters = listOf(
            SqlOutParameter("ref_cursor", Types.REF_CURSOR), //  // REF_CURSOR类型
            SqlParameter("min_credit", Types.DECIMAL),
            SqlParameter("status", Types.VARCHAR)
        )
        setPreparedStatementSetter { ps -> //  // 参数绑定
            ps.setBigDecimal(1, BigDecimal.valueOf(1000))
            ps.setString(2, "ACTIVE")
        }
        rowMapper = CustomerCreditRowMapper()
        refCursorPosition = 1 //  // 指定REF_CURSOR参数位置
    }
    return reader
}
存储过程示例(Oracle)
sql
CREATE OR REPLACE PROCEDURE get_customer_credits(
    p_ref_cursor OUT SYS_REFCURSOR,
    p_min_credit IN NUMBER,
    p_status IN VARCHAR2
) AS
BEGIN
    OPEN p_ref_cursor FOR
    SELECT id, name, credit
    FROM customer
    WHERE credit > p_min_credit
    AND status = p_status;
END;

IMPORTANT

跨数据库兼容性
不同数据库对存储过程游标的支持差异较大。Oracle使用REF_CURSOR,SQL Server直接返回结果集,PostgreSQL使用函数返回。请根据数据库类型调整配置。

二、分页式读取策略 📚

2.1 分页工作原理

分页策略将大数据集拆分为多个小页:

  1. 每页执行独立查询
  2. 指定页码和页大小
  3. 使用排序键保证数据一致性

WARNING

关键要求:排序字段必须有唯一约束!否则分页时可能丢失或重复数据。

2.2 JdbcPagingItemReader 实现

Kotlin 配置示例

kotlin
@Bean
fun jdbcPagingReader(dataSource: DataSource): JdbcPagingItemReader<CustomerCredit> {
    val parameterValues = mapOf("status" to "ACTIVE")

    return JdbcPagingItemReaderBuilder<CustomerCredit>()
        .name("pagingCreditReader")
        .dataSource(dataSource)
        .queryProvider(pagingQueryProvider()) //  // 分页查询提供者
        .parameterValues(parameterValues)
        .rowMapper(CustomerCreditRowMapper())
        .pageSize(500) //  // 每页记录数
        .build()
}

@Bean
fun pagingQueryProvider(): PagingQueryProvider {
    return SqlPagingQueryProviderFactoryBean().apply {
        setDataSource(dataSource)
        setSelectClause("SELECT id, name, credit")
        setFromClause("FROM customer")
        setWhereClause("WHERE status = :status")
        setSortKey("id") //  // 排序键必须有唯一约束
    }.`object` // [!code warning] // Kotlin特殊处理
}

2.3 JpaPagingItemReader 实现

Kotlin 配置示例

kotlin
@Bean
fun jpaPagingReader(entityManagerFactory: EntityManagerFactory): JpaPagingItemReader<CustomerCredit> {
    return JpaPagingItemReaderBuilder<CustomerCredit>()
        .name("jpaCreditReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString("SELECT c FROM CustomerCredit c WHERE c.status = :status ORDER BY c.id")
        .parameterValues(mapOf("status" to "ACTIVE"))
        .pageSize(300) //  // JPA分页大小
        .build()
}

性能优化建议

场景推荐策略
简单查询JdbcPagingItemReader
复杂对象映射JpaPagingItemReader
超大数据集(>100万)游标 + 适当fetchSize
中等数据集(1万-100万)分页 + 页大小500-2000

三、数据库ItemWriter最佳实践 🖊️

3.1 核心挑战:批量写入的错误处理

3.2 解决方案:逐条刷新策略

kotlin
@Entity
class CustomJpaItemWriter : ItemWriter<CustomerCredit>() {

    @PersistenceContext
    private lateinit var entityManager: EntityManager

    override fun write(items: List<CustomerCredit>) {
        items.forEach { item ->
            entityManager.persist(item)
            entityManager.flush() //  // 关键:逐条刷新
            entityManager.detach(item) // 分离实体避免内存积累
        }
    }
}

3.3 错误处理策略对比

kotlin
@Bean
fun batchWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<CustomerCredit> {
    return JpaItemWriter<CustomerCredit>().apply {
        setEntityManagerFactory(entityManagerFactory)
        // 默认批量写入,出错时无法定位具体条目
    }
}
kotlin
@Bean
fun safeJpaWriter(): ItemWriter<CustomerCredit> {
    return ItemWriter { items ->
        items.forEach { item ->
            entityManager.persist(item)
            entityManager.flush() // 立即刷新
            entityManager.detach(item) // 释放内存
        }
    }
}

DANGER

Hibernate用户特别注意
默认的HibernateItemWriter使用批量写入模式。在写入关键数据时,强烈建议实现自定义Writer并添加flush()逻辑,否则批处理失败时将无法确定具体出错条目!

结论与最佳实践 ✅

场景推荐方案优势
大数据集流式处理游标式读取内存占用恒定
复杂查询分页JdbcPagingItemReader灵活控制页大小
JPA实体读取JpaPagingItemReader对象自动映射
高可靠写入逐条刷新Writer精确错误定位

💡 终极选择建议

  • 当处理千万级数据时:游标 + 合理fetchSize
  • 当需要复杂对象转换时:JPA分页读取
  • 关键业务数据写入时:务必实现逐条刷新

通过合理组合这些策略,您可以在保证系统稳定性的同时,高效处理任意规模的数据集!