SELECTSELECT

SELECT

Fehlerbenachrichtigungen für Snowflake Tasks

By Tomáš SobotíkSep 26, 20238 min read

Diese Seite ist auch in English, Español, Français, Italiano, 日本語 und Português verfügbar.

Einführung

Snowflake Tasks sind ein Feature, mit dem sich datenbankbezogene Vorgänge automatisieren und vollständige, automatisierte Daten-Pipelines aufbauen lassen. Tasks lösen einfache SQL-Befehle aus und können zeitgesteuert ausgeführt werden. Sie lassen sich miteinander verknüpfen und so zu einer komplexen Daten-Pipeline kombinieren, die als DAG (Directed Acyclic Graph) mit einem einzigen Root-Task abgebildet wird. In Kombination mit Features wie Snowpipe entstehen daraus End-to-End-Daten-Pipelines mit Lade- und Transformationsfunktionalität.

Wie bei jeder Daten-Pipeline gilt: Sie muss überwacht werden, damit Sie reagieren können, sobald an irgendeiner Stelle Fehler auftreten.

In unserem vorherigen Beitrag zu Snowflake Notifications & Alerts haben wir gezeigt, wie sich die nativen Benachrichtigungs- und Alerting-Funktionen von Snowflake nutzen lassen, um Validierungsprüfungen einzurichten und E-Mail-Benachrichtigungen auf Basis des Ergebnisses einer SQL-Abfrage zu erhalten. Die wesentlichen Nachteile: Das Einrichten ist aufwendig, und Sie werden erst dann informiert, wenn Sie die SQL-Abfrage in kurzen Abständen ausführen – inklusive der damit verbundenen Compute-Kosten.

Glücklicherweise gibt es ein neues Snowflake-Feature namens "Error Notifications for Tasks", das Sie automatisch und sofort über fehlgeschlagene Snowflake Tasks oder Snowpipes informiert. In diesem Beitrag zeigen wir, wie Sie eine Fehlerbenachrichtigung für Tasks bzw. Snowpipe in Snowflake einrichten und mit einem Slack-Channel verbinden.

Architektur der Fehlerbenachrichtigung

Zuerst skizzieren wir die Gesamtarchitektur dieser Integration und gehen anschließend jeden Schritt im Detail durch.

Snowflake Error Notification Architecture

Angenommen, Sie haben einen DAG aus mehreren Tasks. Dem Root-Task des DAG weisen Sie eine Notification Integration zu. Schlägt nun ein beliebiger Task im DAG fehl, greift die Fehlerbenachrichtigung und schickt eine Nachricht an den Simple Notification Service (SNS). Auf AWS-Seite läuft eine Lambda-Funktion, die durch eingehende SNS-Nachrichten ausgelöst wird. Sie verarbeitet die Nachricht und stellt sie über die Slack-API in Ihren Slack-Channel zu. Ihr Datenteam überwacht den Channel und kann entsprechend reagieren.

Schritt 1: AWS-SNS-Topic anlegen

Für die folgenden Schritte benötigen Sie entweder die Unterstützung Ihrer AWS-Admins oder selbst ausreichende Berechtigungen, um auf AWS-Seite mehrere Services anzulegen und zu verwalten. Legen Sie zunächst ein SNS-Topic in AWS an. Eine ausführliche Anleitung finden Sie in der AWS-Dokumentation.

Schritt 2: AWS-IAM-Policy anlegen

Als Nächstes legen Sie eine neue IAM-Policy an, die das Veröffentlichen in diesem SNS-Topic erlaubt. Dazu definieren Sie die Aktion sns:publish für das SNS-Topic. So gehen Sie vor:

  1. Öffnen Sie Identity & Access Management (IAM) in der AWS-Konsole.
  2. Wählen Sie im linken Navigationsbereich Account settings.
  3. Klappen Sie die Security Token Service Regions list auf, suchen Sie die AWS-Region, in der sich Ihr Konto befindet, und klicken Sie auf Activate, falls der Status Inactive lautet.
  4. Wählen Sie im linken Navigationsbereich Policies.
  5. Klicken Sie auf Create Policy.
  6. Wechseln Sie auf den Tab JSON.
  7. Fügen Sie ein Policy-Dokument ein, das die zulässigen Aktionen auf Ihrem SNS-Topic festlegt.
1{

2    "Version": "2012-10-17",

3    "Statement": [\
\
4      {\
\
5         "Effect": "Allow",\
\
6         "Action": [\
\
7             "sns:Publish"\
\
8         ],\
\
9         "Resource": "<sns_topic_arn> created in previous step"\
\
10      }\
\
11    ]

12 }

Schritt 3: AWS-IAM-Rolle anlegen

Im letzten Schritt auf AWS-Seite legen wir eine IAM-Rolle an. Falls Sie bereits eine Rolle nutzen, die Ihr Snowflake-Konto mit AWS verbindet, können Sie diese selbstverständlich weiterverwenden und einfach um die im vorigen Schritt erstellte Policy ergänzen. Wenn Sie eine neue IAM-Rolle benötigen, gehen Sie wie folgt vor:

  1. Öffnen Sie Identity & Access Management (IAM) in AWS.
  2. Wählen Sie im linken Navigationsbereich Roles.
  3. Klicken Sie auf Create role.
  4. Wählen Sie als Trusted-Entity-Typ Another AWS account.
  5. Tragen Sie im Feld Account ID vorübergehend Ihre eigene AWS-Konto-ID ein.
  6. Aktivieren Sie die Option Require external ID. So können Sie einem Dritten (z. B. Snowflake) Berechtigungen auf Ihre Amazon-Ressourcen (z. B. SNS) erteilen.
  7. Tragen Sie zunächst eine Dummy-ID wie 0000 ein. Später passen Sie die Trust Relationship an und ersetzen die Dummy-ID durch die External ID des für Ihr Konto generierten Snowflake-IAM-Users. Eine Bedingung in der Trust Policy Ihrer IAM-Rolle erlaubt es Ihren Snowflake-Nutzern dann, die Rolle über das später erstellte Notification-Integration-Objekt anzunehmen.
  8. Klicken Sie auf Next.
  9. Suchen Sie die zuvor erstellte Policy und wählen Sie sie aus.
  10. Klicken Sie auf Next.
  11. Vergeben Sie einen Namen sowie eine Beschreibung für die Rolle und klicken Sie auf Create role.
  12. Notieren Sie sich den Wert Role ARN auf der Übersichtsseite der Rolle. Sie benötigen ihn in einem oder mehreren der folgenden Schritte.

Schritt 4: Snowflake Notification Integration anlegen

Als Nächstes legen wir in Snowflake ein Notification-Integration-Objekt an, das später dem Root-Task in unserem Beispiel-DAG zugewiesen wird.

Beim Anlegen der Notification Integration benötigen Sie den AWS-ARN des SNS-Topics sowie der zuvor erstellten IAM-Rolle. Hier ein Codebeispiel:

CREATE NOTIFICATION INTEGRATION my_notif_integration
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = 'your SNS topic'
  AWS_SNS_ROLE_ARN = 'your role;

Schritt 5: Snowflake Zugriff auf das SNS-Topic gewähren

Nun erteilen wir Snowflake Zugriff auf das SNS-Topic. Führen Sie dazu zunächst DESC NOTIFICATION INTEGRATION my_notif_integration aus. Aus der Ausgabe brauchen wir die folgenden beiden Werte:

  • SF_AWS_IAM_USER_ARN — ARN des für Ihr Konto erstellten Snowflake-IAM-Users.
  • SF_AWS_EXTERNAL_ID — External ID des für Ihr Konto erstellten Snowflake-IAM-Users.

Im letzten Konfigurationsschritt passen wir die Trust Relationship unserer IAM-Rolle an. Wechseln Sie zurück in die AWS-Konsole, öffnen Sie Ihre IAM-Rolle und klicken Sie auf den Tab Trust relationship. Klicken Sie auf Edit trust relationship und tragen Sie im Policy-Dokument die Werte ein, die wir zuvor mit DESC aus Ihrer Notification Integration ausgelesen haben.

1{

2  "Version": "2012-10-17",

3  "Statement": [\
\
4    {\
\
5      "Sid": "",\
\
6      "Effect": "Allow",\
\
7      "Principal": {\
\
8        "AWS": "<sf_aws_iam_user_arn value from above>"\
\
9      },\
\
10      "Action": "sts:AssumeRole",\
\
11      "Condition": {\
\
12        "StringEquals": {\
\
13          "sts:ExternalId": "<sf_aws_external_id from above>"\
\
14        }\
\
15      }\
```\
\
Expand Code\
\
Die Integration zwischen Snowflake und AWS ist damit abgeschlossen!\
\

Schritt 6: Snowflake Task aktualisieren\


Jetzt müssen wir die Notification Integration nur noch dem Root-Task im DAG zuweisen. Falls Sie den Task noch nicht angelegt haben, lässt sich die Zuweisung direkt im CREATE-TASK-Statement erledigen. In meinem Fall existiert der Root-Task bereits, deshalb verwende ich den Befehl ALTER TASK:

ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name>;
\

Schritt 7: Slack-Alerts aus SNS auslösen\


Jetzt brauchen wir noch einen Weg, die eingehenden SNS-Nachrichten zu verarbeiten und an Slack weiterzuleiten. Dafür legen wir eine Lambda-Funktion an, die die Nachrichten über die Slack-API verschickt. Unser SNS-Topic dient dabei als Trigger – Lambda wird ausgeführt, sobald eine neue SNS-Nachricht eintrifft.

Snowflake Error Notification Architecture with SNS and Lambda

So sieht ein JSON-String aus, den Snowflake schickt:
\

1{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"s3://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}\
```\
\
Und die formatierte Variante:\
\
```json\
1{\
\
2   "version": "1.0",\
\
3   "messageId": "a62e34bc-6141-4e95-92d8-f04fe43b43f5",\
\
4   "messageType": "INGEST_FAILED_FILE",\
\
5   "timestamp": "2021-10-22T19:15:29.471Z",\
\
6   "accountName": "MYACCOUNT",\
\
7   "pipeName": "MYDB.MYSCHEMA.MYPIPE",\
\
8   "tableName": "MYDB.MYSCHEMA.MYTABLE",\
\
9   "stageLocation": "s3://mybucket/mypath",\
\
10   "messages": [\
\
11      {\
\
12         "fileName": "/file1.csv_0_0_0.csv.gz",\
\
13         "firstError": "Numeric value 'abc' is not recognized"\
\
14      }\
\
15   ]\
```\
\
Expand Code\
\
Sie können den Payload mit eigener Logik verarbeiten – etwa nur relevante Attribute übernehmen, zusätzliche Informationen ergänzen oder die Formatierung anpassen. Zur Demonstration übernehmen wir die eingehende Nachricht unverändert und schicken sie an Slack.\
\
Zuerst formatieren wir die Nachricht ein wenig und ergänzen Einrückungen für bessere Lesbarkeit.\
\
````python\
1def format_slack_message(message):\
\
2    json_message = {\
\
3        "blocks": [\
\
4            {\
\
5            "type": "section",\
\
6            "text": {\
\
7                "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",\
\
8                "type": "mrkdwn"\
\
9            }\
\
10            },\
\
11            {\
\
12            "type": "section",\
\
13            "text": {\
\
14                "text": "```" + json.dumps(message,indent=2, separators=(',', ': ')) + "```",\
\
15                "type": "mrkdwn"\
````\
\
Expand Code\
\
Anschließend stellen wir die Verbindung zu Slack her und senden die Nachricht:\
\
```python\
1def lambda_handler(event, context):\
\
2    if event:\
\
3        message = format_slack_message(event)\
\
4\
\
5        if message:\
\
6            logging.info('Starting sending message to slack')\
\
7            response = requests.post(\
\
8                my_webhook_uri, data=json.dumps(message),\
\
9                headers={'Content-Type': 'application/json'}\
\
10            )\
\
11            logging.info(response.text)\
\
12            logging.info('Finished sending message to Slack webhook')\
\
13             if response.status_code != 200:\
\
14                raise ValueError(\
\
15                    'Request to slack returned an error %s, the response is:\n%s'\
```\
\
Expand Code\
\
![Snowflake Error Notification message in Slack](https://select.dev/cdn-cgi/imagedelivery/1zmOcgV1p520E4lLTrYjjg/blog/error-notifications-for-snowflake-tasks/3.png/width=1366,quality=75)\
\
Fertig! Damit haben wir eine vollständige Pipeline für Fehlerbenachrichtigungen zwischen Snowflake Tasks und Slack – umgesetzt mit einer Snowflake Notification Integration und ein paar AWS-Services.\
\

Fehlerbenachrichtigungen für Snowpipe\


Dasselbe Setup eignet sich auch für automatische Benachrichtigungen bei fehlgeschlagenen Snowpipes – setzen Sie dazu beim Anlegen oder Ändern Ihres Pipe-Objekts einfach den Parameter error_integration.
\

1CREATE PIPE mypipe\
\
2    ERROR_INTEGRATION = '<integration_name>'\
\
3    AS\
\
4    COPY INTO mytable FROM @mystage\
\
5;\
```\
\
\
Tomáš Sobotík·Senior Data Engineer & Snowflake SME bei Norlys\
\
Tomas ist langjähriger Snowflake Data SuperHero und ausgewiesener Snowflake-Experte. Seine Erfahrung in der Datenwelt reicht über mehr als ein Jahrzehnt – in dieser Zeit war er als Snowflake Data Engineer, Architekt und Admin in Projekten unterschiedlichster Branchen und Technologien tätig. Tomas ist ein zentrales Mitglied der Community, gibt sein Know-how aktiv weiter und inspiriert andere. Außerdem ist er O'Reilly-Instructor und leitet Live-Online-Trainings.\