Snowflakeはアラート機能を拡張し、メールやWebhook経由での通知に対応しました。なかでもWebhookは強力で、Slack、Microsoft Teams、PagerDutyなどへ直接アラートを飛ばせます。パイプラインの障害検知やデータドリブンなアラートなど、活用できる場面は数多くあります。
本記事では、SnowflakeのアラートをSlackへ送るための設定手順をステップごとに紹介します。具体例として、ウェアハウスの使用量が急増した際にアラートを通知する仕組みを構築していきます。
ステップ1: SlackでWebhookを作成する
Slackにログインして、こちらのページを開きます。
「Create an App」をクリックします。
「From Scratch」を選びます。
次の画面でアプリ名を入力し、ワークスペースを選んで「Create App」をクリックします。
続いて、アプリの認証情報が表示される画面に切り替わります。
左サイドバーの「Incoming Webhooks」をクリックします。
「Active Incoming Webhooks」を「On」に切り替えます。
切り替えたら、ページ下部までスクロールして「Add New Webhook to Workspace」をクリックします。
Snowflakeからのアラートを受け取るチャンネルを選択します。今回は「snowflake-alerts」というチャンネルを新規作成し、ページを再読み込みしてドロップダウンに表示させました。
「Allow」をクリックします。
Webhookの動作確認
「Allow」をクリックすると、Webhook URLとテスト用のcURLコマンドが表示されます。
このコマンドをコピーしてターミナルに貼り付け、実行します。Windowsの場合はGit Bashを使ってください。
作成したばかりのSlackチャンネルに、すぐにメッセージが届くはずです。
これでWebhookが正しく動くようになりました。
(任意)アプリのアイコンをSnowflake ❄️ 風に整える
- 「Basic Information」タブに戻ります。
- ページ下部までスクロールします。
- Snowflakeのロゴをダウンロードして、ここにアップロードします。
- もう一度テストしてみます。
これで見栄えがぐっと良くなりました。
ステップ2: SnowflakeでSecretを作成する
Webhook URLにはシークレット文字列が含まれています。下記の文字列をご自身のものに置き換えたうえで、Snowflakeで次のコマンドを実行してください。
注意: SecretはスキーマレベルのオブジェクトのためSnowflakeのワークシート上のデータベース・スキーマコンテキストに気を付けるか、完全修飾名を使用してください。
今回は analytics データベースの public スキーマに gmds_slack_secret という名前で作成しました。
use schmea analytics.public;
CREATE OR REPLACE SECRET gmds_slack_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX';
ステップ3: Webhook用の通知連携(Notification Integration)を作成する
Secretを作成できたので、次はNotification Integrationを作ります。
CREATE OR REPLACE NOTIFICATION INTEGRATION gmds_slack_webhook_integration
TYPE=WEBHOOK
ENABLED=TRUE
WEBHOOK_URL='https://hooks.slack.com/services/SNOWFLAKE_WEBHOOK_SECRET'
WEBHOOK_SECRET=analytics.public.gmds_slack_secret
WEBHOOK_BODY_TEMPLATE='{"text": "SNOWFLAKE_WEBHOOK_MESSAGE"}'
WEBHOOK_HEADERS=('Content-Type'='application/json');
ステップ4: 通知を送信する
通知の送信には、組み込みのストアドプロシージャ SYSTEM$SEND_SNOWFLAKE_NOTIFICATION を使います。プレースホルダー(SNOWFLAKE_WEBHOOK_SECRET など)をメッセージから取り除くため、プロシージャに SANITIZE_WEBHOOK_CONTENT 関数を渡します。
実際に私のアカウントで実行したコードがこちらです。
CREATE OR REPLACE NOTIFICATION INTEGRATION gmds_slack_webhook_integration
TYPE=WEBHOOK
ENABLED=TRUE
WEBHOOK_URL='https://hooks.slack.com/services/SNOWFLAKE_WEBHOOK_SECRET'
WEBHOOK_SECRET=analytics.public.gmds_slack_secret
WEBHOOK_BODY_TEMPLATE='{"text": "SNOWFLAKE_WEBHOOK_MESSAGE"}'
WEBHOOK_HEADERS=('Content-Type'='application/json');
Slackに通知が即座に届きました。
基本的な部品が揃ったので、いよいよ実践的なユースケースに進みましょう。
アラート活用例: ウェアハウス使用量の急増を通知する
使用量の急増を検知するSQLクエリを書き、サーバーレスタスクに組み込む
次のクエリは、直前1時間の使用量と過去1か月における各ウェアハウスの1時間あたり平均使用量を比較し、使用量が50%以上増えたウェアハウスをフラグ付けします。
サーバーレスタスクを使うのはコスト削減のためです。ウェアハウス名を指定しないだけで、タスクをサーバーレスにできます。
説明のため、タスクの実行ごとに必ず1行とアラートが生成されるよう、ダミーレコードをUNIONで結合しておきます。
CREATE OR REPLACE TASK monitor_warehouse_spikes
SCHEDULE = 'USING CRON 2 * * * * America/New_York'
SERVERLESS_TASK_MIN_STATEMENT_SIZE = 'XSMALL'
SERVERLESS_TASK_MAX_STATEMENT_SIZE = 'XSMALL'
as
insert into usage_spike_alerts (warehouse_name,last_hour_credits,avg_monthly_credits,credit_diff,percent_increase)
WITH last_hour_usage AS (
SELECT
warehouse_name,
sum(credits_used) AS last_hour_credits
FROM
snowflake.account_usage.warehouse_metering_history
WHERE
start_time >= DATEADD(hour, -2, CURRENT_TIMESTAMP)
AND end_time <= CURRENT_TIMESTAMP
コードを展開
クエリ結果を保存するテーブルを作成する
CREATE or replace TABLE usage_spike_alerts (
alert_id INT AUTOINCREMENT PRIMARY KEY,
warehouse_name STRING NOT NULL,
last_hour_credits FLOAT NOT NULL,
avg_monthly_credits FLOAT NOT NULL,
credit_diff FLOAT NOT NULL,
percent_increase FLOAT NOT NULL,
inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
alert_sent boolean default false
);
このテーブルには、クエリ結果には現れないデフォルト値付きの補助カラムがいくつかあります。
- alert_id: 主キー
- inserted_at: タスクがレコードを挿入したタイミングを記録
- alert_sent: 初期値はfalse。アラート送信後にtrueへ更新
タスクを実行して結果を確認する
execute task monitor_warehouse_spikes;
select * from usage_spike_alerts where not alert_sent;
急増を検知したらアラートを送るプロシージャを作成する
ここで実現したいことは次のとおりです。
usage_spike_alertsテーブルから未送信レコード(alert_sent==false)を抽出する- 未送信レコードがあればSlackへ送信する
- 送信済みのレコードをマークする
- 送信したアラート数をSQLコンソールに出力する
CREATE OR REPLACE PROCEDURE send_usage_spike_alerts()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'send_alerts'
AS $$
import snowflake.snowpark as snowpark
def send_alerts(session):
query = """
SELECT warehouse_name, last_hour_credits, avg_monthly_credits, credit_diff, percent_increase
FROM usage_spike_alerts
WHERE alert_sent = FALSE
"""
コードを展開
改行 \n を SANITIZE_WEBHOOK_CONTENT 関数経由でSlackへ送る際にうまくいかない問題がありました。Slackはダブルバックスラッシュ表記の \n を含むテキストリテラルを要求するため、Python側の文字列としては \\n となるはずです。しかし、PythonからこのリテラルをSQL関数に渡すと何らかの理由でアラートが送られませんでした。手動実行の SANITIZE_WEBHOOK_CONTENT('line1\line2') は動くものの、Python経由で改行付きアラートを送ることはできませんでした。
プロシージャを手動で動作確認する
すでに「spikes」テーブルにテスト用のレコードが挿入済みです。プロシージャを呼び出して、以下を確認しましょう。
- レコードが
alert_sent==trueに更新される - Slack側にメッセージが届く
execute task monitor_warehouse_spikes; -- if you haven't already...
select * from usage_spike_alerts where not alert_sent; -- review it: sent == false
CALL send_usage_spike_alerts(); -- send the alert
-- wait for the alert to come
select * from usage_spike_alerts where not alert_sent; -- 0 rows
Slackにアラートが届いているはずです。
タスクとプロシージャを連結する
monitor_warehouse_spikes タスクの毎時スケジュールで一連の処理を完結させたいので、monitor_warehouse_spikes の実行後にアラート送信プロシージャも自動で走るようにします。
まず、プロシージャをサーバーレスタスクでラップして起動します。
CREATE TASK send_usage_spike_alerts_task
AS
CALL send_usage_spike_alerts();
alter task send_usage_spike_alerts_task resume;
次に、タスク同士を連結します。
ALTER TASK send_usage_spike_alerts_task
ADD AFTER monitor_warehouse_spikes;
動作を確認します。
1execute task monitor_warehouse_spikes;
これでタスクが新しいレコードをテーブルに追加し、Slackへもアラートを送ってくれます。
send_usage_spike_alerts() を直接呼び出すとSlackにはすぐ表示されますが、send_usage_spike_alerts_task を monitor_warehouse_spikes に連結した場合は、アラート到達まで最大3分ほどかかります。
タスクのDAGと実行履歴を確認する
SnowsightのサイドバーにあるDataメニュー(データベースアイコン)から、対象のタスクを含むデータベースとスキーマに移動します。タスクのコンテナを展開して、作成したタスクのいずれかを選び、メイン画面の「Graph」タブを開きます。ここからタスクを手動で実行することもできます。
「Run History」タブからは実行履歴も確認できます。
設計上のポイント
急増チェックのクエリとアラート送信は、1つのタスクにまとめることもできます。タスクを2つに分けて連結するのは、過剰な複雑化に見えるかもしれません。それでも私が今回の構成を好む理由は次のとおりです。
- 急増の記録がテーブルに残り続けます。アラートを送るだけなら必須ではありませんが、後から振り返れるのは大きなメリットです。
- 急増をテーブルに書き出す処理は独立した1つの仕事なので、専用タスクとして切り出すのが自然です。
- 急増の検知ロジックとアラート送信を分離することで、ロジックを変更してもアラート送信機能をテストし直す必要がなくなります。
おわりに
SnowflakeのアラートをSlackへ送る仕組みは非常に便利で、活用シーンは無限に広がります。
本記事では、以下のポイントを押さえました。
- SlackでWebhookを作成する
- SnowflakeでSecretを作成する
- SnowflakeでWebhook Integrationを作成する
- ウェアハウス使用量の急増を記録するタスクを作成する
- 条件(今回はクエリ結果に行が存在すること)に応じてアラートを送るプロシージャを作成する
- プロシージャをタスクでラップする
- タスク同士を連結する
みなさんがどんなユースケースを生み出すのか、楽しみにしています! 🥂
Jeffはデータ&アナリティクスのコンサルタントで、インサイトの自動化やデータドリブンな業務プロセス制御に15年以上携わってきました。技術的にはSnowflake + dbt + Tableauを得意とし、業務領域では公益事業、臨床試験、出版、CPG、製造業での経験があります。お気軽にご連絡ください: [email protected]。