Introduction
Les Snowflake Tasks sont une fonctionnalité qui permet d'automatiser des opérations liées à la base de données et de bâtir des pipelines de données complets et automatisés. Les Tasks peuvent déclencher de simples commandes SQL et être planifiées pour s'exécuter à un moment précis. Vous pouvez les enchaîner pour créer un pipeline de données complexe représenté sous forme de DAG (Directed Acyclic Graph) avec une unique tâche racine. Combinées à des fonctionnalités comme Snowpipe, elles permettent de créer des pipelines de bout en bout assurant chargement et transformation des données.
Comme tout pipeline de données, il doit être surveillé afin de pouvoir réagir si le processus se met à générer des erreurs.
Dans notre précédent article sur les notifications et alertes Snowflake, nous avons montré comment exploiter les fonctionnalités natives de Snowflake pour mettre en place des contrôles de validation et recevoir des notifications par e-mail à partir du résultat d'une requête SQL. Les principaux inconvénients de cette approche tiennent à l'effort nécessaire pour créer ces notifications et au fait que vous n'êtes pas averti immédiatement en cas d'erreur, à moins d'exécuter régulièrement la requête SQL pour en vérifier le résultat — avec les coûts de calcul qui en découlent !
Heureusement, une nouvelle fonctionnalité Snowflake baptisée Error Notifications for Tasks permet d'alerter automatiquement et immédiatement les utilisateurs lors de l'échec d'une Snowflake Task ou d'un Snowpipe. Dans cet article, nous verrons comment créer une notification d'erreur pour les Tasks / Snowpipe dans Snowflake et la connecter à un canal Slack.
Architecture de la notification d'erreur
Commençons par schématiser l'architecture globale de cette intégration, puis détaillons chaque étape.

Imaginons un DAG composé de plusieurs tâches. Vous pouvez associer une intégration de notification à la tâche racine du DAG. Dès lors, si l'une des tâches du DAG échoue, la notification d'erreur sera déclenchée et enverra un message via Simple Notification Service (SNS). Côté AWS, une fonction Lambda est déclenchée à la réception du message SNS. Elle traite ce message et le transmet à votre canal Slack via l'API Slack. Votre équipe data surveille le canal Slack et peut réagir en conséquence.
Étape 1 : créer le topic AWS SNS
Pour suivre les étapes de cet article, vous aurez besoin de l'aide de votre administrateur AWS ou des privilèges nécessaires pour créer et gérer plusieurs services AWS. Première étape : créer un topic SNS dans AWS. Reportez-vous à la documentation AWS pour des instructions détaillées.
Étape 2 : créer une policy IAM AWS
Vous devez ensuite créer une nouvelle policy IAM qui accorde l'autorisation de publier sur ce topic SNS, en définissant l'action sns:publish sur ce topic. Voici la marche à suivre :
- Accédez à Identity & Access Management (IAM) dans la console AWS :
- Sélectionnez Account settings dans le menu de gauche.
- Déroulez la liste Security Token Service Regions, repérez la région AWS correspondant à celle de votre compte, et cliquez sur Activate si le statut est Inactive.
- Sélectionnez Policies dans le menu de gauche.
- Cliquez sur Create Policy.
- Cliquez sur l'onglet JSON.
- Ajoutez un document de policy qui définit les actions autorisées sur votre topic SNS.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Effect": "Allow",\
\
6 "Action": [\
\
7 "sns:Publish"\
\
8 ],\
\
9 "Resource": "<sns_topic_arn> created in previous step"\
\
10 }\
\
11 ]
12 }
Étape 3 : créer un rôle IAM AWS
Dernière étape côté AWS : créer un rôle IAM. Si vous disposez déjà d'un rôle dédié à l'interaction entre votre compte Snowflake et AWS, vous pouvez bien sûr le réutiliser et y ajouter la policy créée à l'étape précédente. Sinon, voici comment créer un nouveau rôle IAM :
- Accédez à Identity & Access Management (IAM) sur AWS :
- Sélectionnez Roles dans le menu de gauche.
- Cliquez sur le bouton Create role.
- Choisissez Another AWS account comme type d'entité de confiance.
- Dans le champ Account ID, saisissez temporairement votre propre identifiant de compte AWS.
- Cochez l'option Require external ID. Elle permet d'accorder des autorisations sur vos ressources Amazon (par exemple SNS) à un tiers (par exemple Snowflake).
- Pour l'instant, saisissez un ID fictif tel que 0000. Vous modifierez ensuite la relation de confiance pour remplacer cet ID fictif par l'external ID de l'utilisateur IAM Snowflake généré pour votre compte. Une condition de la trust policy de votre rôle IAM permettra à vos utilisateurs Snowflake d'assumer ce rôle via l'objet d'intégration de notification que vous créerez par la suite.
- Cliquez sur le bouton Next.
- Localisez la policy que vous avez créée et sélectionnez-la.
- Cliquez sur le bouton Next.
- Saisissez un nom et une description pour le rôle, puis cliquez sur le bouton Create role.
- Notez la valeur du Role ARN affichée sur la page récapitulative du rôle. Elle vous servira lors des étapes suivantes.
Étape 4 : créer l'intégration de notification Snowflake
Nous devons à présent créer un objet d'intégration de notification dans Snowflake, qui sera ensuite associé à la tâche racine de notre DAG d'exemple.
Pour créer cette intégration, vous aurez besoin de l'ARN AWS du topic SNS et de celui du rôle IAM créés précédemment. Voici un exemple de code :
CREATE NOTIFICATION INTEGRATION my_notif_integration
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
DIRECTION = OUTBOUND
AWS_SNS_TOPIC_ARN = 'your SNS topic'
AWS_SNS_ROLE_ARN = 'your role;
Étape 5 : accorder à Snowflake l'accès au topic SNS
Il faut maintenant donner à Snowflake l'accès au topic SNS. Commencez par exécuter DESC NOTIFICATION INTEGRATION my_notif_integration. Relevez les deux valeurs suivantes dans la sortie :
- SF_AWS_IAM_USER_ARN — ARN de l'utilisateur IAM Snowflake créé pour votre compte.
- SF_AWS_EXTERNAL_ID — External ID de l'utilisateur IAM Snowflake créé pour votre compte.
Dernière étape de la configuration : modifier la relation de confiance de notre rôle IAM. Retournez dans la console AWS, retrouvez votre rôle IAM et ouvrez l'onglet Trust relationship. Cliquez sur le bouton Edit trust relationship et mettez à jour le document de policy avec les valeurs récupérées via la description de votre intégration de notification.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Sid": "",\
\
6 "Effect": "Allow",\
\
7 "Principal": {\
\
8 "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9 },\
\
10 "Action": "sts:AssumeRole",\
\
11 "Condition": {\
\
12 "StringEquals": {\
\
13 "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14 }\
\
15 }\
```\
\
Expand Code\
\
L'intégration entre Snowflake et AWS est désormais opérationnelle !\
\
Étape 6 : mettre à jour la Snowflake Task\
Il ne reste plus qu'à associer cette intégration de notification à la tâche racine de notre DAG. Si vous n'avez pas encore créé la tâche, l'association peut se faire directement dans l'instruction CREATE TASK. Dans mon cas, la tâche racine existait déjà ; je vais donc utiliser la commande ALTER TASK :ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\
Étape 7 : déclencher des alertes Slack depuis SNS\
Il faut désormais traiter les messages SNS entrants et les transmettre à Slack. Créons pour cela une fonction Lambda qui s'appuie sur l'API Slack pour l'envoi des messages. Nous utiliserons notre topic SNS comme déclencheur de la fonction Lambda, qui sera ainsi exécutée à chaque arrivée d'un nouveau message SNS.
Voici un exemple de chaîne JSON que Snowflake enverra :
\
1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
Et la version formatée :\
\
```json\
1{\
\
2 "version": "1.0",\
\
3 "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4 "messageType": "INGEST_FAILED_FILE",\
\
5 "timestamp": "2021-10-22T19:15:29.471Z",\
\
6 "accountName": "MYACCOUNT",\
\
7 "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8 "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9 "stageLocation": "s3://mybucket/mypath",\
\
10 "messages": [\
\
11 {\
\
12 "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13 "firstError": "Numeric value 'abc' is not recognized"\
\
14 }\
\
15 ]\
```\
\
Expand Code\
\
Libre à vous d'écrire votre propre logique pour traiter le payload : ne conserver que les attributs utiles, enrichir le message avec d'autres informations, modifier le formatage, etc. À titre de démonstration, prenons le message entrant tel quel et envoyons-le à Slack.\
\
Commençons par mettre légèrement en forme le message en ajoutant de l'indentation pour en faciliter la lecture.\
\
````python\
1def format_slack_message(message):\
\
2 json_message = {\
\
3 "blocks": [\
\
4 {\
\
5 "type": "section",\
\
6 "text": {\
\
7 "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8 "type": "mrkdwn"\
\
9 }\
\
10 },\
\
11 {\
\
12 "type": "section",\
\
13 "text": {\
\
14 "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15 "type": "mrkdwn"\
````\
\
Expand Code\
\
Il ne reste plus qu'à se connecter à Slack et à envoyer le message :\
\
```python\
1def lambda_handler(event, context):\
\
2 if event:\
\
3 message = format_slack_message(event)\
\
4\
\
5 if message:\
\
6 logging.info('Starting sending message to slack')\
\
7 response = requests.post(\
\
8 my_webhook_uri, data=json.dumps(message),\
\
9 headers={'Content-Type': 'application/json'}\
\
10 )\
\
11 logging.info(response.text)\
\
12 logging.info('Finished sending message to Slack webhook')\
\
13 if response.status_code != 200:\
\
14 raise ValueError(\
\
15 'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
\
\
Et voilà ! Vous disposez d'un pipeline complet de notifications d'erreur reliant les Snowflake Tasks à Slack, grâce à une intégration de notification Snowflake et à quelques services AWS.\
\
Notifications d'erreur pour Snowpipe\
La même configuration permet de recevoir automatiquement des notifications en cas d'échec d'un Snowpipe : il suffit de renseigner le paramètre error_integration lors de la création ou de la modification de votre objet pipe !
\
1CREATE PIPE mypipe\
\
2 ERROR_INTEGRATION = '<integration_name>'\
\
3 AS\
\
4 COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík·Senior Data Engineer & Snowflake SME chez Norlys\
\
Tomas est un Snowflake Data SuperHero de longue date et un expert reconnu de Snowflake. Fort de plus d'une décennie d'expérience dans l'univers de la data, il a occupé les rôles de Snowflake data engineer, architecte et administrateur sur des projets variés, dans de nombreux secteurs et avec différentes technologies. Tomas est un membre actif de la communauté, où il partage son expertise et inspire les autres. Il est également formateur chez O'Reilly, où il anime des sessions de formation en direct en ligne.\