Snowflakeはアラート機能を拡張し、メールまたはWebhook経由でアラートを送信できるようになりました。特にWebhookアラートは強力で、SlackやMicrosoft Teamsへ通知を飛ばせるようになります。パイプライン障害の検知やデータドリブンな通知など、活用シーンは多岐にわたります。
本記事では、Microsoft TeamsにSnowflakeアラートを連携する手順をステップごとに解説します。
クイックスタート
ステップ1: TeamsでWebhookを作成する
Teamsで新しいチャネルを作成するか、アラートを受け取りたいチャネルを開きます。
チャネル右上の「…」(3点メニュー)をクリックし、「チャネルの管理」を選びます。
「設定」タブの「コネクタ」セクションを開き、「編集」をクリックします。
コネクタの一覧が表示されるので、「Webhook」を検索し「追加」をクリックします。
次の画面では、Webhookに表示する画像をアップロードできます。今回はSnowflakeのロゴを使ってみます。名前を入力したら「作成」をクリックしましょう。
「作成」をクリックするとWebhook URLが発行されます。このURLにはシークレットが含まれているため、取り扱いには十分注意してください。ひとまずコピーしてメモ帳などに貼り付けておきましょう。
Webhookをテストする
次のCURLコマンドでWebhookの動作を確認できます。Windows環境ではGit Bashをお使いください。
curl -X POST https://paste-your-url-with-secret-here \
-H "Content-Type: application/json" \
-d '{"text": "Hello, world"}'
すぐにTeamsのチャネルへ「Hello, world」というメッセージが届くはずです。
これでWebhookは正常に動作しています!
ステップ2: Snowflakeでシークレットを作成する
Webhook URLにはシークレットが含まれています。下記のシークレット文字列をご自身のものに置き換え、Snowflakeで実行してください。
注意点: シークレットはスキーマレベルのオブジェクトです。ワークシートのデータベース・スキーマのコンテキストに気を配るか、完全修飾名で指定するようにしてください。
今回はanalyticsデータベースのpublicスキーマにgmds_teams_secretというシークレットを作成しました。
use schmea <database>.<schema>;
CREATE OR REPLACE SECRET gmds_teams_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'this-is-the-secret';
念のため補足すると、シークレットは次のURIの末尾部分です: https://org-name.webhook.office.com/webhookb2/webhook-id/IncomingWebhook/this-is-the-secret
ステップ3: Webhook通知連携を作成する
シークレットが作成できたら、次は通知連携(Notification Integration)を作成します。
CREATE OR REPLACE NOTIFICATION INTEGRATION gmds_teams_webhook_integration
TYPE=WEBHOOK
ENABLED=TRUE
WEBHOOK_URL='https://org-name/webhook.office.com/webhookb2/webhook-id/IncomingWebhook/SNOWFLAKE_WEBHOOK_SECRET'
WEBHOOK_SECRET=analytics.public.gmds_teams_secret
WEBHOOK_BODY_TEMPLATE='{"text": "SNOWFLAKE_WEBHOOK_MESSAGE"}'
WEBHOOK_HEADERS=('Content-Type'='application/json');
ステップ4: 通知を送信する
通知の送信には、組み込みのストアドプロシージャSYSTEM$SEND_SNOWFLAKE_NOTIFICATIONを使います。メッセージからプレースホルダ(例: SNOWFLAKE_WEBHOOK_SECRET)を取り除くために、SANITIZE_WEBHOOK_CONTENT関数を渡す必要があります。
実際にアカウントで実行したコードは以下のとおりです。
CALL SYSTEM$SEND_SNOWFLAKE_NOTIFICATION(
SNOWFLAKE.NOTIFICATION.TEXT_PLAIN(
SNOWFLAKE.NOTIFICATION.SANITIZE_WEBHOOK_CONTENT('This is a test Teams Alert from my Snowflake Account')
),
SNOWFLAKE.NOTIFICATION.INTEGRATION('gmds_teams_webhook_integration')
);
Microsoft Teamsへ通知が瞬時に届きました!
基本の仕組みが整ったので、ここから実践的なユースケースに進みましょう。
アラート例: ウェアハウスの利用量が急増したら通知する
利用量の急増を検知するSQLを作成し、サーバーレスタスクにまとめる
以下のクエリは、直近の利用量(直前の完了1時間)と、各ウェアハウスの過去1か月の時間あたり平均利用量を比較するものです。今回は利用量が50%以上増えたウェアハウスをフラグ付けします。
サーバーレスタスクを使うとコストを抑えられます。ウェアハウス名を指定しないだけで、タスクをサーバーレス化できます。
動作確認のため、実行のたびに必ず1行が生成されアラートが発火するよう、ダミーレコードをUNIONで追加しています。
CREATE OR REPLACE TASK monitor_warehouse_spikes
SCHEDULE = 'USING CRON 2 * * * * America/New_York'
SERVERLESS_TASK_MIN_STATEMENT_SIZE = 'XSMALL'
SERVERLESS_TASK_MAX_STATEMENT_SIZE = 'XSMALL'
as
insert into usage_spike_alerts (warehouse_name,last_hour_credits,avg_monthly_credits,credit_diff,percent_increase)
WITH last_hour_usage AS (
SELECT
warehouse_name,
sum(credits_used) AS last_hour_credits
FROM
snowflake.account_usage.warehouse_metering_history
WHERE
start_time >= DATEADD(hour, -2, CURRENT_TIMESTAMP)
AND end_time <= CURRENT_TIMESTAMP
コードを展開する
SCHEDULE = 'USING CRON 2 * * * * America/New_York' は、毎時2分に実行するという意味です。
クエリ結果を格納するテーブルを作成する
CREATE or replace TABLE usage_spike_alerts (
alert_id INT AUTOINCREMENT PRIMARY KEY,
warehouse_name STRING NOT NULL,
last_hour_credits FLOAT NOT NULL,
avg_monthly_credits FLOAT NOT NULL,
credit_diff FLOAT NOT NULL,
percent_increase FLOAT NOT NULL,
inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
alert_sent boolean default false
);
このテーブルには、クエリ結果には含まれないデフォルト値付きの補助カラムをいくつか用意しています。
- alert_id: 主キー
- inserted_at: タスクがレコードを挿入した時刻を把握するためのカラム
- alert_sent: 初期値はfalse。アラート送信後にtrueへ更新されます
タスクを実行し、結果を確認する
execute task monitor_warehouse_spikes;
select * from usage_spike_alerts where not alert_sent;
急増があった場合にアラートを送るプロシージャを作成する
ここでは、次の処理を実装します。
usage_spike_alertsテーブルから未送信レコード(alert_sent==false)を取得する- 未送信レコードがあればMicrosoft Teamsへ送信する
- 送信したレコードに送信済みフラグを立てる
- 送信したアラート件数をSQLコンソールに出力する
CREATE OR REPLACE PROCEDURE send_usage_spike_alerts()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'send_alerts'
AS $$
import snowflake.snowpark as snowpark
def send_alerts(session):
query = """
SELECT warehouse_name, last_hour_credits, avg_monthly_credits, credit_diff, percent_increase
FROM usage_spike_alerts
WHERE alert_sent = FALSE
"""
コードを展開する
プロシージャを手動でテストする
「spikes」テーブルにはすでにテスト行が入っています。プロシージャを実行して、次の点を確認しましょう。
- レコードが
alert_sent==trueに更新されること - Microsoft Teamsにメッセージが届くこと
execute task monitor_warehouse_spikes; -- if you haven't already...
select * from usage_spike_alerts where not alert_sent; -- review it, sent = false
CALL send_usage_spike_alerts(); -- send the alert
-- wait for the alert to come
select * from usage_spike_alerts where not alert_sent; -- 0 rows
Teamsにアラートが届いているはずです!
アラートの見た目を整えるのは少し骨が折れる作業です。Pythonコードに手を加えて、より見やすいフォーマットで送信できたらぜひ教えてください!
タスクとプロシージャをチェーンする
monitor_warehouse_spikesタスクの毎時スケジュールで一連の処理を完結させたいので、このタスクの実行後にアラート送信プロシージャも続けて走るようにします。
まずはプロシージャをサーバーレスタスクでラップし、有効化します。
CREATE TASK send_usage_spike_alerts_task
AS
CALL send_usage_spike_alerts();
alter task send_usage_spike_alerts_task resume;
続いて、タスク同士をチェーンします。
ALTER TASK send_usage_spike_alerts_task
ADD AFTER monitor_warehouse_spikes;
テストしてみましょう。
1execute task monitor_warehouse_spikes;
これで、新しいレコードがテーブルに追加され、Teamsへアラートが届くはずです!
send_usage_spike_alerts()プロシージャを直接呼び出した場合は、Microsoft Teamsにメッセージが即座に表示されます。一方、send_usage_spike_alerts_taskをmonitor_warehouse_spikesタスクにチェーンした場合は、アラートが届くまでに最大3分ほどかかります。
まとめ
SnowflakeのアラートをMicrosoft Teamsへ送れるようになると、活用の幅が一気に広がります。ユースケースは挙げればきりがありません!
本記事を通じて、次のノウハウを身につけていただけたはずです。
- Microsoft TeamsでのWebhook作成
- Snowflakeでのシークレット作成
- SnowflakeでのWebhook連携の作成
- ウェアハウスの利用量急増を記録するタスクの作成
- 条件(今回はクエリ結果の行の有無)に応じてアラートを送るプロシージャの作成
- プロシージャをタスクでラップする方法
- タスク同士をチェーンする方法
皆さんが思いついたユースケースもぜひ聞かせてください! 🥂
Jeffはデータ・アナリティクス領域のコンサルタントで、インサイトの自動化やデータを活用した業務プロセスの制御に15年以上携わってきました。技術面ではSnowflake + dbt + Tableauを得意とし、業務領域では公益事業、臨床試験、出版、CPG、製造業での経験があります。お気軽にご連絡ください: [email protected]。