前回は、S3へファイルをダイレクトアップロードするアプリケーションを実装しました。続く今回は、アップロードした後の後続処理を「AWS Lambda」を使ってサーバレスに実行するアプリケーションを実装していきます。なお、動作するAWS Lambdaファンクションは以下のランタイムライブラリを使って実装するものとします。

動作対象 バージョン
Java 1.8
Spring Boot 2.3.1.RELEASE
Spring Cloud Function 3.0.8.RELEASE
Spring Cloud AWS 2.2.2.RELEASE
Spring Data DynamoDB 5.2.1
AmazonSDK Lambda Java event 2.0.2
AmazonSDK Lambda Java core 1.1.0

AWS Lambdaを使ったS3ダイレクトアップロード後の後続処理

S3へのダイレクトアップロード処理が完了した後には、後処理が必要なケースが多くあります。例えば、アップロードしたファイルのオブジェクトキーをデータベースに保存したい、アップロードされたファイルのチェック/加工をしたいなど、実行したい内容に応じて後続処理を実装することになります。

その際、有力な選択肢の一つとなるのが、性能負荷を気にせずにスケールすることができるAWS Lambdaです。ただし、Lambdaファンクションで実装する処理に応じて適切なマネージドサービスやミドルウェアを選択し、エラー発生時のハンドリングも同時に考慮しなければなりません。代表的な処理パターンとしては、以下の図のようなケースが挙げられます。

代表的な処理パターン

代表的な処理パターン

各処理の詳細は以下の通りです。1~5の説明は、前回までに実装したS3へのダイレクトアップロード処理と対応します。

項番 説明
1 クライアント(ブラウザ)からファイルをアップロードするための要求リクエストをサーバ(ECSアプリケーション)へ発出します
2 ECSアプリケーションはクライアントからS3にダイレクトアップロードさせるよう、STSに一時的な認証情報を要求します
3 STSはアプリケーションに一時認証情報を発行します。アプリケーションでは認証情報を基に、署名したPostポリシーなどのダイレクトアップロードに必要な情報を作成します
4 アプリケーションにクライアントにダイレクトアップロードに必要なURLや署名したポリシーなどの情報を返却します
5 クライアントはS3へ署名とともにファイルをダイレクトアップロードします
6 アップロード先のS3のバケットにファイルがアップロード(Post/Put)されるとイベントを発生させるようS3側に設定を行っておき、アップロードを契機として、AWS Lambdaファンクションを実行させます
7 アップロードしたファイルに応じて後処理を実行します
 7’ アップロードしたファイルのオブジェクトキーやファイルサイズといったメタデータを「DynamoDB」に保存します。DynamoDBはLambdaと同様、スケーラビリティの高いデータベースなので、大量の書き込みアクセスが発生するようなユースケースでのデータ保存に適しています。大量のファイルが多数のクライアントから常時アップロードするような場合に向いているケースです
 7” アップロードしたファイルのメタ情報や結果をVPC内で動いているサーバへ連携し、RDSなどに保存したり、クライアントへ通知するためにSQSを使ってメッセージキューとして送信します
 7”’ アップロード後の処理の中でエラーが発生した場合、CloudWatch Logsにログを出力して、それを契機として再びLambdaファンクションを起動し、デッドレターキューを送信します
8 蓄積されたキューに対し、アプリケーションからポーリングしてメッセージキューを取得します
9 受け取ったキューから情報取得して、バックエンドのRDSへ接続するサービスを呼び出します
10 バックエンドのRDSへアップロードしたファイルのメタデータを保存します。RDS内の別のテーブルのデータとジョインさせたりする必要がある場合に向いているケースです
11 クライアントに対して結果を通知する場合などに、クライアントとWebSocketsコネクションを確立しているサーバとの連携をするために、ElastiCacheへメッセージを保存します
12 クライアントとWebSocketsコネクションを確立しているサーバがクライアントへプッシュ通知します(この実装例は、次回以降詳説します)

※ AWS LambdaをVPCにアタッチして、直接RDSなどへアクセスする処理を記述することも可能です。ただし、リクエスト数によってはコネクションが多数張られてRDSに過負荷となるため、SQSなどを経由して、コネクションが確立されているサーバを経由するか、RDS Proxyなど使用することを検討します。

8~10の項目に関しては、連載「AWSで作るクラウドネイティブアプリケーションの基本」の第32回で解説しているので、適宜参照してください。本連載では、6、7や、11、12の項目を順次実装/説明していきます。

なお、実際のソースコードはGitHub上にコミットしています。以降のソースコードでは本質的でない記述を一部省略しているので、実行コードを作成する場合は、必要に応じて適宜GitHub上のソースコードも参照してください。

S3アップロード後に実行するLambdaファンクションの実装

まず、後処理として実行するLambdaファンクションを実装します。

アプリケーションの実装は基本編の第1回と同様、Spring Cloud Functionを使用したアプリケーションで実装します。 必要なライブラリは以下の通りですが、DynamoDBとSQSへアクセスするので、Spring Cloud AWSと、Spring Data DynamoDBのライブラリを追加します。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-function-context</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-function-adapter-aws</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws-messaging</artifactId>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-lambda-java-events</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-lambda-java-core</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>io.github.boostchicken</groupId>
  <artifactId>spring-data-dynamodb</artifactId>
  <version>5.2.1</version>
</dependency>

それでは、Handlerクラスと、イベントとしてLamnda関数から実行されるFunctionクラスから実装していきましょう。今回は、S3のファイルアップロードを契機にイベント受信するので、HandlerクラスはSpirngBootRequestHandlerを継承して作成します。継承するクラスの型パラメータとしては、Inputにcom.amazonaws.services.s3.event.S3EventNotificationを、OutputにはString型を指定します。

package org.debugroom.mynavi.sample.aws.lambda.s3event.app.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.cloud.function.adapter.aws.SpringBootRequestHandler;

public class S3UploadEventHandler extends
     SpringBootRequestHandler<S3EventNotification, String> {

    @Override
    public Object handleRequest(S3EventNotification event, Context context) {
        return super.handleRequest(event, context);
    }

}

続いて、Functionクラスです。S3EventNotificationをFlux型で受け取るため、そこからイベントレコードを抽出して、DynamoDBとSQSキューを送信するサービスクラスを作成して呼び出す実装とします。

package org.debugroom.mynavi.sample.aws.lambda.s3event.app.function;

import java.util.function.Function;
import reactor.core.publisher.Flux;

import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.beans.factory.annotation.Autowired;

import org.debugroom.mynavi.sample.aws.lambda.s3event.app.moodel.UploadFileMapper;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.service.SampleService;

public class SampleFunction implements
     Function<Flux<S3EventNotification>, Flux<String>> {

    @Autowired
    SampleService sampleService;

    @Override
    public Flux<String> apply(Flux<S3EventNotification> s3EventNotificationFlux) {

        s3EventNotificationFlux.subscribe(s ->{
            if(0 != s.getRecords().size()){
                sampleService.uploadPostProcess(
                  UploadFileMapper.map(s.getRecords().get(0)));
            }
        });
        return Flux.just("Complete!");
    }

}

サービスクラスでは、DynamoDBのテーブルへアクセスするUploadFileRepositoryクラスとメッセージキューを送信する処理をまとめたMessageRepositoryを呼び出す処理を実装します。SQSメッセージはJacksonのマッパーライブラリを使って、JSON文字列に変換して送信します。

package org.debugroom.mynavi.sample.aws.lambda.s3event.domain.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.model.UploadFile;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.repository.MessageRepository;
import org.debugroom.mynavi.sample.aws.lambda.s3event.domain.repository.UploadFileRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class SampleServiceImpl implements SampleService{

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    MessageRepository messageRepository;

    @Autowired
    UploadFileRepository uploadFileRepository;

    @Override
    public void uploadPostProcess(UploadFile uploadFile) {
        uploadFile.setFileId(UUID.randomUUID().toString());
        uploadFileRepository.save(uploadFile);
        try{
            messageRepository.save(objectMapper.writeValueAsString(uploadFile));
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
    }

}

DynamoDBとSQSキューを送信する処理や設定の詳細については、基本編の第34回を参照してください。今回の実装で、DynamoDBでは以下のようなモデル情報をS3EventNotificationから受け取って保存するupload-file-tableを作成するものとします。

package org.debugroom.mynavi.sample.aws.lambda.s3event.domain.model;

// omit

@DynamoDBTable(tableName = "upload-file-table")
public class UploadFile {

    @Id
    @DynamoDBHashKey
    private String fileId;
    @DynamoDBAttribute
    private String awsRegion;
    @DynamoDBAttribute
    private String objectKey;
    @DynamoDBAttribute
    private String eventName;
    @DynamoDBAttribute
    private String eventSource;
    @DynamoDBAttribute
    private String eventVersion;
    @DynamoDBAttribute
    private String eventTime;
    @DynamoDBAttribute
    private String ipAddress;
    @DynamoDBAttribute
    private String principalId;
    @DynamoDBAttribute
    private String bucketName;
    @DynamoDBAttribute
    private String arn;
    @DynamoDBAttribute
    private String ownerIdentity;
    @DynamoDBAttribute
    private String urlDecordedKey;
    @DynamoDBAttribute
    private String size;
    @DynamoDBAttribute
    private String sequencer;
    @DynamoDBAttribute
    private String eTag;

}

今回使用したライブラリのバージョンでは、これまで通りSpringBootのオートコンフィグレーション機能を利用していると、アプリケーション実行時に例外org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class. Caused by: java.lang.ArrayStoreException: sun.reflect.annotation.TypeNotPresentExceptionProxyが発生します。

オートコンフィグレーション機能は、特定のライブラリを含めるだけであらかじめ準備された設定クラスの自動構築を行う機能ですが、今回のケースではspring-cloud-starter-aws-messagingのライブラリを含めたときに有効化されるMessagingAutoConfigurationのインナークラスSnsAutoCinfigurationに設定されている@EableSnsアノテーションを取得する際にエラーとなっていました。

今回SNSは利用しないので、以下のようにMessagingAutoConfigurationをオートコンフィグレーション対象から除外して、使用するSQSだけを有効化するよう@EnableSqsを設定クラスに付与する実装で対応しています。

package org.debugroom.mynavi.sample.aws.lambda.s3event.config;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.aws.autoconfigure.messaging.MessagingAutoConfiguration;

// omit

@SpringBootApplication(exclude = MessagingAutoConfiguration.class)
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    // omit
}
package org.debugroom.mynavi.sample.aws.lambda.s3event.config;

// omit

@EnableSqs
@Configuration
public class SqsConfig {
  // omit
}
なお、対象のAutoConfiguration機能でのエラー原因を特定するには、エラーが発生している箇所でデバックする必要があります。

* * *

今回は、S3からイベントを受け、DynamoDBへの情報の保存とSQSキューを送信するLambdaファンクションの実装方法について説明しました。次回は実際にAWS上に、Lambdaのデプロイや環境を構築して実行してみます。

著者紹介


川畑 光平(KAWABATA Kohei) - NTTデータ

金融機関システム業務アプリケーション開発・システム基盤担当、ソフトウェア開発自動化関連の研究開発を経て、デジタル技術関連の研究開発・推進に従事。

Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなど様々な開発プロジェクト支援にも携わる。AWS Top Engineers & Ambassadors選出。

本連載の内容に対するご意見・ご質問は Facebook まで。