Snowflake Connector for Pythonは、Snowflakeチームが開発・運用しているPythonライブラリで、Snowflake公式のPythonドライバーです。プログラムからSnowflakeに接続してクエリを実行したい方は、ぜひ本記事を参考にしてください。SELECTをはじめ、多くの企業がこのライブラリを使ってSnowflake上のワークフローをオーケストレーションしたり、Snowflakeを基盤としたデータ駆動型アプリケーションを構築しています。
Snowflake Connector for Pythonのインストール方法
Snowflake ConnectorはネイティブなPythonパッケージなので、他のPythonパッケージと同じ手順でインストールできます。
pipを使う場合
もっとも手軽なのは、Python公式のパッケージ管理システムであるpipを使う方法です。
1pip install snowflake-connector-python
condaを使う場合
パッケージ管理にcondaを使っている場合は、次のコマンドでインストールできます。
1conda install snowflake-connector-python
クイックスタート
Snowflake Connectorパッケージでクエリを実行し、結果を処理する一連の流れを示すサンプルコードです。この例では、直近30日間の仮想ウェアハウス別クレジット消費量の合計を取得しています。
出力されるデータは次のような形式になります。
('REPLICATOR_LARGE', Decimal('669.080833333'))
('REPLICATOR_XSMALL', Decimal('613.355000002'))
('REPLICATOR_XLARGE', Decimal('416.437500000'))
('BACKEND_LARGE', Decimal('162.702500031'))
('BACKEND', Decimal('105.263888856'))
('BACKEND_MEDIUM', Decimal('47.033055570'))
('REPLICATOR_MEDIUM', Decimal('40.775833333'))
('REPLICATOR_SMALL', Decimal('31.046388890'))
('BACKEND_STAGING', Decimal('26.985555532'))
('BACKEND_SMALL', Decimal('12.798333345'))
('SELECT_DOGFOOD', Decimal('11.283055542'))
('ADMIN', Decimal('10.899444408'))
('BACKEND_STAGING_MEDIUM', Decimal('6.822222224'))
('BI_XSMALL', Decimal('1.814999976'))
('CLOUD_SERVICES_ONLY', Decimal('0E-9'))
アカウント識別子の調べ方は、こちらの記事の手順をご覧ください。
押さえておきたい基本
ConnectionオブジェクトとCursorオブジェクト
先ほどのクイックスタートの例で見たとおり、扱うオブジェクトはConnectionオブジェクトとCursorオブジェクトの2種類です。
Connectionオブジェクトは、Snowflakeへの認証・接続と、カーソルの発行を担当します。一方Cursorオブジェクトは、Snowflakeに対してクエリを実行し、結果を取得するために使います。1つのコネクションから複数のカーソルを生成でき、同じカーソルで何度でもクエリを実行できるため、毎回新しく作り直す必要はありません。これはConnectionオブジェクトについても同じです。Snowflakeへの接続確立には数秒かかるため、一度確立したコネクションは使い回すのが望ましいでしょう。
各オブジェクトで利用できるメソッドの詳細は、Snowflake Python connector APIの公式ドキュメントをご覧ください。
同期クエリと非同期クエリ
Snowflake Connector for Pythonでクエリを実行する方法には、同期と非同期の2通りがあります。
同期クエリを実行する
同期クエリを実行すると、Pythonプロセスはクエリの結果が返るまで待機します。シンプルで扱いやすいため、Pythonコネクタを使う際にもっとも一般的なアプローチです。
cur = conn.cursor()
cur.execute('select * from table') # クエリが完了するまで次の行に進まない
results = cur.fetchall()
非同期クエリを実行する
非同期クエリでは、Snowflake Pythonパッケージはクエリの完了を待たずに、すぐにPythonプロセスへ制御を返します。マルチスレッドのアプリケーション(Webアプリなど)を構築する場面や、Pythonから多数のクエリを同時に実行したい場面で特に役立ちます。
非同期クエリを実行するには、通常のexecuteメソッドに代えてexecute_asyncメソッドを呼び出します。その後、コード内でSnowflakeに対してクエリの完了状況をポーリングする必要があります。
cur = conn.cursor()
cur.execute_async('select * from table') # 待たずに次の行へ進む
query_id = cur.sfqid
while True:
if conn.is_still_running(conn.get_query_status(query_id)):
break
time.sleep(1)
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
トランザクション制御
Snowflake Connectorパッケージを通じて実行されるクエリは、デフォルトで自動コミットされます。
SQLトランザクションをより細かく制御したい場合は、接続時にautocommit=Falseを指定します。
withコンテキストマネージャーを使う
次のコードを見てみましょう。いずれかのSQL文が失敗すると、Snowflakeは自動的にロールバックを行います。すべて成功した場合は、withブロックを抜けるタイミングで自動的にコミットされます。
with snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
autocommit=False,
) as conn:
cur = conn.cursor()
cur.execute("insert into table values(...)")
cur.execute("insert into table values(...)")
try-exceptで手動制御する
さらに細かく制御したい場合は、conn.commit()とconn.close()メソッドをtry-except-finallyパターンと組み合わせて、SQLトランザクションを手動でコミット・ロールバックできます。
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
autocommit=False)
try:
cur = conn.cursor()
cur.execute("insert into table values(...)")
cur.execute("insert into table values(...)")
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
コネクションを閉じる
ベストプラクティスとして、Pythonスクリプトの末尾ではconn.close()メソッドでコネクションを閉じるようにしましょう。エラーが発生した場合でも確実にコネクションを閉じるには、コードをtry - finallyブロックで囲みます。
try:
cur.execute('...')
results = cur.fetchall()
# ...
except:
# 必要に応じてエラーを処理
finally:
conn.close()
もう1つ便利な方法として、Pythonのwith()構文を使うとコネクションが自動的に閉じられます。
with snowflake.connector.connect(...) as conn:
cur = conn.cursor()
cur.execute('...')
results = cur.fetchall()
# withブロックを抜けると自動的にconn.close()が呼ばれる
# Snowflake以外の処理に進む
with open('filename.csv', 'w') as f:
f.write(...)
実践的な活用例
Snowflake Python Connectorでよく使われるパターンを、もう少し掘り下げて見ていきましょう。
SQL文にパラメータを渡す
開発の現場でよくあるのが、SQL文に動的なパラメータを渡したいというニーズです。
Pythonには文字列操作に便利なf-stringがあり、まさにこうした用途で活躍します。
user_id = 123456
sql = f"select * from users where user_id={user_id}"
cur.execute(sql)
あるいは、バインド変数を使う方法もあります。
warehouse_name='ADMIN'
sql="""
select *
from snowflake.account_usage.warehouse_metering_history
where warehouse_name=%s
"""
cur.execute(sql, (warehouse_name))
すべての結果を取得する
本記事で紹介してきたサンプルでは、いずれもfetchall()メソッドを使ってクエリの全結果を取得しています。
cur.execute('select * from table')
results=cur.fetchall()
バッチで処理する
Pythonアプリのメモリ使用量を抑えたい場合は、データをバッチ単位で処理する方法があります。次の例では、10万件ずつ処理しています。
cur.execute('select * from table')
while True:
rows = cur.fetchmany(100000)
if not rows:
break
for row in rows:
# 各行を処理
print(row)
辞書のリストとして取得する
デフォルトではSnowflakeはタプルのリストを返すため、タプル内の各値がどのカラムに対応するのかが分かりません。次の例では、各行を辞書として返す方法を示します。辞書のキーがカラム名になります。
1sql = """
2 select
3 warehouse_name,
4 credits_used_compute
5 from snowflake.account_usage.warehouse_metering_history
6 limit 1
7"""
8cur.execute(sql)
9results = cur.fetchall()
10result_meta = cur.description
11results = [\
\
12 dict(zip([col.name for col in result_meta], row)) for row in results\
\
13]
14
15# >>> print(results)
コードを展開
複数のSQL文を一度に実行する
cursor.executeメソッドに渡せるSQL文は1つだけです。複数のSQL文をまとめて実行したい場合は、Connectionオブジェクトのexecute_stringメソッドを使います。カーソルのリストが返るので、必要に応じて上記の例と同様に結果を処理できます。
sql="""
delete from table where user_id=123;
insert into table values(...);
"""
cursors = conn.execute_string(sql)
# 必要に応じて各カーソルから結果を取得
セッションパラメータを設定する
Snowflakeへの接続時には、各種セッションパラメータをまとめて指定できます。たとえば、クエリタグを設定して接続経由で実行されるすべてのクエリにタグ付けしたり、クエリタイムアウトを設定して一定時間を超えたクエリを自動キャンセルしたりといった使い方が可能です。
conn = snowflake.connector.connect(
user='XXXX',
password='XXXX',
account='XXXX',
session_parameters={
'QUERY_TAG': 'COST_ANALYTICS',
'STATEMENT_TIMEOUT_IN_SECONDS': 3600, # 1時間経過したクエリを自動キャンセル
}
)
このほか、warehouseなどのパラメータを指定して、クエリを実行する仮想ウェアハウスを切り替えることもできます。指定しなかった場合は、そのユーザーのデフォルトウェアハウスでクエリが実行されます。
接続パラメータの一覧はこちらから確認できます。
Frequently asked
questions
Snowflakeコネクタには、どのバージョンのPythonが必要ですか?
Snowflake Connector for Pythonを使うには、Python 3.8以降が必要です。
Snowflakeは他にどのようなPythonライブラリを提供していますか?
Snowflake Connector for Pythonのリリース以降、Snowflakeは**Snowflake Python API.**という新しいPythonライブラリも公開しています。
このライブラリは、Snowflakeのリソースをファーストクラスのオブジェクトとして扱えるPython APIを多数提供します。PythonコネクタのようにSQL文をSnowflakeに送るのではなく、SQLを書かずに純粋なPythonコマンドだけでSnowflakeを操作できるのが特長です。
次の例では、analyticsという新しいスキーマと、その中にtemperature_readingsというテーブルを作成しています。
from snowflake.snowpark import Session
from snowflake.core import Root
connection_params = {
"account": "ACCOUNT-IDENTIFIER",
"user": "USERNAME",
"password": "PASSWORD",
}
session = Session.builder.configs(connection_params).create()
root = Root(session)
schema = database.schemas.create(Schema(name="analytics"), mode="orreplace")
table = schema.tables.create(
Table(
name="temperature_readings",
コードを展開
内部的には、Snowflake Python APIはSnowflake Connectorライブラリをインストールしており、これらのPython関数・APIから生成されたクエリの実行にも、おそらく同ライブラリが利用されています。
Ian Whitestone・Co-founder & CEO of SELECT
Ianは、Snowflakeのコスト管理・最適化SaaSプラットフォームであるSELECTの共同創業者兼CEOです。SELECTを立ち上げる前は、ShopifyとCapital Oneで6年間にわたりフルスタックのデータサイエンス・エンジニアリングチームを率いてきました。Shopify在籍時には、データウェアハウスの最適化とコスト可視化の取り組みを主導しています。