はじめに
先週ロンドンで開催されたdbtのCoalesceカンファレンスに参加し、MiroのFelipe LeiteとStephen Pastanによる素晴らしいセッションを聴く機会がありました。お二人は、データベースクローンを複数のテーブルクローンに置き換えることで大幅な高速化を実現したと話しており、これは試してみるしかないと思いました。
検証
結果は以下のクエリで収集しました。最初のクエリ開始時刻と最後のクエリ終了時刻をDATEDIFF関数に渡して、各処理の所要時間を計測しています。
select
count(*) as query_count,
datediff(seconds, min(start_time), max(end_time)) as duration,
sum(credits_used_cloud_services) as credits_used_cloud_services
from snowflake.account_usage.query_history where query_tag = X;
Snowflakeの利用を最適化
SELECTなら、Snowflakeの利用状況を自動で最適化し、手間なく管理できます。
セットアップ
スキーマ10個、各スキーマにテーブル100個を持つデータベースを作成します。
import snowflake.connector
con = snowflake.connector.connect(
...
)
for i in range(1, 11):
con.cursor().execute(f"create schema test.schema_{i};")
for j in range(1, 101):
con.cursor().execute(f"create table test.schema_{i}.table_{j} (i number) as (select 1);")
対照群 - データベースクローン
1create database test_1 clone test;
この処理の実行には22分34秒かかりました。
結果:
| クエリ数 | 1 |
|---|---|
| 所要時間 | 22分34秒 |
| クラウドサービスクレジット | 0.179 |
実験1 - スキーマ単位のクローン
import snowflake.connector
from snowflake.connector import DictCursor
def clone_database_by_schema(con, source_database, target_database):
con.cursor().execute(f"create database {target_database};")
cursor = con.cursor(DictCursor)
cursor.execute(f"show schemas in database {source_database};")
for i in cursor.fetchall():
if i["name"] not in ("INFORMATION_SCHEMA", "PUBLIC"):
con.cursor().execute_async(f"create schema {target_database}.{i['name']} clone {source_database}.{i['name']};")
con = snowflake.connector.connect(
...
session_parameters={
'QUERY_TAG': 'test 2',
コードを展開
結果:
| クエリ数 | 12 |
|---|---|
| 所要時間 | 1分47秒 |
| クラウドサービスクレジット | 0.148 |
execute_asyncを使えば、各SQL文の完了を待たずに実行できるので、10個のスキーマがすべて並列でクローンされます。通常のデータベースクローンと比較して、開始から終了までなんと10倍の速さです。
実験2 - テーブル単位のクローン
import snowflake.connector
from snowflake.connector import DictCursor
def clone_database_by_table(con, source_database, target_database):
con.cursor().execute(f"create database {target_database};")
cursor = con.cursor(DictCursor)
cursor.execute(f"show tables in database {source_database};")
results = cursor.fetchall()
schemas_to_create = {r['schema_name'] for r in results}
tables_to_clone = [f"{r['schema_name']}.{r['name']}" for r in results]
for schema in schemas_to_create:
con.cursor().execute(f"create schema {target_database}.{schema};")
for table in tables_to_clone:
コードを展開
こちらは完了まで1分48秒かかりました。ボトルネックは、クライアント側からクエリを発行できる速度(おそらくネットワークの待機時間が原因)でした。これを緩和するため、コマンドを10スレッドに分散させました。
1import snowflake.connector
2from snowflake.connector import DictCursor
3import threading
4
5class ThreadedRunCommands():
6 """Helper class for running queries across a configurable number of threads"""
7 def __init__(self, con, threads):
8 self.threads = threads
9 self.register_command_thread = 0
10 self.thread_commands = [\
\
11 [] for _ in range(self.threads)\
\
12 ]
13 self.con = con
14
15 def register_command(self, command):
コードを展開
結果:
| クエリ数 | 1012 |
|---|---|
| 所要時間 | 22秒 |
| クラウドサービスクレジット | 0.165 |
10スレッドを使った結果、create databaseコマンドの開始から最後のcreate table ... cloneコマンドの完了までに要した時間はわずか22秒。create database ... cloneコマンドと比べて60倍の速さです。ボトルネックは依然としてクエリの発行速度にあります。
まとめ
結果一覧:
| クローン手法 | 対照群 - データベースクローン | 実験1 - スキーマ単位のクローン | 実験2 - テーブル単位のクローン |
|---|---|---|---|
| クエリ数 | 1 | 12 | 1012 |
| 所要時間 | 22分34秒 | 1分47秒 | 22秒 |
| クラウドサービスクレジット | 0.179 | 0.148 | 0.165 |
実行したクエリはすべてクラウドサービスのみで処理され、稼働中のウェアハウスや一時停止中のウェアハウスの再開は不要でした。
Snowflakeがスキーマおよびデータベースのクローン機能を改善してくれることを願っていますが、それまでは、テーブル単位でクローンするのが最善策と言えそうです。
知見を共有してくれたMiroのFelipe LeiteとStephen Pastanに、改めて感謝します。
Niall Woodward・Co-founder & CTO of SELECT
Niallは、SaaS型のSnowflakeコスト管理・最適化プラットフォームであるSELECTのCo-Founder兼CTOです。SELECT創業以前は、Brooklyn Data Companyや複数のスタートアップでデータエンジニアとして活躍。オープンソース愛好家として、SQLFluffのメンテナーを務めるほか、3つのdbtパッケージ(dbt_artifacts、dbt_snowflake_monitoring、dbt_query_tags)の作者でもあります。