Introducción
Los Snowflake Tasks son una funcionalidad que ayuda a automatizar operaciones sobre la base de datos y sirven para construir data pipelines completos y automatizados. Con los tasks puedes ejecutar comandos SQL simples y programarlos para que corran a una hora determinada. También puedes encadenar varios tasks y armar un data pipeline complejo representado como un DAG (Directed Acyclic Graph) con un único root task. Combinados con funcionalidades como Snowpipe, permiten crear data pipelines de extremo a extremo con carga y transformación de datos.
En cualquier data pipeline es indispensable monitorear el proceso y reaccionar a tiempo si empieza a fallar en algún punto.
En nuestro post anterior sobre notificaciones y alertas en Snowflake, mostramos cómo aprovechar las funcionalidades nativas de Snowflake para configurar validaciones y recibir notificaciones por correo según el resultado de una consulta SQL. Las principales desventajas de ese enfoque son el esfuerzo que requiere crear esas notificaciones y que no te enteras del error de inmediato, salvo que ejecutes la consulta SQL con mucha frecuencia para revisar el resultado, ¡asumiendo además los costos de cómputo asociados!
Por suerte, ya existe una nueva funcionalidad de Snowflake llamada "Error Notifications for Tasks" que permite alertar de forma automática e inmediata cuando falla un Snowflake Task o un Snowpipe. En este post veremos cómo crear una notificación de error para Tasks / Snowpipe en Snowflake y conectarla a un canal de Slack.
Arquitectura de la notificación de errores
Primero, veamos la arquitectura general de la integración y luego repasamos cada paso en detalle.

Supongamos que tienes un DAG con varios tasks. Puedes asignar una integración de notificaciones al root task del DAG. Así, si cualquier task del DAG falla, la notificación de error lo detecta y envía un mensaje a Simple Notification Service (SNS). Del lado de AWS tenemos una función Lambda que se dispara cada vez que llega un mensaje SNS. La Lambda procesa ese mensaje y lo envía a tu canal de Slack mediante la Slack API. Tu equipo de datos monitorea el canal de Slack y reacciona según corresponda.
Paso 1: Crear el topic de SNS en AWS
Para seguir los pasos de este post vas a necesitar la ayuda de tu administrador de AWS o contar con los permisos adecuados para crear y administrar varios servicios de AWS. Lo primero es crear un topic de SNS en AWS. Puedes seguir la documentación de AWS para ver las instrucciones detalladas.
Paso 2: Crear una política de IAM en AWS
Después tienes que crear una nueva política de IAM que otorgue permisos para publicar en ese topic de SNS. Para eso hay que definir la acción sns:publish sobre el topic. Estos son los pasos:
- Entra a Identity & Access Management (IAM) en la consola de AWS:
- Elige Account settings en el panel de navegación de la izquierda.
- Despliega la lista Security Token Service Regions, ubica la región de AWS donde está tu cuenta y elige Activate si el estado es Inactive.
- Elige Policies en el panel de navegación de la izquierda.
- Haz clic en Create Policy.
- Haz clic en la pestaña JSON.
- Agrega un documento de política que defina las acciones permitidas sobre tu topic de SNS.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Effect": "Allow",\
\
6 "Action": [\
\
7 "sns:Publish"\
\
8 ],\
\
9 "Resource": "<sns_topic_arn> created in previous step"\
\
10 }\
\
11 ]
12 }
Paso 3: Crear un rol de IAM en AWS
Como último paso del lado de AWS hay que crear un rol de IAM. Si ya cuentas con un rol que usas para la interacción entre tu cuenta de Snowflake y AWS, claro que puedes reutilizarlo y simplemente sumarle la política creada en el paso anterior. Si necesitas crear un rol de IAM nuevo, estos son los pasos:
- Ve a Identity & Access Management (IAM) en AWS:
- Elige Roles en el panel de navegación de la izquierda.
- Haz clic en el botón Create role.
- Selecciona Another AWS account como tipo de entidad de confianza.
- En el campo Account ID, ingresa temporalmente tu propio ID de cuenta de AWS.
- Marca la opción Require external ID. Esta opción te permite otorgar permisos sobre los recursos de tu cuenta de Amazon (por ejemplo, SNS) a un tercero (en este caso, Snowflake).
- Por ahora, ingresa un ID ficticio, por ejemplo 0000. Más adelante vas a modificar la trust relationship y reemplazar ese ID ficticio por el external ID del usuario de IAM de Snowflake generado para tu cuenta. Una condición en la política de confianza del rol permitirá que tus usuarios de Snowflake asuman el rol mediante el objeto de integración de notificaciones que crearás después.
- Haz clic en el botón Next.
- Ubica la política que creaste y selecciónala.
- Haz clic en el botón Next.
- Asígnale un nombre y una descripción al rol y haz clic en el botón Create role.
- Anota el valor del Role ARN que aparece en la página de resumen del rol. Lo vas a necesitar en uno o más de los pasos siguientes.
Paso 4: Crear la integración de notificaciones en Snowflake
Ahora hay que crear un objeto de integración de notificaciones en Snowflake, que más adelante se asignará al root task del DAG de ejemplo de arriba.
Al crear la integración de notificaciones vas a necesitar el ARN del topic de SNS y el del rol de IAM que creaste antes. Aquí tienes un código de ejemplo:
CREATE NOTIFICATION INTEGRATION my_notif_integration
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
DIRECTION = OUTBOUND
AWS_SNS_TOPIC_ARN = 'your SNS topic'
AWS_SNS_ROLE_ARN = 'your role;
Paso 5: Darle acceso a Snowflake al topic de SNS
Ahora hay que darle acceso a Snowflake al topic de SNS. Primero, ejecuta DESC NOTIFICATION INTEGRATION my_notif_integration. De la salida tenemos que anotar estos dos valores:
- SF_AWS_IAM_USER_ARN — ARN del usuario de IAM de Snowflake creado para tu cuenta.
- SF_AWS_EXTERNAL_ID — External ID del usuario de IAM de Snowflake creado para tu cuenta.
El paso final de la configuración consiste en modificar la trust relationship del rol de IAM. Vuelve a la consola de AWS, busca tu rol de IAM y haz clic en la pestaña Trust relationship. Haz clic en el botón Edit trust relationship y actualiza el documento de política con los valores que obtuviste al describir tu integración de notificaciones.
1{
2 "Version": "2012-10-17",
3 "Statement": [\
\
4 {\
\
5 "Sid": "",\
\
6 "Effect": "Allow",\
\
7 "Principal": {\
\
8 "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9 },\
\
10 "Action": "sts:AssumeRole",\
\
11 "Condition": {\
\
12 "StringEquals": {\
\
13 "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14 }\
\
15 }\
```\
\
Expand Code\
\
¡Listo, la integración entre Snowflake y AWS ya está completa!\
\
Paso 6: Actualizar el Snowflake Task\
Ya solo nos queda asignar esta integración de notificaciones al root task del DAG. Si todavía no creaste el task, la asignación se hace directamente como parte de la sentencia CREATE TASK. En mi caso el root task ya estaba creado, así que voy a usar el comando ALTER TASK:ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\
Paso 7: Disparar alertas en Slack desde SNS\
Ahora hay que procesar los mensajes SNS entrantes y enviarlos a Slack. Para eso vamos a crear una función Lambda que se apoya en la Slack API para mandar los mensajes. Usaremos nuestro topic de SNS como disparador de la Lambda, que se ejecutará cada vez que llegue un nuevo mensaje SNS.
Este es un ejemplo del JSON string que envía Snowflake:
\
1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
Y la versión formateada:\
\
```json\
1{\
\
2 "version": "1.0",\
\
3 "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4 "messageType": "INGEST_FAILED_FILE",\
\
5 "timestamp": "2021-10-22T19:15:29.471Z",\
\
6 "accountName": "MYACCOUNT",\
\
7 "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8 "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9 "stageLocation": "s3://mybucket/mypath",\
\
10 "messages": [\
\
11 {\
\
12 "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13 "firstError": "Numeric value 'abc' is not recognized"\
14 }\
\
15 ]\
```\
\
Expand Code\
\
Puedes escribir tu propia lógica para procesar el payload: usar solo los atributos relevantes, sumar información adicional que no venga incluida, cambiar el formato, etc. A modo de demostración, tomemos el mensaje entrante tal cual y enviémoslo a Slack.\
\
Primero, démosle un formato más legible al mensaje agregando indentación.\
\
````python\
1def format_slack_message(message):\
\
2 json_message = {\
\
3 "blocks": [\
\
4 {\
\
5 "type": "section",\
\
6 "text": {\
\
7 "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8 "type": "mrkdwn"\
\
9 }\
\
10 },\
\
11 {\
\
12 "type": "section",\
\
13 "text": {\
\
14 "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15 "type": "mrkdwn"\
````\
\
Expand Code\
\
Y ahora nos conectamos a Slack y enviamos el mensaje:\
\
```python\
1def lambda_handler(event, context):\
\
2 if event:\
\
3 message = format_slack_message(event)\
\
4\
\
5 if message:\
\
6 logging.info('Starting sending message to slack')\
\
7 response = requests.post(\
\
8 my_webhook_uri, data=json.dumps(message),\
\
9 headers={'Content-Type': 'application/json'}\
\
10 )\
\
11 logging.info(response.text)\
\
12 logging.info('Finished sending message to Slack webhook')\
\
13 if response.status_code != 200:\
\
14 raise ValueError(\
\
15 'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
\
\
¡Y eso es todo! Con la integración de notificaciones de Snowflake y unos pocos servicios de AWS ya tenemos un pipeline completo de notificaciones de error entre Snowflake Tasks y Slack.\
\
Notificaciones de errores para Snowpipe\
Puedes usar exactamente la misma configuración para recibir notificaciones automáticas cuando un Snowpipe falla. ¡Solo tienes que definir el parámetro error_integration al crear o modificar tu objeto pipe!
\
1CREATE PIPE mypipe\
\
2 ERROR_INTEGRATION = '<integration_name>'\
\
3 AS\
\
4 COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík·Senior Data Engineer & Snowflake SME en Norlys\
\
Tomas es un reconocido Snowflake Data SuperHero y referente general en Snowflake. Su trayectoria en el mundo de los datos supera la década, durante la cual se ha desempeñado como data engineer, arquitecto y administrador de Snowflake en proyectos de las más diversas industrias y tecnologías. Tomas es un miembro clave de la comunidad: comparte activamente su experiencia e inspira a otros. Además, es instructor de O'Reilly y dicta sesiones de capacitación en vivo online.\