SELECTSELECT

SELECT

Snowflake Tasksのエラー通知

By Tomáš SobotíkSep 26, 20238 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

はじめに

Snowflake Tasksは、データベース関連の処理を自動化するための機能で、完全自動化されたデータパイプラインの構築にも使えます。TasksではシンプルなSQLコマンドを実行できるほか、指定した時刻にスケジュール実行することも可能です。複数のタスクを連結すれば、単一のルートタスクを起点としたDAG(有向非巡回グラフ)として複雑なデータパイプラインを表現できます。Snowpipeなどの機能と組み合わせれば、ロードから変換までを一気通貫で行うエンドツーエンドのデータパイプラインも実現できます。

どんなデータパイプラインであっても、稼働状況を監視し、途中でエラーが発生したら適切に対処できる仕組みが欠かせません。

以前の「Snowflakeの通知とアラート」に関する記事では、Snowflake標準の通知・アラート機能を使って検証チェックを設定し、SQLクエリの結果に応じてメール通知を受け取る方法を紹介しました。ただしこの方法には、通知の作成に手間がかかること、そしてSQLクエリを定期的に実行して結果を確認しない限りエラーをすぐに把握できず、その分のコンピュートコストも発生してしまうという難点があります。

そこで登場するのが、Snowflakeの新機能「Error Notifications for Tasks」です。これを使えば、Snowflake TaskやSnowpipeの失敗を自動かつ即座にユーザーへ通知できます。本記事では、SnowflakeでTasks / Snowpipe用のエラー通知を作成し、Slackチャンネルに連携する手順を解説します。

エラー通知のアーキテクチャ

まずは全体のアーキテクチャを示し、その後に各ステップを詳しく見ていきましょう。

Snowflake Error Notification Architecture

複数のタスクで構成されたDAGがあるとします。DAGのルートタスクに通知統合(notification integration)を割り当てておくと、DAG内のいずれかのタスクが失敗した際に、その通知統合がエラーを検知してSimple Notification Service(SNS)メッセージを送信します。AWS側には、SNSメッセージをトリガーとして起動するLambda関数を用意しておきます。Lambda関数はメッセージを処理し、Slack API経由でSlackチャンネルに送信します。データチームはそのSlackチャンネルを監視し、必要に応じて対応できます。

ステップ1:AWS SNSトピックを作成する

本記事の手順を進めるには、AWS管理者の協力を仰ぐか、AWS側で各種サービスを作成・管理できる権限が必要です。まずはAWSでSNSトピックを作成します。詳しい手順はAWS公式ドキュメントを参照してください。

ステップ2:AWS IAMポリシーを作成する

次に、対象のSNSトピックへのpublishを許可する新しいIAMポリシーを作成します。SNSトピックに対してsns:publishアクションを定義してください。手順は次のとおりです。

  1. AWSコンソールでIdentity & Access Management(IAM)を開きます。
  2. 左側のナビゲーションペインからAccount settingsを選択します。
  3. Security Token Service Regionsリストを展開し、ご利用のアカウントが存在するリージョンを探して、ステータスがInactiveの場合はActivateを選択します。
  4. 左側のナビゲーションペインからPoliciesを選択します。
  5. Create Policyをクリックします。
  6. JSONタブをクリックします。
  7. SNSトピックに対して実行可能なアクションを定義したポリシードキュメントを追加します。
1{

2    "Version": "2012-10-17",

3    "Statement": [\
\
4      {\
\
5         "Effect": "Allow",\
\
6         "Action": [\
\
7             "sns:Publish"\
\
8         ],\
\
9         "Resource": "<sns_topic_arn> created in previous step"\
\
10      }\
\
11    ]

12 }

ステップ3:AWS IAMロールを作成する

AWS側の最後のステップとして、IAMロールを作成します。すでにSnowflakeアカウントとAWSの連携用に使っているロールがある場合は、それを再利用して前のステップで作成したポリシーを追加するだけで構いません。新規にIAMロールを作成する場合は、以下の手順に従ってください。

  1. AWSのIdentity & Access Management(IAM)を開きます。
  2. 左側のナビゲーションペインからRolesを選択します。
  3. Create roleボタンをクリックします。
  4. 信頼されたエンティティのタイプとしてAnother AWS accountを選択します。
  5. Account ID欄に、ひとまずご自身のAWSアカウントIDを入力します。
  6. Require external IDオプションを選択します。このオプションにより、Amazonアカウントのリソース(SNSなど)へのアクセス権限を第三者(Snowflakeなど)に付与できるようになります。
  7. ここでは仮のID(例:0000)を入力しておきます。後ほど信頼関係を編集し、この仮IDを、ご自身のアカウント用に生成されたSnowflake IAMユーザーの外部IDに置き換えます。IAMロールの信頼ポリシーに設定する条件により、後で作成する通知統合オブジェクトを介してSnowflakeユーザーがこのロールを引き受けられるようになります。
  8. Nextボタンをクリックします。
  9. 先ほど作成したポリシーを探して選択します。
  10. Nextボタンをクリックします。
  11. ロールの名前と説明を入力し、Create roleボタンをクリックします。
  12. ロールのサマリーページに表示されるRole ARNの値を控えておきます。この値は以降のステップで使用します。

ステップ4:Snowflakeの通知統合を作成する

次に、Snowflake側で通知統合オブジェクトを作成します。これを最終的に、先ほどのサンプルDAGのルートタスクに割り当てます。

通知統合を作成する際には、先ほど用意したSNSトピックとIAMロールのAWS ARNが必要です。サンプルコードは次のとおりです。

CREATE NOTIFICATION INTEGRATION my_notif_integration
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = 'your SNS topic'
  AWS_SNS_ROLE_ARN = 'your role;

ステップ5:SnowflakeにSNSトピックへのアクセス権を付与する

続いて、SnowflakeにSNSトピックへのアクセス権を付与します。まずDESC NOTIFICATION INTEGRATION my_notif_integrationを実行し、出力結果から次の2つの値を控えます。

  • SF_AWS_IAM_USER_ARN — ご自身のアカウント用に作成されたSnowflake IAMユーザーのARN
  • SF_AWS_EXTERNAL_ID — ご自身のアカウント用に作成されたSnowflake IAMユーザーの外部ID

設定の最後のステップは、IAMロールの信頼関係の編集です。AWSコンソールに戻り、対象のIAMロールを開いてTrust relationshipタブをクリックします。Edit trust relationshipボタンをクリックし、通知統合のDESCで取得した値を使ってポリシードキュメントを書き換えます。

1{

2  "Version": "2012-10-17",

3  "Statement": [\
\
4    {\
\
5      "Sid": "",\
\
6      "Effect": "Allow",\
\
7      "Principal": {\
\
8        "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9      },\
\
10      "Action": "sts:AssumeRole",\
\
11      "Condition": {\
\
12        "StringEquals": {\
\
13          "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14        }\
\
15      }\
```\
\
Expand Code\
\
The integration between Snowflake and AWS is now complete!\
\

ステップ6:Snowflake Taskを更新する\


あとは、この通知統合をDAGのルートタスクに割り当てるだけです。まだタスクを作成していない場合は、CREATE TASK文の中で直接指定できます。今回はすでにルートタスクが存在しているので、ALTER TASKコマンドを使います。

ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\

ステップ7:SNSからSlackアラートを発火させる\


続いて、受信したSNSメッセージを処理してSlackに送信する仕組みを用意します。そのために、Slack APIを使ってメッセージを送信するLambda関数を作成しましょう。SNSトピックをLambda関数のトリガーに設定するので、新しいSNSメッセージが届くたびにLambdaが起動します。

Snowflake Error Notification Architecture with SNS and Lambda

Snowflakeから送信されるJSON文字列の例は次のとおりです。
\

1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
And the formatted version:\
\
```json\
1{\
\
2   "version": "1.0",\
\
3   "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4   "messageType": "INGEST_FAILED_FILE",\
\
5   "timestamp": "2021-10-22T19:15:29.471Z",\
\
6   "accountName": "MYACCOUNT",\
\
7   "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8   "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9   "stageLocation": "s3://mybucket/mypath",\
\
10   "messages": [\
\
11      {\
\
12         "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13         "firstError": "Numeric value 'abc' is not recognized"\
\
14      }\
\
15   ]\
```\
\
Expand Code\
\
ペイロードの処理ロジックは自由に実装できます。必要な属性だけを抽出する、不足している情報を追加する、フォーマットを変更するなど、用途に応じて柔軟に書けます。ここではデモのため、受信したメッセージをそのままSlackに送る形にしましょう。\
\
まずは、メッセージにインデントを付けて読みやすく整形してみます。\
\
````python\
1def format_slack_message(message):\
\
2    json_message = {\
\
3        "blocks": [\
\
4            {\
\
5            "type": "section",\
\
6            "text": {\
\
7                "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8                "type": "mrkdwn"\
\
9            }\
\
10            },\
\
11            {\
\
12            "type": "section",\
\
13            "text": {\
\
14                "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15                "type": "mrkdwn"\
````\
\
Expand Code\
\
続いて、Slackに接続してメッセージを送信します。\
\
```python\
1def lambda_handler(event, context):\
\
2    if event:\
\
3        message = format_slack_message(event)\
\
4\
\
5        if message:\
\
6            logging.info('Starting sending message to slack')\
\
7            response = requests.post(\
\
8                my_webhook_uri, data=json.dumps(message),\
\
9                headers={'Content-Type': 'application/json'}\
\
10            )\
\
11            logging.info(response.text)\
\
12            logging.info('Finished sending message to Slack webhook')\
\
13             if response.status_code != 200:\
\
14                raise ValueError(\
\
15                    'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
![Snowflake Error Notification message in Slack](https://select.dev/cdn-cgi/imagedelivery/1zmOcgV1p520E4lLTrYjjg/blog/error-notifications-for-snowflake-tasks/3.png/width=1366,quality=75)\
\
以上で完成です。Snowflakeの通知統合といくつかのAWSサービスを組み合わせるだけで、Snowflake TasksとSlackをつなぐエラー通知パイプラインを構築できました。\
\

Snowpipeのエラー通知\


まったく同じ仕組みで、Snowpipeの失敗時にも自動で通知を受け取れます。pipeオブジェクトを作成または変更する際にerror_integrationパラメータを設定するだけです。
\

1CREATE PIPE mypipe\
\
2    ERROR_INTEGRATION = '<integration_name>'\
\
3    AS\
\
4    COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík・Senior Data Engineer & Snowflake SME at Norlys\
\
Tomas氏は、長年にわたりSnowflake Data SuperHeroとして活躍する、Snowflake分野のスペシャリストです。データ業界で10年以上の経験を持ち、さまざまな業界・技術にまたがる多数のプロジェクトでSnowflakeのデータエンジニア、アーキテクト、管理者を務めてきました。コミュニティの中心メンバーとして積極的に知見を発信し、多くの人にインスピレーションを与えています。またO'Reillyのインストラクターとして、オンラインのライブトレーニングも担当しています。\