はじめに
Snowflake Tasksは、データベース関連の処理を自動化するための機能で、完全自動化されたデータパイプラインの構築にも使えます。TasksではシンプルなSQLコマンドを実行できるほか、指定した時刻にスケジュール実行することも可能です。複数のタスクを連結すれば、単一のルートタスクを起点としたDAG(有向非巡回グラフ)として複雑なデータパイプラインを表現できます。Snowpipeなどの機能と組み合わせれば、ロードから変換までを一気通貫で行うエンドツーエンドのデータパイプラインも実現できます。
どんなデータパイプラインであっても、稼働状況を監視し、途中でエラーが発生したら適切に対処できる仕組みが欠かせません。
以前の「Snowflakeの通知とアラート」に関する記事では、Snowflake標準の通知・アラート機能を使って検証チェックを設定し、SQLクエリの結果に応じてメール通知を受け取る方法を紹介しました。ただしこの方法には、通知の作成に手間がかかること、そしてSQLクエリを定期的に実行して結果を確認しない限りエラーをすぐに把握できず、その分のコンピュートコストも発生してしまうという難点があります。
そこで登場するのが、Snowflakeの新機能「Error Notifications for Tasks」です。これを使えば、Snowflake TaskやSnowpipeの失敗を自動かつ即座にユーザーへ通知できます。本記事では、SnowflakeでTasks / Snowpipe用のエラー通知を作成し、Slackチャンネルに連携する手順を解説します。
エラー通知のアーキテクチャ
まずは全体のアーキテクチャを示し、その後に各ステップを詳しく見ていきましょう。

複数のタスクで構成されたDAGがあるとします。DAGのルートタスクに通知統合(notification integration)を割り当てておくと、DAG内のいずれかのタスクが失敗した際に、その通知統合がエラーを検知してSimple Notification Service(SNS)メッセージを送信します。AWS側には、SNSメッセージをトリガーとして起動するLambda関数を用意しておきます。Lambda関数はメッセージを処理し、Slack API経由でSlackチャンネルに送信します。データチームはそのSlackチャンネルを監視し、必要に応じて対応できます。
ステップ1:AWS SNSトピックを作成する
本記事の手順を進めるには、AWS管理者の協力を仰ぐか、AWS側で各種サービスを作成・管理できる権限が必要です。まずはAWSでSNSトピックを作成します。詳しい手順はAWS公式ドキュメントを参照してください。
ステップ2:AWS IAMポリシーを作成する
次に、対象のSNSトピックへのpublishを許可する新しいIAMポリシーを作成します。SNSトピックに対してsns:publishアクションを定義してください。手順は次のとおりです。
- AWSコンソールでIdentity & Access Management(IAM)を開きます。
- 左側のナビゲーションペインからAccount settingsを選択します。
- Security Token Service Regionsリストを展開し、ご利用のアカウントが存在するリージョンを探して、ステータスがInactiveの場合はActivateを選択します。
- 左側のナビゲーションペインからPoliciesを選択します。
- Create Policyをクリックします。
- JSONタブをクリックします。
- SNSトピックに対して実行可能なアクションを定義したポリシードキュメントを追加します。
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Effect": "Allow",\
\
6 "Action": [\
\
7 "sns:Publish"\
\
8 ],\
\
9 "Resource": "<sns_topic_arn> created in previous step"\
\
10 }\
\
11 ]
12 }
ステップ3:AWS IAMロールを作成する
AWS側の最後のステップとして、IAMロールを作成します。すでにSnowflakeアカウントとAWSの連携用に使っているロールがある場合は、それを再利用して前のステップで作成したポリシーを追加するだけで構いません。新規にIAMロールを作成する場合は、以下の手順に従ってください。
- AWSのIdentity & Access Management(IAM)を開きます。
- 左側のナビゲーションペインからRolesを選択します。
- Create roleボタンをクリックします。
- 信頼されたエンティティのタイプとしてAnother AWS accountを選択します。
- Account ID欄に、ひとまずご自身のAWSアカウントIDを入力します。
- Require external IDオプションを選択します。このオプションにより、Amazonアカウントのリソース(SNSなど)へのアクセス権限を第三者(Snowflakeなど)に付与できるようになります。
- ここでは仮のID(例:0000)を入力しておきます。後ほど信頼関係を編集し、この仮IDを、ご自身のアカウント用に生成されたSnowflake IAMユーザーの外部IDに置き換えます。IAMロールの信頼ポリシーに設定する条件により、後で作成する通知統合オブジェクトを介してSnowflakeユーザーがこのロールを引き受けられるようになります。
- Nextボタンをクリックします。
- 先ほど作成したポリシーを探して選択します。
- Nextボタンをクリックします。
- ロールの名前と説明を入力し、Create roleボタンをクリックします。
- ロールのサマリーページに表示されるRole ARNの値を控えておきます。この値は以降のステップで使用します。
ステップ4:Snowflakeの通知統合を作成する
次に、Snowflake側で通知統合オブジェクトを作成します。これを最終的に、先ほどのサンプルDAGのルートタスクに割り当てます。
通知統合を作成する際には、先ほど用意したSNSトピックとIAMロールのAWS ARNが必要です。サンプルコードは次のとおりです。
CREATE NOTIFICATION INTEGRATION my_notif_integration
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
DIRECTION = OUTBOUND
AWS_SNS_TOPIC_ARN = 'your SNS topic'
AWS_SNS_ROLE_ARN = 'your role;
ステップ5:SnowflakeにSNSトピックへのアクセス権を付与する
続いて、SnowflakeにSNSトピックへのアクセス権を付与します。まずDESC NOTIFICATION INTEGRATION my_notif_integrationを実行し、出力結果から次の2つの値を控えます。
- SF_AWS_IAM_USER_ARN — ご自身のアカウント用に作成されたSnowflake IAMユーザーのARN
- SF_AWS_EXTERNAL_ID — ご自身のアカウント用に作成されたSnowflake IAMユーザーの外部ID
設定の最後のステップは、IAMロールの信頼関係の編集です。AWSコンソールに戻り、対象のIAMロールを開いてTrust relationshipタブをクリックします。Edit trust relationshipボタンをクリックし、通知統合のDESCで取得した値を使ってポリシードキュメントを書き換えます。
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Sid": "",\
\
6 "Effect": "Allow",\
\
7 "Principal": {\
\
8 "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9 },\
\
10 "Action": "sts:AssumeRole",\
\
11 "Condition": {\
\
12 "StringEquals": {\
\
13 "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14 }\
\
15 }\
```\
\
Expand Code\
\
The integration between Snowflake and AWS is now complete!\
\
ステップ6:Snowflake Taskを更新する\
あとは、この通知統合をDAGのルートタスクに割り当てるだけです。まだタスクを作成していない場合は、CREATE TASK文の中で直接指定できます。今回はすでにルートタスクが存在しているので、ALTER TASKコマンドを使います。ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\
ステップ7:SNSからSlackアラートを発火させる\
続いて、受信したSNSメッセージを処理してSlackに送信する仕組みを用意します。そのために、Slack APIを使ってメッセージを送信するLambda関数を作成しましょう。SNSトピックをLambda関数のトリガーに設定するので、新しいSNSメッセージが届くたびにLambdaが起動します。
Snowflakeから送信されるJSON文字列の例は次のとおりです。
\
1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
And the formatted version:\
\
```json\
1{\
\
2 "version": "1.0",\
\
3 "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4 "messageType": "INGEST_FAILED_FILE",\
\
5 "timestamp": "2021-10-22T19:15:29.471Z",\
\
6 "accountName": "MYACCOUNT",\
\
7 "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8 "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9 "stageLocation": "s3://mybucket/mypath",\
\
10 "messages": [\
\
11 {\
\
12 "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13 "firstError": "Numeric value 'abc' is not recognized"\
\
14 }\
\
15 ]\
```\
\
Expand Code\
\
ペイロードの処理ロジックは自由に実装できます。必要な属性だけを抽出する、不足している情報を追加する、フォーマットを変更するなど、用途に応じて柔軟に書けます。ここではデモのため、受信したメッセージをそのままSlackに送る形にしましょう。\
\
まずは、メッセージにインデントを付けて読みやすく整形してみます。\
\
````python\
1def format_slack_message(message):\
\
2 json_message = {\
\
3 "blocks": [\
\
4 {\
\
5 "type": "section",\
\
6 "text": {\
\
7 "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8 "type": "mrkdwn"\
\
9 }\
\
10 },\
\
11 {\
\
12 "type": "section",\
\
13 "text": {\
\
14 "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15 "type": "mrkdwn"\
````\
\
Expand Code\
\
続いて、Slackに接続してメッセージを送信します。\
\
```python\
1def lambda_handler(event, context):\
\
2 if event:\
\
3 message = format_slack_message(event)\
\
4\
\
5 if message:\
\
6 logging.info('Starting sending message to slack')\
\
7 response = requests.post(\
\
8 my_webhook_uri, data=json.dumps(message),\
\
9 headers={'Content-Type': 'application/json'}\
\
10 )\
\
11 logging.info(response.text)\
\
12 logging.info('Finished sending message to Slack webhook')\
\
13 if response.status_code != 200:\
\
14 raise ValueError(\
\
15 'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
\
\
以上で完成です。Snowflakeの通知統合といくつかのAWSサービスを組み合わせるだけで、Snowflake TasksとSlackをつなぐエラー通知パイプラインを構築できました。\
\
Snowpipeのエラー通知\
まったく同じ仕組みで、Snowpipeの失敗時にも自動で通知を受け取れます。pipeオブジェクトを作成または変更する際にerror_integrationパラメータを設定するだけです。
\
1CREATE PIPE mypipe\
\
2 ERROR_INTEGRATION = '<integration_name>'\
\
3 AS\
\
4 COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík・Senior Data Engineer & Snowflake SME at Norlys\
\
Tomas氏は、長年にわたりSnowflake Data SuperHeroとして活躍する、Snowflake分野のスペシャリストです。データ業界で10年以上の経験を持ち、さまざまな業界・技術にまたがる多数のプロジェクトでSnowflakeのデータエンジニア、アーキテクト、管理者を務めてきました。コミュニティの中心メンバーとして積極的に知見を発信し、多くの人にインスピレーションを与えています。またO'Reillyのインストラクターとして、オンラインのライブトレーニングも担当しています。\