前回はSpring Batchを使ったバッチの設定を実装しました。今回は引き続きバッチの処理を実装していきます。

今回作成するSpringBatchを使ったアプリケーションは以下の図のようなイメージで構成しています。

今回作成するSpringBatchを使ったアプリケーションの構成イメージ

本連載で実際に作成するSpringBatchアプリケーションは、GitHub上にコミットしています。 以降に記載するソースコードでは、import文など本質的でない記述を省略している部分があるので、実行コードを作成する際は、必要に応じて適宜GitHubにあるソースコードも参照してください。

SpringBatchの処理実装

では早速、実装するクラスについて説明しましょう。まず、ジョブの最初のステップで実行されるSampleTaskletでは、org.springframework.batch.core.step.tasklet.Taskletを実装して、executeメソッド内にバッチ処理を実装します。

また、サンプルとして、ChunkContextを通じてバッチ起動時に渡した引数をStepExecutionから取得し、ログに出力する処理を実装します。加えて、後続の処理のパラメータとして、インプットファイルのパスをJobExecutionContextに設定します。

戻り値として、org.springframework.batch.repeat.RepeatStatusに適切なステータスコードを設定しておきます。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.step;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SampleTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
                             ChunkContext chunkContext) throws Exception {

        StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
        String param = stepExecution.getJobParameters().getString("param");
        log.info(this.getClass().getName() + "#execute() starteds. input param : " + param);
        ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
        jobExecutionContext.put("paramBySampleTasklet", "/test.txt");
        return RepeatStatus.FINISHED;
    }
}

前回定義したジョブの処理フロー上では、続いてファイルの読み込みを行うFlatFileItemReaderが実行されます。ここで実行されるのは、設定クラスBatchAppConfigで定義したsampleFlatFileItemReaderメソッドの処理です。

上記のSampleTaskletでJobExecutionContextに設定したパラメータ(Readerで読み込むファイルのパス)を受け取り、ファイルの読み込み設定や、ファイルのデリミタ(区切り文字)、モデルクラスマッピング設定を行います。Springから提供されているorg.springframework.batch.item.file.FlatFileItemReaderクラスに、型パラメータとしてモデルクラスを設定するだけで単純なファイルの読み込み実装を簡略化できる(Readerクラスをわざわざ実装しなくてもよい)のが特徴です。なお、ここでDelimiterに設定している名前「stepParam」はマッピングするモデルクラスの変数名になります。

package org.debugroom.mynavi.sample.aws.sqs.config;

// omit

import org.springframework.batch.item.file.FlatFileItemReader;

@EnableBatchProcessing
public class BatchAppConfig extends DefaultBatchConfigurer {

// omit

    @Bean
    @StepScope
    @Value("#{jobExecutionContext['paramBySampleTasklet']}")
    public FlatFileItemReader<Sample> sampleFlatFileItemReader(String paramBySampleTasklet){

        FlatFileItemReader<Sample> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setResource(new DefaultResourceLoader().getResource(paramBySampleTasklet));

        DefaultLineMapper<Sample> defaultLineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("stepParam");
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);

        BeanWrapperFieldSetMapper<Sample> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        beanWrapperFieldSetMapper.setTargetType(Sample.class);
        defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);

        flatFileItemReader.setLineMapper(defaultLineMapper);

        return flatFileItemReader;
    }
 }

SpringBatchが読み込んだファイルのデータをモデルクラスにマッピングした後、後続のProcessor、Writerと実行されていきますが、これらの処理は前回設定した通り、パーティショナーを通して並列実行されます。パーティショナーを使用した処理については、TERASOLUNAのバッチガイドライン「Partitioning Step」も合わせて参考にしてください。

パーティショナーでは、org.springframework.batch.core.partition.support.Partitionerを実装し、partition(int gridSize)メソッド内で、並列実行させたいスレッドの数に応じてパーティションIDを作成して、ID名とIDの値をExecutionContext#putStringに設定し、スレッドを識別する文字列とExecutionContextをペアでMapに設定して戻り値として返却する実装を行います。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.partitioner;

// omit
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class SamplePartitioner implements Partitioner {

   private String param;

   @Override
   public Map<String, ExecutionContext> partition(int gridSize) {
       Map<String, ExecutionContext> executionContextMap = new HashMap<>();
       try(InputStream inputStream = getClass().getResourceAsStream(param);
               Reader reader = new InputStreamReader(inputStream);
               BufferedReader bufferedReader = new BufferedReader(reader);){
           String readLine;
           int index = 0;
           while ((readLine = bufferedReader.readLine()) != null){
               ExecutionContext executionContext = new ExecutionContext();
               executionContext.putString("partitionId", readLine);
               executionContextMap.put("partition" + index, executionContext);
               index++;
           }
        }catch(IOException e){
            e.printStackTrace();
        }
        return executionContextMap;
    }
}

後続のProcessorでは、org.springframework.batch.item.ItemProcessorを実装し、型パラメータとしてIOとなるモデルクラスを指定します。クラス内でオーバーライドしたprocessメソッドが実行されますが、パーティショナーで作成したパーティンションキーの数に応じたスレッド数で多重非同期実行されます。

Processorクラスでは、インプットとしてマッピングされたモデルクラスSampleが入力クラスとして渡されます。ProcessorクラスでインジェクションされたStepExecutionを通じて、パーティションキーを取得し、モデルの変数がパーティションキーとして設定したIDと一致する場合にのみ、ログ出力処理を行うようなサンプル処理を実装しています。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.step;

// omit
import java.util.Objects;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;

@Slf4j
public class SampleProcessor implements ItemProcessor {

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    public Sample process(Sample sample) throws Exception {
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
        if(Objects.equals(sample.getStepParam(), stepExecutionContext.get("partitionId"))){
            log.info(this.getClass().getName()
              + " started. sample.stepParam:" + sample.getStepParam()
              + " stepExecution.partitionId:" + stepExecutionContext.getString("partitionId"));
        }
        return sample;
    }
}

Processorの後続で実行されるWriterはorg.springframework.batch.item.ItemWriterを実装し、型パラメータとしてインプットクラスを指定します。ここでも先ほどと同様に、パーティショナーで作成したパーティションキーの数に応じたスレッドで多重非同期実行されます。同じく、パーティションキーと一致したモデルクラスのみ実行するかたちでログ出力処理を実装しています。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.step;

// omit
import java.util.Objects;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;

import org.springframework.beans.factory.annotation.Value;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SampleWriter implements ItemWriter {

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    public void write(List extends Sample> samples) throws Exception {
        ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
        log.info(this.getClass().getName() + " started.");
        samples.stream()
          .filter(sample -> Objects.equals(((Sample) sample).getStepParam(), stepExecutionContext.get("partitionId")))
          .forEach(sample -> {
              log.info(this.getClass().getName() + " sample.stepParam:" + ((Sample) sample).getStepParam());
          });
        stepExecutionContext.put("status", "complete!");
    }
}

なお、並列実行した結果を元に最後に1つのファイルへ書き出す場合は、多重実行されたスレッドが全て完了するのを待ち合わせ、最終処理を行うように実装する必要があります。

ジョブの終了前後で何か処理実行が必要であれば、JobExecutionListenerSupportを継承し、beforeJobメソッドやafterJobメソッドで適宜必要な処理を行うと良いでしょう。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

@Slf4j
public class SampleListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info(this.getClass().getName() + "#afterJob started.");
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info(this.getClass().getName() + "#beforeJob started.");
    }

}

実装が終わったら、早速メインクラスを実行してバッチジョブを実行させてみましょう。

// omit

.   ____          _            __ _ _
/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/  ___)| |_)| | | | | || (_| |  ) ) ) )
'  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
:: Spring Boot ::        (v2.1.7.RELEASE)

// omit

2019-09-11 01:58:11.973  INFO 66174 ––– [           main] o.d.m.s.a.s.a.b.l.SpringBatchApplication : Starting SpringBatchApplication on kawabatakouheinoiMac.local with PID 66174 (/Users/xxxxx/debugroom/mynavi-sample-aws-sqs/spring-batch/target/classes started by kawabatakouhei in /Users/xxxxxx/debugroom/mynavi-sample-aws-sqs)
2019-09-11 01:58:11.978  INFO 66174 ––– [           main] o.d.m.s.a.s.a.b.l.SpringBatchApplication : No active profile set, falling back to default profiles: default
2019-09-11 01:58:21.388  INFO 66174 ––– [           main] o.s.j.d.e.EmbeddedDatabaseFactory        : Starting embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false', username='sa'
2019-09-11 01:58:22.770  WARN 66174 ––– [           main] o.s.b.c.c.a.DefaultBatchConfigurer       : No transaction manager was provided, using a DataSourceTransactionManager
2019-09-11 01:58:22.823  INFO 66174 ––– [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: H2
2019-09-11 01:58:24.657  INFO 66174 ––– [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2019-09-11 01:58:24.936  WARN 66174 ––– [           main] o.s.b.c.l.AbstractListenerFactoryBean    : org.springframework.batch.item.ItemWriter is an interface. The implementing class will not be queried for annotation based listener configurations. If using @StepScope on a @Bean method, be sure to return the implementing class so listener annotations can be used.
2019-09-11 01:58:24.937  WARN 66174 ––– [           main] o.s.b.c.l.AbstractListenerFactoryBean    : org.springframework.batch.item.ItemProcessor is an interface. The implementing class will not be queried for annotation based listener configurations. If using @StepScope on a @Bean method, be sure to return the implementing class so listener annotations can be used.
2019-09-11 01:58:25.840  INFO 66174 ––– [           main] o.d.m.s.a.s.a.b.l.SpringBatchApplication : Started SpringBatchApplication in 19.962 seconds (JVM running for 23.551)
2019-09-11 01:58:25.843  INFO 66174 ––– [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: [param=1]
2019-09-11 01:58:26.322  INFO 66174 ––– [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{param=1}]
2019-09-11 01:58:26.360  INFO 66174 ––– [           main] o.d.m.s.a.s.a.b.listener.SampleListener  : org.debugroom.mynavi.sample.aws.sqs.app.batch.listener.SampleListener#beforeJob started.
2019-09-11 01:58:26.397  INFO 66174 ––– [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2019-09-11 01:58:26.445  INFO 66174 ––– [           main] o.d.m.s.a.s.a.batch.step.SampleTasklet   : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleTasklet#execute() starteds. input param : 1
2019-09-11 01:58:26.468  INFO 66174 ––– [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionStep]
2019-09-11 01:58:26.871  INFO 66174 ––– [cTaskExecutor-1] o.d.m.s.a.s.a.b.step.SampleProcessor     : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleProcessor started. sample.stepParam:test2 stepExecution.partitionId:test2
2019-09-11 01:58:26.873  INFO 66174 ––– [cTaskExecutor-4] o.d.m.s.a.s.a.b.step.SampleProcessor     : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleProcessor started. sample.stepParam:test3 stepExecution.partitionId:test3
2019-09-11 01:58:26.873  INFO 66174 ––– [cTaskExecutor-3] o.d.m.s.a.s.a.b.step.SampleProcessor     : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleProcessor started. sample.stepParam:test1 stepExecution.partitionId:test1
2019-09-11 01:58:26.875  INFO 66174 ––– [cTaskExecutor-5] o.d.m.s.a.s.a.b.step.SampleProcessor     : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleProcessor started. sample.stepParam:test5 stepExecution.partitionId:test5
2019-09-11 01:58:26.875  INFO 66174 ––– [cTaskExecutor-2] o.d.m.s.a.s.a.b.step.SampleProcessor     : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleProcessor started. sample.stepParam:test4 stepExecution.partitionId:test4
2019-09-11 01:58:26.878  INFO 66174 ––– [cTaskExecutor-5] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter started.
2019-09-11 01:58:26.879  INFO 66174 ––– [cTaskExecutor-3] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter started.
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-4] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter started.
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-3] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter sample.stepParam:test1
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-4] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter sample.stepParam:test3
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-1] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter started.
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-1] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter sample.stepParam:test2
2019-09-11 01:58:26.880  INFO 66174 ––– [cTaskExecutor-5] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter sample.stepParam:test5
2019-09-11 01:58:26.883  INFO 66174 ––– [cTaskExecutor-2] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter started.
2019-09-11 01:58:26.883  INFO 66174 ––– [cTaskExecutor-2] o.d.m.s.a.s.app.batch.step.SampleWriter  : org.debugroom.mynavi.sample.aws.sqs.app.batch.step.SampleWriter sample.stepParam:test4
2019-09-11 01:58:26.903  INFO 66174 ––– [           main] o.d.m.s.a.s.a.b.listener.SampleListener  : org.debugroom.mynavi.sample.aws.sqs.app.batch.listener.SampleListener#afterJob started.
2019-09-11 01:58:26.906  INFO 66174 ––– [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{param=1}] and the following status: [COMPLETED]
2019-09-11 01:58:26.910  INFO 66174 ––– [       Thread-6] o.s.j.d.e.EmbeddedDatabaseFactory        : Shutting down embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false'

これで、SpringBootを使ってバッチジョブを実装できました。次回は、SQSキューを受け取ってこのバッチジョブを実行するアプリケーションを作成してみます。

著者紹介


川畑 光平(KAWABATA Kohei) - NTTデータ 課長代理

金融機関システム業務アプリケーション開発・システム基盤担当を経て、現在はソフトウェア開発自動化関連の研究開発・推進に従事。

Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなどさまざまな開発プロジェクト支援にも携わる。2019 APN AWS Top Engineers & Ambassadors選出。

本連載の内容に対するご意見・ご質問は Facebook まで。