Spring Batch(6)——数据库批数据读写

全文共 3404 个字

前序文章陆续介绍了批处理的基本概念Job使用Step控制Item的结构以及扁平文件的读写。本文将接着前面的内容说明数据库如何进行批处理读写。

数据读取

数据库是绝大部分系统要用到的数据存储工具,因此针对数据库执行批量数据处理任务也是很常见的需求。数据的批量处理与常规业务开发不同,如果一次性读取百万条,对于任何系统而言肯定都是不可取的。为了解决这个问题Spring Batch提供了2套数据读取方案:

  • 基于游标读取数据
  • 基于分页读取数据

游标读取数据

对于有经验大数据工程师而言数据库游标的操作应该是非常熟悉的,因为这是从数据库读取数据流标准方法,而且在Java中也封装了ResultSet这种面向游标操作的数据结构。

ResultSet一直都会指向结果集中的某一行数据,使用next方法可以让游标跳转到下一行数据。Spring Batch同样使用这个特性来控制数据的读取:

  1. 在初始化时打开游标。
  2. 每一次调用ItemReader::read方法就从ResultSet获取一行数据并执行next
  3. 返回可用于数据处理的映射结构(map、dict)。

在一切都执行完毕之后,框架会使用回调过程调用ResultSet::close来关闭游标。由于所有的业务过程都绑定在一个事物之上,所以知道到Step执行完毕或异常退出调用执行close。下图展示了数据读取的过程:

Spring Batch(6)——数据库批数据读写

SQL语句的查询结果称为数据集(对于大部分数据库而言,其SQL执行结果会产生临时的表空间索引来存放数据集)。游标开始会停滞在ID=2的位置,一次ItemReader执行完毕后会产生对应的实体FOO2,然后游标下移直到最后的ID=6。最后关闭游标。

JdbcCursorItemReader

JdbcCursorItemReader是使用游标读取数据集的ItemReader实现类之一。它使用JdbcTemplate中的DataSource控制ResultSet,其过程是将ResultSet的每行数据转换为所需要的实体类。

JdbcCursorItemReader的执行过程有三步:

  1. 通过DataSource创建JdbcTemplate
  2. 设定数据集的SQL语句。
  3. 创建ResultSet到实体类的映射。 大致如下:
//随风溜达的向日葵 chkui.com
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());

除了上面的代码,JdbcCursorItemReader还有其他属性:

属性名称 说明
ignoreWarnings 标记当执行SQL语句出现警告时,是输出日志还是抛出异常,默认为true——输出日志
fetchSize 预通知JDBC驱动全量数据的个数
maxRows 设置ResultSet从数据库中一次读取记录的上限
queryTimeout 设置执行SQL语句的等待超时时间,单位秒。当超过这个时间会抛出DataAccessException
verifyCursorPosition 对游标位置进行校验。由于在RowMapper::mapRow方法中ResultSet是直接暴露给使用者的,因此有可能在业务代码层面调用了ResultSet::next方法。将这个属性设置为true,在框架中会有一个位置计数器与ResultSet保持一致,当执行完Reader后位置不一致会抛出异常。
saveState 标记读取的状态是否被存放到ExecutionContext中。默认为true
driverSupportsAbsolute 告诉框架是指直接使用ResultSet::absolute方法来指定游标位置,使用这个属性需要数据库驱动支持。建议在支持absolute特性的数据库上开启这个特性,能够明显的提升性能。默认为false
setUseSharedExtendedConnection 标记读取数据的游标是否与Step其他过程绑定成同一个事物。默认为false,表示读取数据的游标是单独建立连接的,具有自身独立的事物。如果设定为true需要用ExtendedConnectionDataSourceProxy包装DataSource用于管理事物过程。此时游标的创建标记为'READ_ONLY'、'HOLD_CURSORS_OVER_COMMIT'。需要注意的是该属性需要数据库支持3.0以上的JDBC驱动。

可执行源码

源码在下列地址的items子项目:

执行JdbcCursorItemReader的代码在org.chenkui.spring.batch.sample.items.JdbcReader。启动位置是org.chenkui.spring.batch.sample.database.cursor.JdbcCurosrApplication

在运行代码之前请先在数据库中执行以下DDL语句,并添加部分测试数据。

CREATE TABLE `tmp_test_weather` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `siteid` varchar(64) NOT NULL COMMENT '业务主键',
  `month` varchar(64) NOT NULL COMMENT '日期',
  `type` varchar(64) NOT NULL COMMENT '气象类型',
  `value` int(11) NOT NULL COMMENT '值',
  `ext` varchar(255) DEFAULT NULL COMMENT '扩展数据',
  PRIMARY KEY (`id`)
) ;

运行代码:

//随风溜达的向日葵 chkui.com
public class JdbcReader {

    @Bean
    public RowMapper<WeatherEntity> weatherEntityRowMapper() {

        return new RowMapper<WeatherEntity>() {
            public static final String SITEID_COLUMN = "siteId"; // 设置映射字段
            public static final String MONTH_COLUMN = "month";
            public static final String TYPE_COLUMN = "type";
            public static final String VALUE_COLUMN = "value";
            public static final String EXT_COLUMN = "ext";

            @Override
            // 数据转换
            public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                WeatherEntity weatherEntity = new WeatherEntity();
                weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
                weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
                weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
                weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
                weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
                return weatherEntity;
            }
        };
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcCursorItemReader(
        @Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
        JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(datasource); //设置DataSource
        //设置读取的SQL
        itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER");
        itemReader.setRowMapper(rowMapper); //设置转换
        return itemReader;
    }
}

HibernateCursorItemReader

在Java体系中数据库操作常见的规范有JPAORM,Spring Batch提供了HibernateCursorItemReader来实现HibernateTemplate,它可以通过Hibernate框架进行游标的控制。

需要注意的是:使用Hibernate框架来处理批量数据到目前为止一直都有争议,核心原因是Hibernate最初是为在线联机事物型系统开发的。不过这并不意味着不能使用它来处理批数据,解决此问题就是让Hibernate使用StatelessSession用来保持游标,而不是standard session一次读写,这将导致Hibernate的缓存机制和数据脏读检查失效,进而影响批处理的过程。关于Hibernate的状态控制机制请阅读官方文档。

HibernateCursorItemReader使用过程与JdbcCursorItemReader没多大差异都是逐条读取数据然后控制状态链接关闭。只不过他提供了Hibernate所使用的HSQL方案。

@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
    itemReader.setName("hibernateCursorItemReader");
    itemReader.setQueryString("from WeatherEntity tmp_test_weather");
    itemReader.setSessionFactory(sessionFactory);
    return itemReader;
}

public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    return new HibernateCursorItemReaderBuilder<CustomerCredit>()
            .name("creditReader")
            .sessionFactory(sessionFactory)
            .queryString("from CustomerCredit")
            .build();
}

如果没有特别的需要,不推荐使用Hibernate

StoredProcedureItemReader

存储过程是在同一个数据库中处理大量数据的常用方法。StoredProcedureItemReader的执行过程和JdbcCursorItemReader一致,但是底层逻辑是先执行存储过程,然后返回存储过程执行结果游标。不同的数据库存储过程游标返回会有一些差异:

  1. 作为一个ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL)
  2. 参数返回一个 ref-cursor实例。比如Oracle、PostgreSQL数据库,这类数据库存储过程是不会直接return任何内容的,需要从传参获取。
  3. 返回存储过程调用后的返回值。

针对以上3个类型,配置上有一些差异:

//随风溜达的向日葵 chkui.com
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
    StoredProcedureItemReader reader = new StoredProcedureItemReader();

    reader.setDataSource(dataSource);
    reader.setProcedureName("sp_processor_weather");
    reader.setRowMapper(new weatherEntityRowMapper());
	
    reader.setRefCursorPosition(1);//第二种类型需要指定ref-cursor的参数位置

    reader.setFunction(true);//第三种类型需要明确的告知reader通过返回获取

    return reader;
}

使用存储过程处理数据的好处是可以实现针对库内的数据进行合并、分割、排序等处理。如果数据在同一个数据库,性能也明显好于通过Java处理。

分页读取数据

相对于游标,还有一个办法是进行分页查询。分页查询意味着再进行批处理的过程中同一个SQL会多次执行。在联机型事物系统中分页查询常用于列表功能,每一次查询需要指定开始位置和结束位置。

JdbcPagingItemReader

分页查询的默认实现类是JdbcPagingItemReader,它的核心功能是用分页器PagingQueryProvider进行分页控制。由于不同的数据库分页方法差别很大,所以针对不同的数据库有不同的实现类。框架提供了SqlPagingQueryProviderFactoryBean用于检查当前数据库并自动注入对应的PagingQueryProvider

JdbcPagingItemReader会从数据库中一次性读取一整页的数据,但是调用Reader的时候还是会一行一行的返回数据。框架会自行根据运行情况确定什么时候需要执行下一个分页的查询。

分页读取数据执行源码

执行JdbcPagingItemReader的代码在org.chenkui.spring.batch.sample.items.pageReader。启动位置是org.chenkui.spring.batch.sample.database.paging.JdbcPagingApplication

//随风溜达的向日葵 chkui.com
public class pageReader {
    final private boolean wrapperBuilder = false;
    @Bean
    //设置 queryProvider
    public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, siteid, month, type, value, ext");
        provider.setFromClause("from tmp_test_weather");
        provider.setWhereClause("where id>:start");
        provider.setSortKey("id");

        return provider;
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
            PagingQueryProvider queryProvider,
            RowMapper<WeatherEntity> rowMapper) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("start", "1");
        JdbcPagingItemReader<WeatherEntity> itemReader;
        if (wrapperBuilder) {
            itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
                    .name("creditReader")
                    .dataSource(dataSource)
                    .queryProvider(queryProvider)
                    .parameterValues(parameterValues)
                    .rowMapper(rowMapper)
                    .pageSize(1000)
                    .build();
        } else {
            itemReader = new JdbcPagingItemReader<>();
            itemReader.setName("weatherEntityJdbcPagingItemReader");
            itemReader.setDataSource(dataSource);
            itemReader.setQueryProvider(queryProvider);
            itemReader.setParameterValues(parameterValues);
            itemReader.setRowMapper(rowMapper);
            itemReader.setPageSize(1000);
        }
        return itemReader;
    }
}

数据写入

Spring Batch为不同类型的文件的写入提供了多个实现类,但并没有为数据库的写入提供任何实现类,而是交由开发者自己去实现接口。理由是:

  1. 数据库的写入与文件写入有巨大的差别。对于一个Step而言,在写入一份文件时需要保持对文件的打开状态从而能够高效的向队尾添加数据。如果每次都重新打开文件,从开始位置移动到队尾会耗费大量的时间(很多文件流无法在open时就知道长度)。当整个Step结束时才能关闭文件的打开状态,框架提供的文件读写类都实现了这个控制过程。

  2. 另外无论使用何种方式将数据写入文件都是"逐行进行"的(流数据写入、字符串逐行写入)。因此当数据写入与整个Step绑定为事物时还需要实现一个控制过程是:在写入数据的过程中出现异常时要擦除本次事物已经写入的数据,这样才能和整个Step的状态保持一致。框架中的类同样实现了这个过程。

  3. 但是向数据库写入数据并不需要类似于文件的尾部写入控制,因为数据库的各种链接池本身就保证了链接->写入->释放的高效执行,也不存在向队尾添加数据的问题。而且几乎所有的数据库驱动都提供了事物能力,在任何时候出现异常都会自动回退,不存在擦除数据的问题。

因此,对于数据库的写入操作只要按照常规的批量数据写入的方式即可,开发者使用任何工具都可以完成这个过程。

写入数据一个简单的实现

实现数据写入方法很多,这和常规的联机事务系统没任何区别。下面直接用JdbcTemplate实现了一个简单的数据库写入过程。

执行数据库写入的核心代码在org.chenkui.spring.batch.sample.items.JdbcWriter。启动位置是org.chenkui.spring.batch.sample.database.output.JdbcWriterApplication

//随风溜达的向日葵 chkui.com
public class JdbcWriter {

    @Bean
    public ItemWriter<WeatherEntity> jdbcBatchWriter(JdbcTemplate template) {

        return new ItemWriter<WeatherEntity>() {
            final private static String INSERt_SQL =
                      "INSERT INTO tmp_test_weather(siteid, month, type, value, ext) VALUES(?,?,?,?,?)";
            @Override
            public void write(List<? extends WeatherEntity> items) throws Exception {
                List<Object[]> batchArgs = new ArrayList<>();
                for (WeatherEntity entity : items) {
                    Object[] objects = new Object[5];
                    objects[0] = entity.getSiteId();
                    objects[1] = entity.getMonth();
                    objects[2] = entity.getType().name();
                    objects[3] = entity.getValue();
                    objects[4] = entity.getExt();
                    batchArgs.add(objects);
                }
                template.batchUpdate(INSERt_SQL, batchArgs);
            }
        };
    }
}

组合使用案例

下面是一些组合使用过程,简单实现了文件到数据库、数据库到文件的过程。文件读写的过程已经在文件读写中介绍过,这里会重复使用之前介绍的文件读写的功能。

下面的案例是将data.csv中的数据写入到数据库,然后再将数据写入到out-data.csv。案例组合使用已有的item完成任务:flatFileReaderjdbcBatchWriterjdbcCursorItemReadersimpleProcessorflatFileWriter。这种ReaderProcessorWriter组合的方式也是完成一个批处理工程的常见开发方式。

案例的运行代码在org.chenkui.spring.batch.sample.database.complex包中,使用了2个Step来完成任务,一个将数据读取到数据库,一个将数据进行过滤,然后再写入到文件:

//随风溜达的向日葵 chkui.com
public class FileComplexProcessConfig {
    @Bean
    // 配置Step1
    public Step file2DatabaseStep(StepBuilderFactory builder,
            @Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
            @Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
        return builder.get("file2DatabaseStep") // 创建
                .<WeatherEntity, WeatherEntity>chunk(50) // 分片
                .reader(reader) // 读取
                .writer(writer) // 写入
                .faultTolerant() // 开启容错处理
                .skipLimit(20) // 跳过设置
                .skip(Exception.class) // 跳过异常
                .build();
    }

    @Bean
    // 配置Step2
    public Step database2FileStep(StepBuilderFactory builder,
            @Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
            @Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
            @Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
        return builder.get("database2FileStep") // 创建
                .<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
                .reader(reader) // 读取
                .processor(processor) //
                .writer(writer) // 写入
                .faultTolerant() // 开启容错处理
                .skipLimit(20) // 跳过设置
                .skip(Exception.class) // 跳过异常
                .build();
    }

    @Bean
    public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
            @Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
        return builder.get("File2Database").start(step2Database).next(step2File).build();
    }
}