Amazon SQSを使ったSpringアプリケーション(3)

【連載】

AWSで作るクラウドネイティブアプリケーションの基本

【第30回】Amazon SQSを使ったSpringアプリケーション(3)

[2019/11/20 08:00]川畑 光平 ブックマーク ブックマーク

開発ソフトウェア

前回はSQSへキューを送信するProducerアプリケーションを実装しました。今回から2回に渡り、SQSメッセージをポーリングして取得するConsumerアプリケーションから実行されるバッチ処理を「Spring Batch」を使って実装していきます。

今回実装する処理のイメージ

SpringBatchを使用したサンプルアプリケーション

今回作成するSpringBatchを使ったアプリケーションは、以下の図のようなイメージで構成します。

今回作成するSpringBatchを使ったアプリケーションの構成イメージ

SpringBatchの基本的な内容について、ここでは詳細な説明は行いません。必要に応じて、TERASOLUNAガイドラインの「SpringBatchのアーキテクチャ」を参照してください。ただし、このガイドラインで扱っている「TERASOLUNA Batch」では、SpringBatchを使った実装のベストプラクティスの提供に加えて、SpringBatchでは提供されていないDBポーリングによる ジョブの別プロセス非同期実行機能をライブラリとして提供していますが、本連載では、純粋にSpringBatchのみを使って実装します。

また、ガイドラインでは、SpringBootを使ってSpring Batchアプリケーションを実装する方法については述べられていませんが、本連載では、JavaのConfigクラスを使って、SpringBootをベースとした設定方法を扱います。なお、SpringBatchの処理起動はメインクラスによる実行とSQSのポーリングでメッセージを取得後実行する2パターン作成しますが、SQSへのポーリングに関しては、次回以降で解説します。

本連載で実際に作成するSpringBatchアプリケーションは、GitHub上にコミットしています。 以降に記載するソースコードでは、import文など本質的でない記述を省略している部分があるので、実行コードを作成する際は、必要に応じて適宜GitHubにあるソースコードも参照してください。

SpringBootを使ってSpringBatchアプリケーションを作成するには、前回同様、Mavenプロジェクトのpom.xmlでspring-boot-starter-batchのライブラリを追加してください。 また、モデルオブジェクトを簡素化する目的でLombokライブラリを、バッチジョブのリスタートや実行結果の管理のために作られるJobRepository向けのデータベースとしてH2を追加します。

 <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
</dependencies>

それでは、SpringBatchアプリケーションの実装に進みます。今回作成するアプリケーションのコンポーネント構成は、以下のようにバッチ処理を構成する上でよく使用される処理で構成してみます。

コンポーネント 説明 必須
SpringBatchApplication SpringBootアプリケーションを実行する起動クラス。単体で実行するために作成
BatchAppConfig バッチ処理の実行単位で作成するBean定義クラス
BatchInfraConfig ジョブ管理テーブルへのデータアクセスを定義するクラス
SampleTasklet ジョブの最初のステップで前処理として実行するTaskletクラス
SampleProcessor ジョブのステップで実行するProcessorクラス
SampleWriter ジョブのステップで実行するWriterクラス
SamplePartitioner ProcessorやWriterをマルチスレッドで並列実行する場合の分割単位を定義するクラス
SampleListener ジョブの実行前後で呼び出されるListenerクラス

SpringBatchの設定クラス実装

それでは、実装していくクラスについて説明します。まずは、SpringBoot起動クラスと、各種設定クラスです。

@SpringBootApplicaitonアノテーションが付与された起動クラスは、同一パッケージにある@Configurationアノテーションが付与された設定クラスと、 設定クラス内で@ComponentScanされたパッケージにあるクラスを読み取ります。ここでは、起動クラスと別パッケージにある設定クラスを読み取るために、 @Importを使用します。読み取るクラスは、以下の2つを作成します。

  • Batch処理を定義する設定クラス:BatchAppConfigクラス
  • ジョブ実行に必要なリソースを定義するクラス:BatchInfraConfigクラス

設定クラスは必ずしも複数である必要はなく、1つにまとめても動作上問題ありません。ただし、クラス名と役割を対応付けて作成しておいたほうが、後々混乱することなく、クラス名から設定内容を識別できるのでベターでしょう。

なお、コンポーネントスキャンを使わずに直接設定クラスを読み込む理由は、 通常バッチは単体の処理で完結するので、必要のないジョブコンポーネントの読み込みを回避して、パフォーマンスを向上させるためです。 また、下記の実装はBatchアプリケーションをローカル単体でテスト実行できるようなかたちで実装したものになります(次回以降解説するConsumerアプリケーションでは、こちらのクラスからは起動しません)。

package org.debugroom.mynavi.sample.aws.sqs.app.batch.launcher;

import org.debugroom.mynavi.sample.aws.sqs.config.BatchAppConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Import;

@Import(BatchAppConfig.class)
@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        String inputParam = "param=1";
        new SpringApplicationBuilder(SpringBatchApplication.class)
                .web(WebApplicationType.NONE)
                .run(new String[]{inputParam});
     }
}

起動クラスから読み込む「Batchの処理定義を記載したBatchAppConfigクラス」はバッチジョブ単位に作成し、起動クラスとジョブを1:1で組み合わせて、@Importで読み込んで実行するとよいでしょう。コードは以下の通りです。

package org.debugroom.mynavi.sample.aws.sqs.config;

// omit

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Import(BatchInfraConfig.class)                                               // ...(A)
@Configuration
@EnableBatchProcessing                                                        // ...(B)
public class BatchAppConfig extends DefaultBatchConfigurer {                  // ...(C)

    @Autowired
    JobBuilderFactory jobBuilderFactory;                                      // ...(D)

    @Autowired
    StepBuilderFactory stepBuilderFactory;                                    // ...(E)

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2){
        return jobBuilderFactory.get("job")
          .listener(jobExecutionListener())
          .start(step1)
          .next(partionStep())
          .build();                                                           // ...(F)
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory
          .get("step1")
          .tasklet(sampleTasklet())
          .build();                                                           // ...(G)
    }

    @Bean
    protected Step step2(){
        return stepBuilderFactory.get("step2")
          .<Sample, Sample>chunk(10)
          .reader(sampleFlatFileItemReader(null))
          .processor(sampleProcessor())
          .writer(sampleWriter())
          .build();                                                           // ...(H)
    }

    @Bean
    protected Step partionStep(){
        return stepBuilderFactory.get("partitionStep")
          .partitioner(step2())
          .partitioner("step2", partitioner(null))
          .taskExecutor(taskExecutor())
          .build();                                                           // ...(I)
    }

    @Bean
    @StepScope
    @Value("#{jobExecutionContext['paramBySampleTasklet']}")                  // ...(J)
    public FlatFileItemReader<Sample> sampleFlatFileItemReader(String paramBySampleTasklet){

        FlatFileItemReader<Sample> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setResource(new DefaultResourceLoader().getResource(paramBySampleTasklet));

        DefaultLineMapper<Sample> defaultLineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("stepParam");
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);

        BeanWrapperFieldSetMapper<Sample> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        beanWrapperFieldSetMapper.setTargetType(Sample.class);
        defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);

        flatFileItemReader.setLineMapper(defaultLineMapper);

        return flatFileItemReader;                                            // ...(K)
    }

    @Bean
    @StepScope
    protected ItemProcessor<Sample, Sample> sampleProcessor(){
        return new SampleProcessor();                                         // ...(L)
    }

    @Bean
    @StepScope
    protected ItemWriter<Sample> sampleWriter(){
        return new SampleWriter();                                            // ...(M)
    }

    @Bean
    protected Tasklet sampleTasklet(){
        return new SampleTasklet();                                           // ...(N)
    }

    @Bean
    protected JobExecutionListener jobExecutionListener(){
        return new SampleListener();                                          // ...(O)
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(10);
        return simpleAsyncTaskExecutor;                                       // ...(P)
    }

    @Bean
    @StepScope
    @Value("#{jobExecutionContext['paramBySampleTasklet']}")
    public Partitioner partitioner(String paramBySampleTasklet){
        return new SamplePartitioner(paramBySampleTasklet);                   // ...(Q)
    }

    @Override
    @Autowired
    public void setDataSource(@Qualifier("batchDataSource") DataSource dataSource){
        super.setDataSource(dataSource);                                      // ...(R)
    }
}

こうした設定クラスはバッチジョブ単位に作成し、起動クラスとジョブを1:1で組み合わせて、@Importで読み込んで実行するとよいでしょう。

設定クラスコードの説明は以下の通りです。

項番 説明
A BatchInfraConfig設定クラスをインポートします。
B @EnableBatchProcessingを付与することで、Batch設定クラスとして扱われます。この設定によりJobRepositoryといったSpringBatchの主要なコンポーネントが自動で構築されていくようになります。
C 今回のサンプルではリスタートや実行結果の管理のために作られるJobRepository用のDataSourceをインメモリDBへ置き換えるために使用します。DefaultBatchConfigurerを継承することにより、当設定クラス内でデータソースを置き換えるようオーバーライドします(Rを参照)
D ジョブ内でステップ処理フローを定義するためにJobBuildFactoryをインジェクションします
E ステップ内で実行する処理を定義するためにStepBuildFactoryをインジェクションします
F ジョブのステップ処理フローを定義します。ここでは、"job"というジョブ名に、リスナーとタスクレットを実行する"step1"、Readerとパーティショナーを組み込んだProcessor、Writerを実行する"step2"を定義しています
G ステップ処理を定義します。ここでは、"step1"というstep名に、SampleTaskletクラスを定義しています
H ステップ処理を定義します。ここでは、"step2"というstep名に、IOモデルクラス、チャンクサイズ、Reader、Processor、Writerクラスを定義しています
I "step2"をパーティション化して実行するステップを定義します。ここでは、Partitionerクラスおよび、各ステップを実行するTaskExecutorを定義しています
J Step間で共有するパラメータを設定します。ここではJobExcutionContext内に"paramBySampleTasklet"というパラメータをSampleTaskletクラスで設定し、FlatFileItemReaderの実行引数として受け取るかたちで実装しています
K (H)で設定した通り、SpringBatchから提供されているFlatFileItemReaderで型パラメータとして、モデルクラスとなるSampleクラスを指定してStepScopeで定義します。ここで実装している内容は次回改めて解説します
L (H)で設定した通り、ProcessorとしてSampleProcessorクラスをBean定義します
M (H)で設定した通り、WriterとしてSampleWriterクラスをBean定義します
N (G)で設定した通り、TaskletとしてSampleTaskletクラスをBean定義します
O (F)で設定した通り、リスナーとしてSampleListenerクラスをBean定義します
P (I)で設定した通り、パーティショナーを使って並列実行を行う場合のTaskExecutorクラスとして、SimpleAsyncTaskExecutorをBean定義します
Q (I)で設定した通り、Partitionerクラスとして、SamplePartitionerクラスをBean定義します。JobExcutionContext内に"paramBySampleTasklet"というパラメータ(パーティションキーの元となる読み込むファイルのパス)をコンストラクタインジェクションしてBean生成します
R (C)で設定した通り、リスタートや実行結果の管理のために作られるJobRepository用のDataSourceをインメモリDBへ置き換えるかたちで設定します

また、BatchInfraConfigクラスには複数のジョブ設定クラスから使用される内容を定義します。ここでは、JobRepositoryのためのデータソースをインメモリDBとしてH2を定義します。

package org.debugroom.mynavi.sample.aws.sqs.config;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
public class BatchInfraConfig {

    @Bean(name="batchDataSource")
    public DataSource jobRepositoryEmbeddedDataSource(){
        return new EmbeddedDatabaseBuilder()
          .setType(EmbeddedDatabaseType.H2)
          .addScript("classpath:/org/springframework/batch/core/schema-h2.sql")
          .build();
    }
}

これで、Batchアプリケーションの設定クラスが作成できました。次回は、TaskletやProcessorなどのバッチ処理クラスを実装します。

著者紹介


川畑 光平(KAWABATA Kohei) - NTTデータ 課長代理

金融機関システム業務アプリケーション開発・システム基盤担当を経て、現在はソフトウェア開発自動化関連の研究開発・推進に従事。

Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなどさまざまな開発プロジェクト支援にも携わる。2019 APN AWS Top Engineers & Ambassadors選出。

本連載の内容に対するご意見・ご質問は Facebook まで。

※ 本記事は掲載時点の情報であり、最新のものとは異なる場合がございます。予めご了承ください。

一覧はこちら

連載目次

もっと知りたい!こちらもオススメ

[解説動画] 個人の業務効率化術 - 短時間集中はこうして作る

[解説動画] 個人の業務効率化術 - 短時間集中はこうして作る

「ToDoリストに何カ月も前のタスクが残っている」――今回はそんな皆様の現状を打破する「時間&タスク管理術」を、IT Search+スペシャルセミナーでお馴染み アイ・コミュニケーション 平野 友朗氏に解説していただきます。

関連リンク

この記事に興味を持ったら"いいね!"を Click
Facebook で IT Search+ の人気記事をお届けします

会員登録(無料)

注目の特集/連載
[解説動画] Googleアナリティクス分析&活用講座 - Webサイト改善の正しい考え方
[解説動画] 個人の業務効率化術 - 短時間集中はこうして作る
ミッションステートメント
教えてカナコさん! これならわかるAI入門
知りたい! カナコさん 皆で話そうAIのコト
対話システムをつくろう! Python超入門
Kubernetes入門
AWSで作るクラウドネイティブアプリケーションの基本
PowerShell Core入門
徹底研究! ハイブリッドクラウド
マイナビニュース スペシャルセミナー 講演レポート/当日講演資料 まとめ
セキュリティアワード特設ページ

一覧はこちら

今注目のIT用語の意味を事典でチェック!

一覧はこちら

ページの先頭に戻る