エンジニアのTHです。 eviryではkamui tracker(以下 KT)の開発をしています。
KTでは日々様々なバッチ処理を実行していますが、バッチの実行状況の確認は毎日手動(DBの状態を確認する)で行なっている状態でした。
手動ではどうしても確認忘れや手間がかかるなどの問題があったため、今回、AWSサービスの勉強もかねてLambda, S3, SNSなどを用いて自動的に監視する仕組みを導入しました。
概要
KTのバッチタスクの管理はDB(RDS)で行なっているため、バッチの稼働状況を知るためにはDBのタスク管理テーブルのデータをチェックする必要があります。
今回作成したシステムは下図のような構成になります。
仕組みとしては、Lambda + S3でRDSからバッチタスクの状態を取得・正常に稼働しているかどうかの判定などを行い、正常に稼働していない場合はSNSを通じて別のLambdaをキックし、Slackへ通知するというものです。
システム構成
処理の流れ
- CloudWatch Eventsを使って定期的にLambda(VPC)関数を実行
- キックされたLambda関数で、以下を実行
- DBのタスク管理テーブルからバッチの稼働状況データを取得
- S3から前回の稼働状況データを取得し、最新の稼働状況データと比較してバッチ全体の稼働状況を判定
- 稼働状況が異常な場合はSNSに通知を発行
- 最新の稼働状況をS3に保存
- SNSに通知が来た場合、別のLambdaがサブスクライブするので、2つめのLambda(非VPC)が実行される
- SlackのIncoming WebhookでLambdaからSlackに通知メッセージを投稿
実装
あらかじめ、Slackにメッセージを送信するためのIncoming Webhookを設定し、Incoming Webhook URLを取得しておきます。
(設定方法はここでは割愛します)
1. Slackへメッセージを投稿するLambdaを作成
まずはバッチ実行状況に異常があった場合に、Slackへメッセージを投稿するLambdaを作成します。 構成図ではVPCの外側にあるLambdaです。
Lambda > 関数 > 関数の作成、から新規にLambdaを作成
作成したLambdaの設定画面で、ネットワーク設定を非VPCにします。
このLambdaはSlackへの投稿のためインターネットアクセスが必要になるので、非VPCに置く必要があります。
LambdaをVPC内に作成してしまうと通常は(NATゲートウェイなどを用意しない場合は)インターネットアクセスができなくなるため注意が必要です。
Slackへメッセージを投稿する処理を実装します。
const axios = require('axios'); exports.handler = async (event) => { await axios.post(process.env.SLACK_URL, { text: "<!channel> メッセージ本文" }).then(function(data) { console.log(data); }) };
2. SNSの設定
次に、異常があった場合に通知を送るためにSNSを設定していきます。
トピックの作成
まずはメッセージ発行先となるトピックを作成します。
サブスクリプションの作成
次に、トピックに発行されたメッセージを受け取る側(サブスクリプション)を設定します。
作成したトピック > サブスクリプション > サブスクリプションの作成をクリック。
プロトコルにAWS Lambdaを選択します。
通知は先ほど作成したLambda関数で受け取りたいので、エンドポイントから先ほど作成したLambdaのARNを選択し、サブスクリプションの作成をします。
ステータスが確認済みとなっていればOKです。
3. バッチ稼働状況チェックをするLambdaを作成
一番重要な、RDSからバッチ実行状態の取得・実行状況の判定をするLambda処理を作成していきます。 このLambdaはVPC外からアクセスする必要はないため、プライベートサブネットに作成します。
ロールの作成
このLambdaはVPC内のリソース(RDS)へのアクセスと、S3・SNSのアクセスが必要になるため、まずLambdaの実行ロールを作成します。
IAM > ロール > ロールの作成から、
ロールはLambdaに付与するので、Lambdaを選択します。
次にアタッチする権限ポリシーを選択します。 今回は管理ポリシーで以下の4ポシリーを付与しました。
AWSLambdaVPCAccessExecutionRoleは、LambdaからVPCリソースへアクセスするために必要になるものです。
LambdaRoleというロールが作成されました。
Lambda関数の作成
次にLambdaの処理を作成します。 先ほどと同じようにまずはLambda関数を作成します。
作成後、以下のようにLambdaを設定します。
まずRoleは先ほど作成したLambdaRoleを選択します。
次にネットワークですが、VPC内のRDSにアクセスするため、このLambdaもVPC内に作成します。
以下のようにRDSと同じVPCを設定し、サブネットを選択します。
また、RDSへのアクセスはセキュリティグループで制限されているため、アクセス可能なセキュリティグループも設定しました。
設定が終わったら、処理を書いていきます。 (詳細は割愛します。)
const mysql = require('mysql'); const util = require('util'); const moment = require('moment'); const aws = require('aws-sdk'); const s3 = new aws.S3(); const sns = new aws.SNS(); const fileKey = '<key to s3 file>'; const params = { Bucket: process.env.BUCKET, Key: fileKey }; const uploadData = (body) => { return { Bucket: process.env.BUCKET, Key: fileKey, Body: JSON.stringify(body), ContentType: 'application/json' } }; # slackへ通知を飛ばす # 実際にはSNSのトピックへメッセージを発行する const alertToSlack = async () => { let params = { Message: "<Message>", TopicArn: process.env.SNS_ARN }; await sns.publish(params).promise().then((data) => { console.log("MessageID is " + data.MessageId); }).catch((err) => { console.error(err, err.stack); }) }; exports.handler = async (event) => { const pool = mysql.createPool({ host: process.env.HOST, port: 3306, database: process.env.DATABASE, user: process.env.USER, password: process.env.PASSWORD }) pool.query = util.promisify(pool.query) let rowResult = null; let currentTasks = {}; try { if ("バッチ処理が全て完了している") { console.log("Today's tasks are all finished"); pool.end(); return; } // 現在処理中のタスクをチェック currentTasks = await pool.query('SQL'); pool.end(); // 処理中がなければ異常なのでアラート if (rowResult.length === 0) { alertToSlack(); return; } } catch (err) { pool.end(); throw new Error(err) } // 前回のタスク実行状況をS3から取得 let prevTasks = {}; await s3.getObject(params).promise().then(function(data) { for (let item of JSON.parse(data.Body.toString())) { prevTasks[item.id] = item; } }, function(err) { console.log(err.errorType); }) // 前回タスクと現在タスクの比較 if (isTaskProcessed(prevTasks, currentTasks)) { console.log("OK") } else { await alertToSlack(); } // 現在処理中のタスク状況データをS3にアップロード await s3.putObject(uploadData(rowResult)).promise().then(function(data) { console.log(`File uploaded successfully. ${data.Location}`); }, function (err) { console.error("Upload failed", err); }); };
以上でVPC内のLambdaの実装・設定が完了しました。
しかし、このままではプライベートサブネットにインターネットアクセスがないため、プライベートサブネット内にあるLambdaからVPC外にあるS3, SNSにアクセスができません。
NATゲートウェイをプライベートサブネットに設定することでも対応できますが、今回はS3, SNSのAWSリソースにアクセスできれば良いため、VPCエンドポイントを利用しました。
VPCエンドポイントの設定
プライベートサブネットからS3, SNSへアクセスするため、サブネットにVPCエンドポイントを設定します。 VPC > エンドポイントから、エンドポイントの作成をクリックします。
まずは、S3のエンドポイントです。
AWSサービスの一覧から、S3を探します。リージョンごとにわかれているので、今回はcom.amazonaws.ap-northeast-1.s3
を選択します。
次にエンドポイントを設定したいVPCとサブネット(正確にはサブネットに紐づけられたルートテーブル)を選択します。
Lambdaを作成したVPCとプライベートサブネットを選択。
最後にエンドポイントの作成を押せば完了です。
SNSのVPCエンドポイントも同様に作成します。
エンドポイントの一覧画面で、作成した2つのエンドポイントのステータスが使用可能
に変われば、設定は完了です。
4.CloudWatch Eventsの設定
以上の手順でバッチ実行状況のチェック、Slackへの通知などの処理は完成したので、最後に定期的にチェックを行うようにCloudWatch EventsでLambdaのトリガーを作成します。
CloudWatch > イベント > 今すぐ始める からイベントを作成します。
ルールの作成画面で、イベントのトリガーとターゲットの設定をします。
今回は1時間に一回、定期的に実行するのでスケジュールを選択し、Cron式で設定します。
また、このイベントで、先ほど作成したLambda関数を実行させたいので、ターゲットで対象のLambda関数名を選択し、詳細の設定をクリック。
名前や説明を入力して、ルールの作成。
これで、Lambdaが定期的に実行されるようになりました。
まとめ
- VPC内のリソースにアクセスするためにはVPC Lambdaにする必要があるが、そのままではLambdaからインターネットへのアクセスができなくなる
- VPC LambdaからS3やSNSにアクセスするためには、Role・セキュリティグループ・VPCエンドポイント(or NATゲートウェイ)全てが正しく設定されていないとうまくいかない。(S3やSNSへのアクセスがタイムアウトになるので、何が原因かわかりにくかった)