SELECTSELECT

SELECT

Snowflake Connector for Python: come iniziare

By Ian WhitestoneApr 27, 20246 min read

Questa pagina è disponibile anche in English, Deutsch, Español, Français, 日本語 e Português.

Lo Snowflake Connector for Python è una libreria Python mantenuta dal team di Snowflake ed è il driver Python ufficiale per Snowflake. Se cerchi un modo programmatico per connetterti a Snowflake ed eseguire query in Python, sei nel posto giusto. Molte aziende, tra cui SELECT, si affidano a questa libreria per orchestrare workflow su Snowflake o per sviluppare applicazioni data-driven basate su Snowflake.

Come installare lo Snowflake Connector for Python

Lo Snowflake Connector è un pacchetto Python nativo: si installa esattamente come qualsiasi altro pacchetto Python.

Con pip

Il modo più semplice per installare la libreria è tramite pip, il sistema ufficiale di gestione dei pacchetti per Python:

1pip install snowflake-connector-python

Con conda

Se utilizzi conda per gestire i pacchetti, puoi installare la libreria con il seguente comando:

1conda install snowflake-connector-python

Esempio rapido di avvio

Ecco un esempio end-to-end di come utilizzare il pacchetto Snowflake Connector per eseguire una query ed elaborarne i risultati. L'esempio mostra il consumo totale di credit per virtual warehouse negli ultimi 30 giorni:

L'output stampato avrà un aspetto simile a questo:

('REPLICATOR_LARGE', Decimal('669.080833333'))
('REPLICATOR_XSMALL', Decimal('613.355000002'))
('REPLICATOR_XLARGE', Decimal('416.437500000'))
('BACKEND_LARGE', Decimal('162.702500031'))
('BACKEND', Decimal('105.263888856'))
('BACKEND_MEDIUM', Decimal('47.033055570'))
('REPLICATOR_MEDIUM', Decimal('40.775833333'))
('REPLICATOR_SMALL', Decimal('31.046388890'))
('BACKEND_STAGING', Decimal('26.985555532'))
('BACKEND_SMALL', Decimal('12.798333345'))
('SELECT_DOGFOOD', Decimal('11.283055542'))
('ADMIN', Decimal('10.899444408'))
('BACKEND_STAGING_MEDIUM', Decimal('6.822222224'))
('BI_XSMALL', Decimal('1.814999976'))
('CLOUD_SERVICES_ONLY', Decimal('0E-9'))

Per individuare il tuo account identifier, segui i passaggi descritti in questo articolo

Concetti fondamentali

Oggetti Connection e Cursor

Come mostrato nell'esempio rapido, gli oggetti con cui interagiamo sono due: l'oggetto connection e l'oggetto cursor.

L'oggetto connection gestisce l'autenticazione, la connessione a Snowflake e la creazione dei cursor. I singoli cursor vengono poi utilizzati per eseguire le query su Snowflake e recuperarne i risultati. Una stessa connection può avere più cursor e uno stesso cursor può essere riutilizzato per più query: non è necessario crearne uno nuovo ogni volta. Lo stesso vale per l'oggetto connection: stabilire una connessione a Snowflake richiede qualche secondo, quindi conviene farlo una sola volta.

Per approfondire i metodi disponibili su ciascun oggetto, consulta la documentazione ufficiale delle API dello Snowflake Python connector.

Query sincrone e asincrone

Con lo Snowflake Connector for Python è possibile eseguire query in due modalità: sincrona o asincrona.

Invio di una query sincrona

Quando invii una query sincrona, il processo Python resta in attesa finché la query non restituisce un risultato. È l'approccio più diffuso quando si lavora con il Python connector, grazie alla sua semplicità:

cur = conn.cursor()
cur.execute('select * from table') # Python will wait until the query is done before moving to next line
results = cur.fetchall()

Invio di una query asincrona

Con una query asincrona, il pacchetto Snowflake Python restituisce immediatamente il controllo al processo Python, senza attendere il completamento della query. È particolarmente utile quando si sviluppano applicazioni Python multi-thread (come un'applicazione web) o quando si usa Python per eseguire molte query in parallelo.

Per eseguire una query asincrona, usa il metodo del cursor execute_async al posto del classico execute. Più avanti nel codice dovrai interrogare Snowflake per verificare se la query è stata completata.

cur = conn.cursor()
cur.execute_async('select * from table') # Python will immediately move on to next line
query_id = cur.sfqid

while True:
		if conn.is_still_running(conn.get_query_status(query_id)):
				break
    time.sleep(1)

cur.get_results_from_sfqid(query_id)
results = cur.fetchall()

Controllo delle transazioni

Per impostazione predefinita, le query eseguite su Snowflake tramite il pacchetto Snowflake Connector vengono automaticamente confermate (commit).

Per un controllo più granulare sulle transazioni SQL, puoi impostare autocommit=False al momento di stabilire la connessione.

Utilizzo del context manager with

Osserva il codice seguente. Se una qualsiasi istruzione SQL fallisce, Snowflake eseguirà automaticamente il rollback. In caso contrario, le istruzioni verranno confermate automaticamente all'uscita dal blocco with.

with snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    autocommit=False,
) as conn:
    cur = conn.cursor()
    cur.execute("insert into table values(...)")
    cur.execute("insert into table values(...)")

Controllo manuale con try-except

Per un controllo ancora più granulare, puoi gestire manualmente commit e rollback delle transazioni SQL con i metodi conn.commit() e conn.close(), sfruttando il pattern try-except-finally:

con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    cur = conn.cursor()
    cur.execute("insert into table values(...)")
    cur.execute("insert into table values(...)")
    conn.commit()
except Exception as e:
    conn.rollback()
    raise e
finally:
    conn.close()

Chiusura della connessione

Come buona pratica, è consigliabile chiudere la connessione al termine dello script Python tramite il metodo conn.close(). Per essere certi che la connessione venga chiusa anche in caso di errore, puoi racchiudere il codice in un blocco try - finally:

try:
   cur.execute('...')
   results = cur.fetchall()
   # ...
except:
    # handle errors as needed
finally:
		conn.close()

Un altro pattern utile è il costrutto Python with(), che si occupa di chiudere automaticamente la connessione:

with snowflake.connector.connect(...) as conn:
		cur = conn.cursor()
		cur.execute('...')
		results = cur.fetchall()

# conn.close() method is automatically code after with block
# Move on to other operations not involving Snowflake
with open('filename.csv', 'w') as f:
		f.write(...)

Altri esempi

Vediamo ora altri esempi che mettono in evidenza i pattern più comuni nell'uso dello Snowflake Python Connector.

Passare parametri alle istruzioni SQL

Una necessità ricorrente per gli sviluppatori è poter passare parametri diversi alle istruzioni SQL.

Python offre un eccellente supporto alla manipolazione delle stringhe tramite le f-string, perfette proprio per questo scopo:

user_id = 123456
sql = f"select * from users where user_id={user_id}"
cur.execute(sql)

In alternativa, puoi sfruttare le bind variables:

warehouse_name='ADMIN'
sql="""
select *
from snowflake.account_usage.warehouse_metering_history
where warehouse_name=%s
"""
cur.execute(sql, (warehouse_name))

Recuperare tutti i risultati

In tutti gli esempi presentati in questo articolo abbiamo utilizzato il metodo fetchall() per ottenere tutti i risultati della query:

cur.execute('select * from table')
results=cur.fetchall()

Elaborazione a batch

Se vuoi ridurre al minimo la memoria consumata dall'applicazione Python, puoi elaborare i dati a blocchi. Ecco un esempio che processa 100.000 record alla volta:

cur.execute('select * from table')
while True:
    rows = cur.fetchmany(100000)
    if not rows:
        break
    for row in rows:
        # Process each row
        print(row)

Restituire una lista di dizionari

Per impostazione predefinita, Snowflake restituisce una lista di tuple, in cui non è chiaro a quale colonna corrisponda ciascun elemento. Ecco un esempio che mostra come restituire, per ogni riga, un dizionario in cui la chiave è il nome della colonna:

1sql = """

2	select

3		warehouse_name,

4		credits_used_compute

5	from snowflake.account_usage.warehouse_metering_history

6	limit 1

7"""

8cur.execute(sql)

9results = cur.fetchall()

10result_meta = cur.description

11results = [\
\
12    dict(zip([col.name for col in result_meta], row)) for row in results\
\
13]

14

15# >>> print(results)

Espandi codice

Eseguire più istruzioni SQL in un'unica chiamata

Il metodo cursor.execute accetta una sola istruzione SQL alla volta. Per eseguire più istruzioni SQL in un'unica chiamata, puoi utilizzare il metodo execute_string sull'oggetto connection. Otterrai in cambio una lista di cursor, che potrai utilizzare per elaborare i risultati come negli esempi precedenti.

sql="""
delete from table where user_id=123;
insert into table values(...);
"""
cursors = conn.execute_string(sql)

# return results from each cursor if needed

Impostare i parametri di sessione

Quando stabilisci la connessione a Snowflake puoi impostare comodamente diversi parametri di sessione. Ad esempio, puoi definire un query tag in modo che tutte le query eseguite tramite la connessione vengano taggate, oppure un query timeout per annullare automaticamente le query dopo un certo intervallo di tempo.

conn = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'COST_ANALYTICS',
        'STATEMENT_TIMEOUT_IN_SECONDS': 3600, # automatically cancel query after 1 hour
    }
)

Puoi inoltre passare altri parametri, come warehouse, per stabilire su quale virtual warehouse verranno eseguite le query. Se non viene specificato, le query gireranno sul warehouse predefinito dell'utente.

L'elenco completo dei parametri di connessione è disponibile qui.

Frequently asked
questions

Quale versione di Python serve per lo Snowflake connector?

Per utilizzare lo Snowflake connector for Python è necessario disporre di Python 3.8 o versione successiva.

Quali altre librerie Python offre Snowflake?

Dopo il rilascio dello Snowflake Connector for Python, Snowflake ha pubblicato una nuova libreria Python chiamata **Snowflake Python API.**

Questa libreria mette a disposizione diverse API per interagire con le risorse di Snowflake tramite una Python API di prima classe. Invece di usare lo Snowflake connector in Python per inviare istruzioni SQL a Snowflake, la Python API consente di interagire con Snowflake tramite comandi Python puri, senza dover scrivere una sola riga di SQL.

Ecco un esempio che crea un nuovo schema chiamato analytics contenente una tabella denominata temperature_readings.

from snowflake.snowpark import Session
from snowflake.core import Root

connection_params = {
    "account": "ACCOUNT-IDENTIFIER",
    "user": "USERNAME",
    "password": "PASSWORD",
}
session = Session.builder.configs(connection_params).create()
root = Root(session)

schema = database.schemas.create(Schema(name="analytics"), mode="orreplace")
table = schema.tables.create(
    Table(
        name="temperature_readings",

Espandi codice

Dietro le quinte, la Snowflake Python API installa la libreria Snowflake Connector e, presumibilmente, la utilizza per eseguire le query Snowflake generate da queste funzioni/API Python.

Ian Whitestone·Co-founder & CEO of SELECT

Ian è Co-founder & CEO di SELECT, piattaforma SaaS per la gestione e l'ottimizzazione dei costi di Snowflake. Prima di fondare SELECT, Ian ha guidato per 6 anni team di full stack data science & engineering in Shopify e Capital One. In Shopify ha guidato le iniziative di ottimizzazione del data warehouse e di miglioramento dell'osservabilità dei costi.