本連載では、AWS Lambdaを使ったサーバレス処理でのエラーハンドリング方法を解説しています。

前回取り上げた範囲は、下図の赤字の矢印の部分です。同期的なLambdaの呼び出しでビジネスエラーが発生するケースを想定し、Spring Cloud Functionを使ったLambdaファンクション内でエラーハンドリングを実装して、CloudFormationテンプレートを使ってAPI GatewayとLambda環境を構築する方法を解説しました。

イメージ図

今回から取り上げるのは、API GatewayとLambda実行環境でシステムエラーが発生することを想定した数の赤い矢印部分の実装です。

イメージ図

本連載の第6回でも解説した通り、システムエラーはユーザー側で対処しようがない、システムに問題が発生しているときのエラーであり、システム管理者や運用者が対処しなければなりません。そのため、ユーザー側にエラーを通知するだけでなく、エラー原因解析のためのログを残したり、運用担当へメッセージを通知したりする必要があります。

オンプレミスで構築されたシステムなどでは、通常、監視ミドルウェアなどを通じてサーバに出力されたエラーメッセージを運用担当へ通知する方法が一般的です。しかし、本連載のように実行環境がクラウド環境、ましてサーバレスであるLambdaでは、基本的にCloudWatchを活用してエラー対処するための設定を行う必要があります。

このような場合のよくあるユースケースとして、上図のようにCloudWatchに出力されたログをイベント契機として新たにLambdaファンクションを実行し、MattermostなどのコミュニケーションツールのWebhook機能を使ってエラー通知する実装方法を解説していきます。

システムエラーのハンドリングの実装 - Lambdaファンクションの同期呼び出し時

以降では、第7回と同じ要領で、Spring Cloud Functionで実装されたLambdaファンクションのシステムエラーのハンドリング実装について、解説を進めていきます。FunctionInvokerから実行される、同期的なシステムエラーハンリングを実装したファンクションクラスは以下の通りです。

package org.debugroom.mynavi.sample.aws.lambda.errorhandling.app.function;

// omit
import java.util.function.Function;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import org.debugroom.mynavi.sample.aws.lambda.errorhandling.domain.service.CreateSystemErrorService;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.app.model.SampleResource;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SyncExecuteSystemErrorFunction
     implements Function<Map<String, Object>, Message<SampleResource>> { //(1)


    @Autowired
    CreateSystemErrorService createSystemErrorService; //(2)

    @Override
    public Message<SampleResource> apply(Map<String, Object> stringObjectMap) {

        log.info(this.getClass().getName() + " has started!");
        log.info("[Input]" + stringObjectMap.toString()); //(3)

        createSystemErrorService.execute(); //(4)

        return MessageBuilder.withPayload(SampleResource.builder()
             .message("This code would never be reached.").build()).build(); //(5)

    }
}

上記のコードのポイントは下表の通りです。

項番 説明
1 java.util.Functionクラスを実装します。インプット型としてMapクラスを、アウトプット型として、返却するクラスオブジェクトSampleResourceを型パラメータに指定したorg.springframework.messaging.Messageを指定します
2 システム例外(非検査例外)をスローするサービスクラスとしてCreateSystemErrorServiceを使用します
3 Slf4jのログ機能を使って、インプットパラメータおよびメタデータを標準出力に出力しています。標準出力した内容はCloudWatch側へログとして出力されます
4 通常非検査例外は、想定外に発生するエラーに起因するものであり、キャッチするものでありません。ここでは、何かしら想定がにエラーが発生したことを想定して、Javaの非検査例外クラスであるRuntimeExceptionを継承したSystemExceptionクラスをスローするServiceを呼び出します
5 4で非検査例外であるSystemExceptionがスローされているので到達することはないコードです。スローされたタイミングでその処理は中断され、SystemExceptionのスタックトレースが標準出力へ出力されることになります。出力されたスタックトレースはCloudWatch Logsから確認可能です

ここで、出力されたログに特定の文字列が含まれた場合、新たに別のLambdaファンクションを起動して、出力されたエラーの内容を外部のコミュニケーションツールであるMattermostへ連携する方法を紹介します。

CloudWatchに出力されたシステムエラーログを契機としたLambda関数の実装

前節で実行されたLambdaファンクションのエラーログは、CloudWatch Logsに出力されています。AWSのユーザーガイド「サブスクリプションを使用したログデータのリアルタイム処理」にもある通り、CloudWatch Logsで出力された特定の文字列をサブスクリプションして、Lambdaファンクションを実行することができます。

その際、Lambdaに渡されるログのイベントデータはJSON形式で渡されますが、同じくユーザーガイドの「AWS Lambda のサブスクリプションフィルタ」でも説明されているように、Data属性がBase64でエンコードされており、GZIP形式で圧縮されています。

ログデータを受け取り、内容をデコードしてMattermostのWebhookURLへ通知するLambdaファンクション実装のサンプルは以下の通りです。

package org.debugroom.mynavi.sample.aws.lambda.errorhandling.app.function;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;

import com.amazonaws.util.Base64;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.app.ServiceProperties;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.domain.model.MattermostNotification;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.domain.repository.NotificationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
public class NotifySystemErrorFunction implements Function<Map<String, Object>, Message<String>> { //(1)

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    ServiceProperties serviceProperties;

    @Autowired
    @Qualifier("notificationRepositoryMattermostImpl")
    NotificationRepository notificationRepository;  //(2)

    @Override
    public Message<String> apply(Map<String, Object> stringObjectMap) {

        log.info(this.getClass().getName() + " has started!");
        for(String key : stringObjectMap.keySet()){
            Object value = stringObjectMap.get(key);
            if(Objects.nonNull(value)){
                log.info("[Key]" + key + " [Value]" + value.toString());
            }else {
                log.info("[Key]" + key + " [Value]" + "null");
            }
        }
        Map<String, Object> decodeLogMap = decodeLog((Map)stringObjectMap.get("awslogs")); //(3)

        notificationRepository.save(MattermostNotification.builder()
             .text(decodeLogMap.toString())
             .channel(serviceProperties.getMattermost().getChannel()) // (4)
             .build());

        return MessageBuilder.withPayload("Complete!").build();
    }

    private Map<String, Object> decodeLog(Map<String, Object> inputLogMap){
        byte[] compressedDecodeLogs = Base64.decode(
             ((String)inputLogMap.get("data")).getBytes(StandardCharsets.UTF_8)); //(5)

        Map<String, Object> decodeLogMap = new HashMap<>();

        try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedDecodeLogs);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader)){

            StringBuilder stringBuilder = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null){
                stringBuilder.append(line);
            }
            decodeLogMap = objectMapper.readValue(stringBuilder.toString(), new TypeReference<Map<String, Object>>() {}); //(6)
        } catch (IOException e){
            log.info(decodeLogMap.toString());
        }

        return decodeLogMap;

    }

}

ファンクションクラスの実装コードの詳細は下表の通りです。

項番 説明
1 java.util.Functionクラスを実装します。インプット型としてMapクラスを、アウトプット型として、文字列型を型パラメータに指定したorg.springframework.messaging.Messageを指定します
2 MattermostのWebhook用のURLにメッセージを送信するRepositoryクラスをインジェクションします
3 インプットデータから"awslogs"属性のデータを取り出し、Base64+Zip形式でエンコードされているログデータをデコードするメソッドを呼び出します
4 デコードしたログの文字列をMattermostの指定されたチャネルに送信します。チャネル名称はプロパティから指定するものとして実装しています
5 Base64エンコードされている文字列をデコードします
6 デコードされた文字列のをUnzip化してJSON文字列から、Map型のオブジェクトクラスに変換して返却します

また、Mattermostへログメッセージを送信する部分は、WebClientを使って次のように実装しています。なお、WebhookのURL自体はSystemsManager Parameter Storeに設定したパラメータを取得して送信します。

package org.debugroom.mynavi.sample.aws.lambda.errorhandling.domain.repository;

import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest;
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.app.ServiceProperties;
import org.debugroom.mynavi.sample.aws.lambda.errorhandling.domain.model.Notification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

@Component
public class NotificationRepositoryMattermostImpl implements NotificationRepository {

    @Autowired
    ServiceProperties serviceProperties;

    @Autowired
    AWSSimpleSystemsManagement awsSimpleSystemsManagement;

    @Override
    public void save(Notification message) {
        WebClient webClient = WebClient.builder()
          .baseUrl(getParameterFromParameterStore(
                  serviceProperties.getSystemsManagerParameterStore()
                          .getMattermost()
                          .getInCommingWebhook(), false))
          .build();
        webClient.post()
          .bodyValue(message)
          .retrieve()
          .bodyToMono(Void.class)
          .block();
    }

    private String getParameterFromParameterStore(String paramName, boolean isEncripted){
        GetParameterRequest request = new GetParameterRequest();
        request.setName(paramName);
        request.setWithDecryption(isEncripted);
        GetParameterResult getParameterResult = awsSimpleSystemsManagement.getParameter(request);
        return getParameterResult.getParameter().getValue();
    }
}


* * *

今回は、API GatewayとLambdaを使用した同期呼び出しでシステムエラーが発生した際に、CloudWatchに出力されたエラーログを契機として、システム管理者へ通知を行うファンクション実装を含めて解説しました。

次回は、システムエラーが発生した場合に今回実装したファンクションが呼び出されるように、CloudWatch Logsにサブスクリプションの設定を行い、実際にMattermostへメッセージが通知されることを確認してみます。

著者紹介


川畑 光平(KAWABATA Kohei) - NTTデータ
エグゼクティブ ITスペシャリスト ソフトウェアアーキテクト・デジタルテクノロジーストラテジスト(クラウド)

金融機関システム業務アプリケーション開発・システム基盤担当、ソフトウェア開発自動化関連の研究開発を経て、デジタル技術関連の研究開発・推進に従事。

Red Hat Certified Engineer、Pivotal Certified Spring Professional、AWS Certified Solutions Architect Professional等の資格を持ち、アプリケーション基盤・クラウドなど様々な開発プロジェクト支援にも携わる。AWS Top Engineers & Ambassadors選出。

本連載の内容に対するご意見・ご質問は Facebook まで。