Introduzione
I Task di Snowflake sono una funzionalità che permette di automatizzare le operazioni sul database e di costruire pipeline di dati complete e automatizzate. I Task possono essere utilizzati per eseguire semplici comandi SQL e possono essere pianificati per partire a un orario prestabilito. È possibile collegarli tra loro per creare una pipeline di dati articolata, rappresentata come un DAG (Directed Acyclic Graph) con un unico root task. Combinati con funzionalità come Snowpipe, consentono di realizzare pipeline end-to-end con caricamento e trasformazione dei dati.
Qualsiasi pipeline di dati va monitorata, in modo da poter intervenire tempestivamente quando il processo inizia a generare errori.
Nel nostro precedente articolo su notifiche e alert di Snowflake abbiamo mostrato come sfruttare le funzionalità native di notifica e alerting di Snowflake per impostare controlli di validazione e ricevere notifiche via email basate sul risultato di una query SQL. I principali limiti di questo approccio sono l'impegno necessario per creare le notifiche e il fatto che non si viene avvisati immediatamente al verificarsi dell'errore, a meno di eseguire frequentemente la query SQL per verificare l'output, con i relativi costi di compute!
Per fortuna esiste una nuova funzionalità di Snowflake, "Error Notifications for Tasks", che consente di avvisare automaticamente e in tempo reale gli utenti in caso di failure di un Task di Snowflake o di Snowpipe. In questo articolo vedremo come creare una notifica di errore per Tasks / Snowpipe in Snowflake e collegarla a un canale Slack.
Architettura della notifica di errore
Iniziamo disegnando l'architettura complessiva di questa integrazione, per poi analizzare ogni passaggio nel dettaglio.

Supponiamo di avere un DAG composto da più task. Si può assegnare una notification integration al root task del DAG: in questo modo, se uno qualsiasi dei task fallisce, la notifica di errore lo intercetta e invia un messaggio a Simple Notification Service (SNS). Lato AWS abbiamo una funzione Lambda attivata dai messaggi SNS in ingresso. La Lambda elabora il messaggio e lo inoltra al canale Slack tramite l'API di Slack. Il team dati monitora il canale Slack e può intervenire di conseguenza.
Passo 1: creare il topic SNS su AWS
Per i passaggi descritti in questo articolo serviranno il supporto degli amministratori AWS oppure i privilegi necessari lato AWS per creare e gestire diversi servizi. Per prima cosa occorre creare un topic SNS in AWS. Per le istruzioni dettagliate è possibile fare riferimento alla documentazione di AWS.
Passo 2: creare una policy IAM su AWS
Il passaggio successivo consiste nel creare una nuova policy IAM che conceda i permessi di pubblicazione sul topic SNS, definendo l'azione sns:publish sul topic. Ecco i passaggi:
- Accedere a Identity & Access Management (IAM) nella console AWS.
- Selezionare Account settings dal pannello di navigazione a sinistra.
- Espandere la Security Token Service Regions list, individuare la regione AWS corrispondente a quella in cui si trova l'account e scegliere Activate se lo stato è Inactive.
- Selezionare Policies dal pannello di navigazione a sinistra.
- Fare clic su Create Policy.
- Aprire la scheda JSON.
- Aggiungere un documento di policy che definisca le azioni eseguibili sul topic SNS.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Effect": "Allow",\
\
6 "Action": [\
\
7 "sns:Publish"\
\
8 ],\
\
9 "Resource": "<sns_topic_arn> created in previous step"\
\
10 }\
\
11 ]
12 }
Passo 3: creare un ruolo IAM su AWS
Come ultimo passaggio lato AWS occorre creare un ruolo IAM. Se si dispone già di un ruolo utilizzato per l'interazione tra account Snowflake e AWS, è naturalmente possibile riutilizzarlo aggiungendo la policy creata al passaggio precedente. Per creare un nuovo ruolo IAM, ecco i passaggi:
- Accedere a Identity & Access Management (IAM) in AWS.
- Selezionare Roles dal pannello di navigazione a sinistra.
- Fare clic sul pulsante Create role.
- Selezionare Another AWS account come tipo di entità attendibile.
- Nel campo Account ID, inserire temporaneamente il proprio AWS account ID.
- Attivare l'opzione Require external ID, che consente di concedere a una terza parte (in questo caso Snowflake) le autorizzazioni sulle risorse del proprio account Amazon (in questo caso SNS).
- Per il momento inserire un ID fittizio, ad esempio 0000. In seguito si modificherà la trust relationship sostituendo l'ID fittizio con l'external ID dell'utente IAM di Snowflake generato per il proprio account. Una condizione nella trust policy del ruolo IAM consentirà agli utenti Snowflake di assumere il ruolo tramite l'oggetto notification integration che verrà creato successivamente.
- Fare clic sul pulsante Next.
- Individuare la policy creata e selezionarla.
- Fare clic sul pulsante Next.
- Inserire un nome e una descrizione per il ruolo e fare clic sul pulsante Create role.
- Annotare il valore del Role ARN riportato nella pagina di riepilogo del ruolo: servirà in uno o più passaggi successivi.
Passo 4: creare la Notification Integration di Snowflake
A questo punto bisogna creare in Snowflake un oggetto notification integration, che verrà poi assegnato al root task del DAG di esempio illustrato sopra.
Per creare la notification integration servono l'ARN AWS del topic SNS e il ruolo IAM creati in precedenza. Ecco un esempio di codice:
CREATE NOTIFICATION INTEGRATION my_notif_integration
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
DIRECTION = OUTBOUND
AWS_SNS_TOPIC_ARN = 'your SNS topic'
AWS_SNS_ROLE_ARN = 'your role;
Passo 5: concedere a Snowflake l'accesso al topic SNS
Ora bisogna concedere a Snowflake l'accesso al topic SNS. Innanzitutto, eseguire DESC NOTIFICATION INTEGRATION my_notif_integration. Dall'output occorre annotare i due valori seguenti:
- SF_AWS_IAM_USER_ARN — ARN dell'utente IAM Snowflake creato per il proprio account.
- SF_AWS_EXTERNAL_ID — External ID dell'utente IAM Snowflake creato per il proprio account.
L'ultimo passaggio della configurazione consiste nel modificare la trust relationship del ruolo IAM. Tornare alla console AWS, individuare il ruolo IAM e aprire la scheda Trust relationship. Fare clic su Edit trust relationship e aggiornare il documento di policy con i valori ottenuti dal describe della notification integration.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Sid": "",\
\
6 "Effect": "Allow",\
\
7 "Principal": {\
\
8 "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9 },\
\
10 "Action": "sts:AssumeRole",\
\
11 "Condition": {\
\
12 "StringEquals": {\
\
13 "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14 }\
\
15 }\
```\
\
Expand Code\
\
L'integrazione tra Snowflake e AWS è ora completa!\
\
Passo 6: aggiornare il Task di Snowflake\
Non resta che assegnare la notification integration al root task del DAG. Se il task non è ancora stato creato, l'assegnazione può essere fatta direttamente all'interno dello statement CREATE TASK. Nel mio caso il root task era già presente, quindi userò il comando ALTER TASK:ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\
Passo 7: inviare alert su Slack a partire da SNS\
A questo punto bisogna elaborare i messaggi SNS in ingresso e inoltrarli a Slack. Creiamo una funzione Lambda dedicata, che sfrutta l'API di Slack per inviare i messaggi. Useremo il nostro topic SNS come trigger della Lambda, che verrà attivata ogni volta che arriva un nuovo messaggio SNS.
Ecco un esempio di stringa JSON inviata da Snowflake:
\
1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
E la versione formattata:\
\
```json\
1{\
\
2 "version": "1.0",\
\
3 "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4 "messageType": "INGEST_FAILED_FILE",\
\
5 "timestamp": "2021-10-22T19:15:29.471Z",\
\
6 "accountName": "MYACCOUNT",\
\
7 "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8 "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9 "stageLocation": "s3://mybucket/mypath",\
\
10 "messages": [\
\
11 {\
\
12 "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13 "firstError": "Numeric value 'abc' is not recognized"\
\
14 }\
\
15 ]\
```\
\
Expand Code\
\
È possibile scrivere una logica personalizzata per elaborare il payload: usare solo gli attributi rilevanti, integrare informazioni aggiuntive, modificare la formattazione e così via. A scopo dimostrativo, prendiamo il messaggio in ingresso così com'è e inoltriamolo a Slack.\
\
Per prima cosa, ne miglioriamo un po' la leggibilità aggiungendo l'indentazione.\
\
````python\
1def format_slack_message(message):\
\
2 json_message = {\
\
3 "blocks": [\
\
4 {\
\
5 "type": "section",\
\
6 "text": {\
\
7 "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8 "type": "mrkdwn"\
\
9 }\
\
10 },\
\
11 {\
\
12 "type": "section",\
\
13 "text": {\
\
14 "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15 "type": "mrkdwn"\
````\
\
Expand Code\
\
A questo punto ci connettiamo a Slack e inviamo il messaggio:\
\
```python\
1def lambda_handler(event, context):\
\
2 if event:\
\
3 message = format_slack_message(event)\
\
4\
\
5 if message:\
\
6 logging.info('Starting sending message to slack')\
\
7 response = requests.post(\
\
8 my_webhook_uri, data=json.dumps(message),\
\
9 headers={'Content-Type': 'application/json'}\
\
10 )\
\
11 logging.info(response.text)\
\
12 logging.info('Finished sending message to Slack webhook')\
\
13 if response.status_code != 200:\
\
14 raise ValueError(\
\
15 'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
\
\
Ecco fatto! Abbiamo una pipeline di notifica errori completa tra i Task di Snowflake e Slack, costruita con una Snowflake Notification integration e pochi servizi AWS.\
\
Notifiche di errore per Snowpipe\
La stessa identica configurazione consente di ricevere automaticamente le notifiche quando un Snowpipe fallisce: è sufficiente impostare il parametro error_integration in fase di creazione o di modifica dell'oggetto pipe!
\
1CREATE PIPE mypipe\
\
2 ERROR_INTEGRATION = '<integration_name>'\
\
3 AS\
\
4 COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík·Senior Data Engineer & Snowflake SME presso Norlys\
\
Tomas è un Snowflake Data SuperHero di lunga data e un riferimento generale su Snowflake. La sua esperienza nel mondo dei dati copre oltre un decennio, durante il quale ha ricoperto i ruoli di Snowflake data engineer, architect e admin in numerosi progetti, in settori e tecnologie eterogenei. Tomas è un membro attivo della community, dove condivide la propria competenza e ispira gli altri. È inoltre istruttore O'Reilly e tiene sessioni di formazione live online.\