SELECTSELECT

SELECT

Mãos à obra com o Snowflake Connector para Python

By Ian WhitestoneApr 27, 20246 min read

Esta página também está disponível em English, Deutsch, Español, Français, Italiano e 日本語.

O Snowflake Connector para Python é uma biblioteca Python mantida pelo time da Snowflake e é o driver Python oficial do Snowflake. Se você quer se conectar ao Snowflake de forma programática e rodar queries em Python, chegou ao lugar certo. Muitas empresas, incluindo a SELECT, usam essa biblioteca para orquestrar workflows no Snowflake ou até mesmo criar aplicações orientadas a dados em cima do Snowflake.

Como instalar o Snowflake Connector para Python

O Snowflake Connector é um pacote Python nativo, ou seja, pode ser instalado como qualquer outro pacote Python.

Com o pip

A forma mais fácil de instalar a biblioteca é pelo pip, o gerenciador de pacotes oficial do Python:

1pip install snowflake-connector-python

Com o conda

Se você usa o conda como gerenciador de pacotes, instale assim:

1conda install snowflake-connector-python

Exemplo de início rápido

Veja um exemplo completo de como usar o pacote Snowflake Connector para executar uma query e processar os resultados. Este exemplo mostra o consumo total de créditos por virtual warehouse nos últimos 30 dias:

Os dados impressos na saída ficam mais ou menos assim:

('REPLICATOR_LARGE', Decimal('669.080833333'))
('REPLICATOR_XSMALL', Decimal('613.355000002'))
('REPLICATOR_XLARGE', Decimal('416.437500000'))
('BACKEND_LARGE', Decimal('162.702500031'))
('BACKEND', Decimal('105.263888856'))
('BACKEND_MEDIUM', Decimal('47.033055570'))
('REPLICATOR_MEDIUM', Decimal('40.775833333'))
('REPLICATOR_SMALL', Decimal('31.046388890'))
('BACKEND_STAGING', Decimal('26.985555532'))
('BACKEND_SMALL', Decimal('12.798333345'))
('SELECT_DOGFOOD', Decimal('11.283055542'))
('ADMIN', Decimal('10.899444408'))
('BACKEND_STAGING_MEDIUM', Decimal('6.822222224'))
('BI_XSMALL', Decimal('1.814999976'))
('CLOUD_SERVICES_ONLY', Decimal('0E-9'))

Para encontrar o identificador da sua conta, siga os passos deste post

Conceitos centrais

Objetos connection e cursor

Como vimos no exemplo de início rápido, há dois objetos distintos com os quais interagimos: o objeto connection e o objeto cursor.

O objeto connection é responsável por autenticar e se conectar ao Snowflake, além de retornar cursores. Já os cursores são usados para executar queries no Snowflake e recuperar os resultados. Uma única conexão pode ter vários cursores. O mesmo cursor pode executar várias queries — não é preciso criar um novo a cada vez. O mesmo vale para o objeto connection: estabelecer uma conexão com o Snowflake leva alguns segundos, então o ideal é fazer isso só uma vez.

Para conhecer os diferentes métodos disponíveis em cada objeto, consulte a documentação oficial da API do conector Python do Snowflake.

Queries síncronas x assíncronas

Existem duas formas de executar queries com o Snowflake Connector para Python: síncrona e assíncrona.

Enviando uma query síncrona

Ao enviar uma query síncrona, o seu processo Python fica esperando até a query retornar. É a abordagem mais comum com o conector Python, pela simplicidade:

cur = conn.cursor()
cur.execute('select * from table') # O Python vai esperar a query terminar antes de passar para a próxima linha
results = cur.fetchall()

Enviando uma query assíncrona

Com uma query assíncrona, o pacote Snowflake Python devolve o controle ao seu processo Python na hora, antes de a query terminar. Isso é especialmente útil quando você está criando aplicações multithread em Python (como uma aplicação web) ou usando Python para disparar muitas queries ao mesmo tempo.

Para rodar uma query assíncrona, use o método execute_async do cursor em vez do execute tradicional. Mais à frente no código, será preciso consultar o Snowflake para verificar se a query já terminou.

cur = conn.cursor()
cur.execute_async('select * from table') # O Python passa imediatamente para a próxima linha
query_id = cur.sfqid

while True:
		if conn.is_still_running(conn.get_query_status(query_id)):
				break
    time.sleep(1)

cur.get_results_from_sfqid(query_id)
results = cur.fetchall()

Controle de transações

Por padrão, quando você roda queries no Snowflake pelo pacote Snowflake Connector, elas sofrem commit automaticamente.

Para um controle mais fino sobre as transações SQL, defina autocommit=False ao abrir a conexão.

Usando o context manager with

Veja o código a seguir. Se alguma das instruções SQL falhar, o Snowflake faz rollback automaticamente. Se forem bem-sucedidas, o commit acontece automaticamente quando o bloco with é encerrado.

with snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    autocommit=False,
) as conn:
    cur = conn.cursor()
    cur.execute("insert into table values(...)")
    cur.execute("insert into table values(...)")

Controle manual com try-except

Para um controle ainda mais fino, dá para fazer commit das transações SQL manualmente ou rollback usando os métodos conn.commit() e conn.close() junto com o padrão try-except-finally:

con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    cur = conn.cursor()
    cur.execute("insert into table values(...)")
    cur.execute("insert into table values(...)")
    conn.commit()
except Exception as e:
    conn.rollback()
    raise e
finally:
    conn.close()

Fechando a conexão

Como boa prática, feche a conexão no final do seu script Python com o método conn.close(). Para garantir que a conexão seja fechada mesmo se o script encontrar um erro, envolva o código em um bloco try - finally:

try:
   cur.execute('...')
   results = cur.fetchall()
   # ...
except:
    # trate os erros conforme necessário
finally:
		conn.close()

Outro padrão útil é usar o with() do Python, que fecha a conexão automaticamente:

with snowflake.connector.connect(...) as conn:
		cur = conn.cursor()
		cur.execute('...')
		results = cur.fetchall()

# o método conn.close() é chamado automaticamente após o bloco with
# Siga para outras operações que não envolvam o Snowflake
with open('filename.csv', 'w') as f:
		f.write(...)

Mais exemplos

Vamos a mais alguns exemplos que mostram outros padrões comuns com o Snowflake Python Connector.

Passando parâmetros para a instrução SQL

Uma necessidade comum entre desenvolvedores é poder passar parâmetros diferentes para as instruções SQL.

O Python tem ótimo suporte para manipulação de strings com f-strings, ideais para esse caso:

user_id = 123456
sql = f"select * from users where user_id={user_id}"
cur.execute(sql)

Como alternativa, dá para usar bind variables:

warehouse_name='ADMIN'
sql="""
select *
from snowflake.account_usage.warehouse_metering_history
where warehouse_name=%s
"""
cur.execute(sql, (warehouse_name))

Buscando todos os resultados

Em todos os exemplos deste post, usamos o método fetchall() para retornar todos os resultados da query:

cur.execute('select * from table')
results=cur.fetchall()

Processando em lotes

Se quiser reduzir o consumo de memória da sua aplicação Python, dá para processar os dados em lotes. Veja um exemplo que processa 100 mil registros por vez:

cur.execute('select * from table')
while True:
    rows = cur.fetchmany(100000)
    if not rows:
        break
    for row in rows:
        # Processa cada linha
        print(row)

Retornando uma lista de dicionários

Por padrão, o Snowflake retorna uma lista de tuplas e você não sabe a qual coluna cada entrada da tupla corresponde. Veja um exemplo de como retornar uma lista de dicionários para cada linha, em que a chave é o nome da coluna:

1sql = """

2	select

3		warehouse_name,

4		credits_used_compute

5	from snowflake.account_usage.warehouse_metering_history

6	limit 1

7"""

8cur.execute(sql)

9results = cur.fetchall()

10result_meta = cur.description

11results = [\
\
12    dict(zip([col.name for col in result_meta], row)) for row in results\
\
13]

14

15# >>> print(results)

Expandir código

Executando várias instruções SQL de uma vez

Pelo método cursor.execute, só é possível passar uma única instrução SQL. Para executar várias instruções SQL de uma vez, use o método execute_string no objeto connection. Ele retorna uma lista de cursores, que você pode aproveitar para processar os resultados, se necessário, como nos exemplos acima.

sql="""
delete from table where user_id=123;
insert into table values(...);
"""
cursors = conn.execute_string(sql)

# retorna os resultados de cada cursor, se necessário

Definindo parâmetros de sessão

Ao abrir a conexão com o Snowflake, dá para definir vários parâmetros de sessão de forma prática. Por exemplo, você pode definir uma query tag para que todas as queries executadas a partir da sua conexão fiquem marcadas, ou configurar um timeout de query para que as queries sejam canceladas automaticamente após um determinado período.

conn = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'COST_ANALYTICS',
        'STATEMENT_TIMEOUT_IN_SECONDS': 3600, # cancela a query automaticamente após 1 hora
    }
)

Também é possível passar outros parâmetros, como warehouse, para controlar em qual virtual warehouse as queries vão rodar. Se nada for informado, elas rodam no warehouse padrão do usuário.

Veja a lista completa de parâmetros de conexão aqui.

Frequently asked
questions

Qual versão do Python é necessária para o conector do Snowflake?

Para usar o conector do Snowflake para Python, é preciso estar na versão 3.8 do Python ou superior.

Que outras bibliotecas Python o Snowflake oferece?

Depois do Snowflake Connector para Python, a Snowflake lançou uma nova biblioteca Python chamada **Snowflake Python API.**

Essa biblioteca oferece várias APIs para interagir com recursos do Snowflake por meio de uma API Python de primeira classe. Em vez de usar o conector do Snowflake em Python para enviar instruções SQL ao Snowflake, a Python API permite interagir com o Snowflake usando comandos Python puros, sem precisar escrever SQL.

Veja um exemplo que cria um novo schema chamado analytics com uma tabela chamada temperature_readings.

from snowflake.snowpark import Session
from snowflake.core import Root

connection_params = {
    "account": "ACCOUNT-IDENTIFIER",
    "user": "USERNAME",
    "password": "PASSWORD",
}
session = Session.builder.configs(connection_params).create()
root = Root(session)

schema = database.schemas.create(Schema(name="analytics"), mode="orreplace")
table = schema.tables.create(
    Table(
        name="temperature_readings",

Expandir código

Por baixo dos panos, a Snowflake Python API instala a biblioteca Snowflake Connector e, presumivelmente, a usa para executar as queries Snowflake geradas por essas funções/APIs em Python.

Ian Whitestone·Co-founder & CEO da SELECT

Ian é Co-founder & CEO da SELECT, uma plataforma SaaS de gestão e otimização de custos do Snowflake. Antes de fundar a SELECT, Ian passou 6 anos liderando times de full stack data science & engineering na Shopify e na Capital One. Na Shopify, Ian liderou o trabalho de otimização do data warehouse e o aumento da observabilidade de custos.