SELECTSELECT

SELECT

Notificações de erro para Snowflake Tasks

By Tomáš SobotíkSep 26, 20238 min read

Esta página também está disponível em English, Deutsch, Español, Français, Italiano e 日本語.

Introdução

As Snowflake Tasks são um recurso que ajuda a automatizar operações de banco de dados e podem ser usadas para montar pipelines de dados completos e automatizados. As tasks disparam comandos SQL simples e podem ser agendadas para rodar em horários específicos. Dá para encadear várias tasks e criar um pipeline de dados complexo no formato de um DAG (Directed Acyclic Graph) com uma única root task. Combinadas com recursos como o Snowpipe, viabilizam pipelines ponta a ponta com carga e transformação de dados.

Em qualquer pipeline de dados, é fundamental monitorar a execução e reagir rápido caso o processo comece a falhar em algum ponto.

No nosso post anterior sobre notificações e alertas do Snowflake, mostramos como usar os recursos nativos de notificação e alerta do Snowflake para configurar validações e receber notificações por e-mail com base no resultado de uma consulta SQL. As principais desvantagens dessa abordagem são o esforço para criar essas notificações e o fato de você só ficar sabendo do erro se rodar a consulta SQL com frequência — pagando o custo de compute envolvido nisso!

Felizmente, o Snowflake lançou um recurso chamado "Error Notifications for Tasks", que avisa automaticamente e na hora sobre falhas em uma Snowflake Task ou no Snowpipe. Neste post, vamos ver como criar uma notificação de erro para Tasks / Snowpipe no Snowflake e conectá-la a um canal do Slack.

Arquitetura da notificação de erro

Primeiro, vamos desenhar a arquitetura geral da integração e, em seguida, detalhar cada etapa.

Snowflake Error Notification Architecture

Imagine que você tem um DAG com várias tasks. Você pode atribuir uma notification integration à root task do DAG. Aí, se qualquer task do DAG falhar, a notificação de erro captura a falha e envia uma mensagem pelo Simple Notification Service (SNS). Do lado da AWS, temos uma função Lambda disparada sempre que chega uma mensagem SNS. A função Lambda processa essa mensagem e a envia para o seu canal do Slack pela Slack API. Seu time de dados acompanha o canal e age conforme necessário.

Etapa 1: criar o tópico SNS na AWS

Para seguir os passos deste post, você vai precisar da ajuda do(s) admin(s) da AWS ou ter os privilégios adequados no lado da AWS para criar e gerenciar diversos serviços. O primeiro passo é criar um tópico SNS na AWS. Veja as instruções detalhadas na documentação da AWS.

Etapa 2: criar uma IAM Policy na AWS

Agora, crie uma nova IAM policy que conceda permissão para publicar no tópico SNS. É preciso definir a ação sns:publish para esse tópico. Veja o passo a passo:

  1. Acesse Identity & Access Management (IAM) no console da AWS:
  2. Escolha Account settings no menu de navegação à esquerda.
  3. Expanda a Security Token Service Regions list, localize a região AWS correspondente à região da sua conta e clique em Activate se o status estiver Inactive.
  4. Escolha Policies no menu de navegação à esquerda.
  5. Clique em Create Policy.
  6. Clique na aba JSON.
  7. Adicione um documento de policy definindo as ações permitidas no seu tópico SNS.
1{

2    "Version": "2012-10-17",

3    "Statement": [\
\
4      {\
\
5         "Effect": "Allow",\
\
6         "Action": [\
\
7             "sns:Publish"\
\
8         ],\
\
9         "Resource": "<sns_topic_arn> created in previous step"\
\
10      }\
\
11    ]

12 }

Etapa 3: criar uma IAM Role na AWS

Como última etapa do lado da AWS, precisamos criar uma IAM role. Se você já tem uma role para a interação entre a conta Snowflake e a AWS, claro, pode reaproveitá-la — basta anexar a policy criada na etapa anterior. Para criar uma nova IAM role, siga estes passos:

  1. Acesse Identity & Access Management (IAM) na AWS:
  2. Escolha Roles no menu de navegação à esquerda.
  3. Clique no botão Create role.
  4. Selecione Another AWS account como tipo de entidade confiável.
  5. No campo Account ID, informe temporariamente o ID da sua própria conta AWS.
  6. Marque a opção Require external ID. Ela permite conceder permissões nos recursos da sua conta Amazon (no caso, SNS) a terceiros (no caso, o Snowflake).
  7. Por enquanto, informe um ID fictício, como 0000. Mais adiante, você vai ajustar o trust relationship e substituir esse ID fictício pelo external ID do usuário IAM do Snowflake gerado para sua conta. Uma condição na trust policy da sua IAM role permite que seus usuários do Snowflake assumam a role pelo objeto de notification integration que você vai criar depois.
  8. Clique no botão Next
  9. Localize a policy que você criou e selecione-a.
  10. Clique no botão Next.
  11. Defina um nome e uma descrição para a role e clique no botão Create role.
  12. Anote o valor do Role ARN exibido na página de resumo da role. Você vai usá-lo em uma ou mais etapas seguintes.

Etapa 4: criar a Notification Integration no Snowflake

Agora vamos criar um objeto de notification integration no Snowflake, que mais para a frente será atribuído à root task do DAG de exemplo acima.

Ao criar a notification integration, você vai precisar do ARN AWS do tópico SNS e da IAM role criada anteriormente. Veja um exemplo de código:

CREATE NOTIFICATION INTEGRATION my_notif_integration
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = 'your SNS topic'
  AWS_SNS_ROLE_ARN = 'your role;

Etapa 5: dar ao Snowflake acesso ao tópico SNS

Agora precisamos liberar o acesso do Snowflake ao tópico SNS. Primeiro, execute DESC NOTIFICATION INTEGRATION my_notif_integration. Anote estes dois valores do retorno:

  • SF_AWS_IAM_USER_ARN — ARN do usuário IAM do Snowflake criado para sua conta.
  • SF_AWS_EXTERNAL_ID — External ID do usuário IAM do Snowflake criado para sua conta.

A etapa final da configuração é ajustar o trust relationship da IAM role. Volte ao console da AWS, encontre sua IAM role e clique na aba Trust relationship. Clique no botão Edit trust relationship e atualize o documento da policy com os valores obtidos ao descrever sua notification integration.

1{

2  "Version": "2012-10-17",

3  "Statement": [\
\
4    {\
\
5      "Sid": "",\
\
6      "Effect": "Allow",\
\
7      "Principal": {\
\
8        "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9      },\
\
10      "Action": "sts:AssumeRole",\
\
11      "Condition": {\
\
12        "StringEquals": {\
\
13          "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14        }\
\
15      }\
```\
\
Expand Code\
\
Pronto, a integração entre Snowflake e AWS está concluída!\
\

Etapa 6: atualizar a Snowflake Task\


Agora é só atribuir essa notification integration à root task do DAG. Se você ainda não criou a task, a atribuição pode ser feita direto no comando CREATE TASK. Como a minha root task já existia, vou usar o comando ALTER TASK:

ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\

Etapa 7: disparar alertas do Slack a partir do SNS\


Agora precisamos processar as mensagens SNS recebidas e mandá-las para o Slack. Vamos criar uma função Lambda para essa tarefa, usando a Slack API para enviar as mensagens. Nosso tópico SNS será o gatilho da Lambda, que será disparada toda vez que uma nova mensagem SNS chegar.

Snowflake Error Notification Architecture with SNS and Lambda

Veja um exemplo de string JSON que o Snowflake vai enviar:
\

1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
E a versão formatada:\
\
```json\
1{\
\
2   "version": "1.0",\
\
3   "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4   "messageType": "INGEST_FAILED_FILE",\
\
5   "timestamp": "2021-10-22T19:15:29.471Z",\
\
6   "accountName": "MYACCOUNT",\
\
7   "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8   "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9   "stageLocation": "s3://mybucket/mypath",\
\
10   "messages": [\
\
11      {\
\
12         "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13         "firstError": "Numeric value 'abc' is not recognized"\
\
14      }\
\
15   ]\
```\
\
Expand Code\
\
Você pode criar sua própria lógica para tratar o payload — usar apenas os atributos relevantes, agregar outras informações que não vieram na mensagem, mudar a formatação e por aí vai. Para fins de demonstração, vamos pegar a mensagem recebida como ela está e enviá-la ao Slack.\
\
Primeiro, vamos formatar a mensagem, adicionando indentação para deixá-la mais legível.\
\
````python\
1def format_slack_message(message):\
\
2    json_message = {\
\
3        "blocks": [\
\
4            {\
\
5            "type": "section",\
\
6            "text": {\
\
7                "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8                "type": "mrkdwn"\
\
9            }\
\
10            },\
\
11            {\
\
12            "type": "section",\
\
13            "text": {\
\
14                "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15                "type": "mrkdwn"\
````\
\
Expand Code\
\
Agora é só conectar ao Slack e enviar a mensagem:\
\
```python\
1def lambda_handler(event, context):\
\
2    if event:\
\
3        message = format_slack_message(event)\
\
4\
\
5        if message:\
\
6            logging.info('Starting sending message to slack')\
\
7            response = requests.post(\
\
8                my_webhook_uri, data=json.dumps(message),\
\
9                headers={'Content-Type': 'application/json'}\
\
10            )\
\
11            logging.info(response.text)\
\
12            logging.info('Finished sending message to Slack webhook')\
\
13             if response.status_code != 200:\
\
14                raise ValueError(\
\
15                    'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
![Snowflake Error Notification message in Slack](https://select.dev/cdn-cgi/imagedelivery/1zmOcgV1p520E4lLTrYjjg/blog/error-notifications-for-snowflake-tasks/3.png/width=1366,quality=75)\
\
E é isso! Agora temos um pipeline completo de notificações de erro entre Snowflake Tasks e Slack, usando uma Snowflake Notification integration e alguns serviços da AWS.\
\

Notificações de erro para o Snowpipe\


A mesma configuração funciona para receber notificações automáticas quando um Snowpipe falha — é só definir o parâmetro error_integration ao criar ou alterar o objeto pipe!
\

1CREATE PIPE mypipe\
\
2    ERROR_INTEGRATION = '<integration_name>'\
\
3    AS\
\
4    COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík·Senior Data Engineer & Snowflake SME na Norlys\
\
Tomas é Snowflake Data SuperHero há muito tempo e referência geral no assunto Snowflake. Sua vasta experiência no mundo de dados soma mais de uma década, atuando como Snowflake data engineer, arquiteto e admin em projetos de setores e tecnologias variados. Tomas é um membro ativo da comunidade, sempre compartilhando conhecimento e inspirando outras pessoas. Também é instrutor da O'Reilly, conduzindo treinamentos online ao vivo.\