SELECTSELECT

SELECT

Come accelerare i range join in Snowflake di 300 volte

By Ian WhitestoneJan 15, 202312 min read

Questa pagina è disponibile anche in English, Deutsch, Español, Français, 日本語 e Português.

I range join e gli altri tipi di non-equi join sono notoriamente lenti nella maggior parte dei database. Snowflake è velocissimo sulla maggior parte delle query, ma anche lui soffre di performance scadenti quando deve gestire questo tipo di join. In questo articolo vedremo una tecnica di ottimizzazione che consente di accelerare fino a 300 volte le query con range join 1.

Prima di entrare nel merito della tecnica, faremo un breve ripasso sui vari tipi di join e sui motivi che rendono i range join così lenti in Snowflake. Se conosce già l'argomento, può passare direttamente alla sezione successiva.

Equi-join e non-equi join a confronto

Un equi-join è un join che prevede una condizione di uguaglianza. La maggior parte degli utenti scrive query con una o più condizioni di equi-join.

select
  ...
from orders
join customers
  on orders.customer_id=customers.id -- esempio di condizione di equi-join

Un non-equi join, invece, prevede una condizione di disuguaglianza. Un esempio potrebbe essere l'individuazione dei clienti che hanno acquistato lo stesso prodotto:

select distinct
  o1.customer_id,
  o2.customer_id,
  o1.product_id
from orders_items as o1
join orders_items as o2
  on o1.product_id=o2.product_id -- condizione di equi-join
  and o1.customer_id<>o2.customer_id -- condizione di non-equi join

Oppure trovare, per ciascun cliente, tutti gli ordini successivi a una determinata data:

select
  ...
from orders
inner join customers
  on orders.customer_id=customers.id
  and orders.created_at > customers.one_year_anniversary_date

Cosa sono i range join?

Un range join è un tipo specifico di non-equi join. Si verifica quando un join controlla se un valore ricade in un determinato intervallo ("point in interval join") oppure quando cerca due periodi che si sovrappongono ("interval overlap join").

Point in interval range join

Un esempio di point in interval range join è il calcolo del numero di query in esecuzione per ogni secondo:

select
    seconds.timestamp,
    count(queries.query_id) as num_queries
from seconds
left join queries
    on seconds.timestamp between
      date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time)
group by 1

Questo tipo di join può basarsi anche su timestamp derivati. Per esempio, trovare tutti gli eventi di acquisto avvenuti entro 24 ore dalla visualizzazione della home page da parte degli utenti:

select
  ...
from page_views
inner join events
  on events.event_type='purchase' -- condizione di filtro
  and page_views.pathname = '/' -- condizione di filtro
  and page_views.user_id=events.user_id -- condizione di equi-join
  and page_views.viewed_at < events.event_at -- condizione di range join
  and dateadd('hour', 24, page_views.viewed_at) >= events.event_at -- condizione di range join

Interval overlap range join

Si parla di interval overlap range join quando una query cerca di mettere in corrispondenza periodi che si sovrappongono. Immagini di dover trovare, per ogni sessione di navigazione sulla landing page, tutte le altre sessioni avvenute contemporaneamente all'interno dell'applicazione:

select
    s1.session_id,
    array_agg(s2.session_id) as concurrent_sessions
from landing_page_sessions as s1
inner join app_sessions as s2
   on s1.end_time > s2.start_time
   and s1.start_time < s2.end_time
group by 1

Perché i range join sono lenti in Snowflake?

I range join sono lenti in Snowflake perché vengono eseguiti come join cartesiani con una condizione di filtro applicata a valle. Un join cartesiano, noto anche come cross join, restituisce il prodotto cartesiano dei record tra i due dataset coinvolti. Se entrambe le tabelle contengono 10mila record, l'output del join cartesiano sarà di 100 milioni di record! Tra gli addetti ai lavori si parla spesso di "join explosion" 💥. L'esecuzione della query può rallentare in modo significativo quando Snowflake si trova a elaborare dataset intermedi così grandi.

Approfondiamo il discorso riprendendo l'esempio del "numero di query in esecuzione al secondo".

select
    s1.session_id,
    array_agg(s2.session_id) as concurrent_sessions
from landing_page_sessions as s1
inner join app_sessions as s2
   on s1.end_time > s2.start_time
   and s1.start_time < s2.end_time
group by 1

La nostra tabella seconds contiene 1 riga per ogni secondo, mentre la tabella queries contiene 1 riga per query. L'obiettivo è individuare quali query erano in esecuzione in ciascun secondo, per poi aggregare e contare.

Esempio di tabelle di input per una query con range join in Snowflake

Durante l'esecuzione del join, Snowflake crea prima un dataset intermedio che è il prodotto cartesiano dei due dataset di input. In questo esempio la tabella seconds ha 7 righe e la tabella queries ne ha 4, quindi il dataset intermedio esplode a 28 righe. La condizione di range join che esegue il controllo "point in interval" viene applicata dopo la creazione di questo dataset intermedio, come filtro post-join. Nell'immagine qui sotto può vedere una rappresentazione del processo (apra questa pagina per la versione a schermo intero in alta risoluzione).

Esempio di query con range join Snowflake non ottimizzata

L'esecuzione di questa query su un campione di 30 giorni di dati con 267K query ha richiesto 12 minuti e 30 secondi. Come si vede nel query profile, il join è chiaramente il collo di bottiglia. Si può notare anche la condizione di range join, espressa come "Additional Join Condition":

Esempio di query profile di un range join Snowflake non ottimizzato

Come ottimizzare i range join in Snowflake

Nell'esecuzione dei range join, il collo di bottiglia per Snowflake è il volume di dati prodotto nel dataset intermedio prima che la condizione di range join venga applicata come filtro post-join. Per accelerare queste query bisogna trovare il modo di ridurre al minimo le dimensioni del dataset intermedio. Lo si può fare aggiungendo una condizione di equi-join, che Snowflake elabora molto rapidamente tramite hash join. 2

Ridurre al minimo l'esplosione delle righe

Il principio di fondo è intuitivo — rimpicciolire i dataset — ma metterlo in pratica è meno semplice. Come si può ridurre il dataset intermedio prima di applicare il filtro post-join del range join? Restando all'esempio delle query al secondo, viene istintivo aggiungere una condizione di equi-join su qualcosa come l'hour di ciascun timestamp:

select
    seconds.timestamp,
    count(queries.query_id) as num_queries
from seconds
left join queries
    on date_trunc('hour', seconds.timestamp)=date_trunc('hour', queries.start_time) -- NUOVA: condizione di equi-join
    and seconds.timestamp -- condizione di range join
      between date_trunc('second', queries.start_time) and date_trunc('second', queries.end_time)
group by 1

Sembra promettente, ma l'approccio non regge quando l'intervallo (la durata totale della query) supera l'ora. Dato che l'equi-join si basa sull'ora di inizio della query, tutti i record nelle ore successive verrebbero esclusi dal conteggio.

Si può risolvere il problema creando un dataset intermedio, query_hours, che contiene 1 riga per query per ogni ora in cui la query è stata in esecuzione. A quel punto diventa sicuro fare il join sull'hour, perché avremo 1 riga per ogni ora di esecuzione. Nessun record viene scartato per errore.

with
query_hours as (
  select
    queries.*,
    hours_list.timestamp as query_hour
  from queries
  inner join hours_list -- dataset che contiene 1 riga per ora
    on hours_list.timestamp between date_trunc('hour', queries.start_time) and date_trunc('hour', queries.end_time)
)
select
    seconds.timestamp,
    count(queries.query_id) as num_queries
from seconds
left join query_hours as queries
    on date_trunc('hour', seconds.timestamp)=queries.query_hour -- NUOVA: condizione di equi-join

Espandi codice

Avrà notato che la CTE query_hours contiene a sua volta un range join: non sarà lento anche quello? Applicato alle query giuste, il tempo extra speso nella preparazione del dataset di input si traduce in una query complessivamente molto più veloce 3. Un'altra perplessità potrebbe riguardare il fatto che il dataset query_hours diventi molto più grande di quello queries originale, dato che si espande a 1 riga per query per ora. Ma poiché la maggior parte delle query termina ben prima di un'ora, il dataset query_hours avrà dimensioni simili a quello queries di partenza.

Aggiungere la nuova condizione di equi-join sugli hour aiuta ad accelerare questa query con range join limitando le dimensioni del dataset intermedio. Tuttavia, l'approccio non è ideale per diverse ragioni. Forse hour non è la scelta migliore e converrebbe usare un altro vincolo. Inoltre, come si può estendere questo approccio per supportare range join su altri tipi numerici, come interi e float?

Ottimizzazione binned range join

Possiamo estendere le idee viste sopra a un approccio più generico, basato sui 'bin' 4.

Indicando a Snowflake di applicare la condizione di range join solo su sottoinsiemi di dati più piccoli, l'operazione di join diventa molto più veloce. Per ogni timestamp, Snowflake ora si limita a fare il join con le query eseguite nella stessa ora, anziché con tutte le query mai eseguite.

Invece di limitarci a intervalli predefiniti come "ora", "minuto" o "giorno", possiamo usare bin di dimensione arbitraria. Per esempio, se la maggior parte delle query si esegue in meno di 2 secondi, possiamo raggruppare le query in bin da 2 secondi ciascuno.

L'algoritmo sarebbe più o meno questo:

  1. Generare i bin e aggiungere il numero di bin a ciascun dataset
  2. Aggiungere il vincolo di equi-join al range join usando bin_num, in modo analogo a quanto fatto sopra con hour.
  3. Il dataset intermedio creato sarà ora molto più piccolo.
  4. Come al solito, Snowflake applica il range join come filtro post-join. Questa volta, però, molto più rapidamente.

Nell'immagine qui sotto può vedere una rappresentazione del processo (apra questa pagina per la versione a schermo intero in alta risoluzione).

Esempio di query con range join Snowflake ottimizzata

Esempio di query binned range join

I numeri di bin sono semplicemente interi che rappresentano un intervallo di dati. Un modo per crearli è dividere il numero per la dimensione di bin desiderata. Con i timestamp possiamo prima convertire il timestamp in unix time, che è un intero, e poi fare la divisione:

-- per bin da 60 secondi
select
  timestamp,
  floor(date_part(epoch_second, timestamp) / 60) as bin_num

Salveremo questa logica in una funzione, get_bin_number 5, per non doverla ripetere ogni volta.

Seguendo i passaggi descritti sopra, dobbiamo prima generare la lista dei bin applicabili. Lo si fa utilizzando un generatore per creare una lista di interi, per poi filtrarla in base ai numeri di bin di inizio e fine desiderati 6.

set bin_size_s = 60;

with
metadata as (
    select
        -- questa sarebbe una query sull'intervallo di tempo desiderato
        min(timestamp) as start_time,
        max(timestamp) as end_time,
        get_bin_number(start_time, $bin_size_s) as bin_num_start,
        get_bin_number(end_time, $bin_size_s) as bin_num_end
    from seconds
),
-- serve una CTE con 1 riga tra bin_num_start e bin_num_end
-- bisogna prima generare una lista enorme, poi filtrarla perché non si possono passare valori calcolati
-- con bins_base a 1 trilione il filtraggio richiede 5 secondi. 106 ms per 1 milione

Espandi codice

Ora possiamo aggiungere il numero di bin a ciascun dataset. Per queries produrremo un dataset con 1 riga per query per ogni bin in cui la query è stata eseguita. Per seconds, ogni timestamp verrà mappato a un singolo bin.

queries_w_bin_number as (
    select
        start_time,
        end_time,
        warehouse_id,
        cluster_number,
        bins.bin_num
    from queries
    inner join bins
        on bins.bin_num between
          get_bin_number(queries.start_time, $bin_size_s) and get_bin_number(queries.end_time, $bin_size_s)
),
seconds_w_bin_number as (
    select
        timestamp,

Espandi codice

E applichiamo la condizione di join finale, con la condizione di equi-join aggiuntiva su bin_num:

select
    s.timestamp,
    count(q.warehouse_id) as num_queries
from seconds_w_bin_number as s
left join queries_w_bin_number as q
    on s.bin_num=q.bin_num
    and s.timestamp between date_trunc('second', q.start_time) and date_trunc('second', q.end_time)
group by 1

Sullo stesso dataset di prima, questa query{% superscript id=7 /%} viene eseguita in 2,2 secondi, contro i 750 secondi della versione non ottimizzata vista in precedenza. Un miglioramento di oltre 300 volte. Il query profile è mostrato qui sotto. Si noti come la condizione di join compaia ora in due sezioni: una per la condizione di equi-join su bin_num e l'altra per la condizione di range join.

Esempio di query profile di un range join Snowflake ottimizzato

Scegliere la dimensione di bin corretta

Un aspetto fondamentale per far funzionare questa strategia è scegliere la giusta dimensione di bin. Ogni bin deve contenere un intervallo di valori ristretto, in modo da ridurre al minimo l'esplosione di righe nel dataset intermedio prima dell'applicazione del filtro di range join. D'altra parte, se la dimensione scelta è troppo piccola, le dimensioni della "tabella di destra" (queries) aumenteranno in modo significativo quando viene espansa a 1 riga per bin.

Secondo Databricks, una buona regola empirica è scegliere il 90° percentile della durata dell'intervallo. Lo si può calcolare con la funzione approx_percentile insieme a DATEDIFF. Qui sotto trova i valori per il dataset di esempio queries usato in tutto l'articolo.

select
    approx_percentile(datediff('second', start_time, end_time), 0.5) as p50, -- 2s
    approx_percentile(datediff('second', start_time, end_time), 0.90) as p90, -- 30s
    approx_percentile(datediff('second', start_time, end_time), 0.95) as p95, -- 120s
    approx_percentile(datediff('second', start_time, end_time), 0.99) as p99, -- 600s
    approx_percentile(datediff('second', start_time, end_time), 0.999) as p999, -- 900s
    count(*) -- 267K
from queries

Le regole empiriche non sono perfette. Se possibile, provi la query con diverse dimensioni di bin per capire quale offre le prestazioni migliori. Ecco la curva di performance per la query precedente, al variare della dimensione di bin. In questo caso, scegliere il 99,9° percentile invece del 90° non ha fatto grande differenza. Come previsto, i tempi di esecuzione hanno cominciato a peggiorare quando la dimensione di bin è diventata davvero piccola.

Curva di performance dell'ottimizzazione binned range join in Snowflake

Come estendere questa tecnica a un join con intervallo fisso?

  • Spiegare come estendere la tecnica a un point in interval join con intervallo fisso
  • La dimensione di bin va impostata pari alla dimensione dell'intervallo fisso

Se ha un point in interval range join con intervallo fisso, come la query vista prima:

select
  ...
from page_views
inner join events
  on events.event_type='purchase' -- condizione di filtro
  and page_views.pathname = '/' -- condizione di filtro
  and page_views.user_id=events.user_id -- condizione di equi-join
  and page_views.viewed_at < events.event_at -- condizione di range join
  and dateadd('hour', 24, page_views.viewed_at) >= events.event_at -- condizione di range join

Imposti la dimensione di bin pari a quella dell'intervallo: 24 ore.

Come estendere la tecnica a un interval overlap range join?

Se ha a che fare con un interval overlap range join, come quello qui sotto:

select
    s1.session_id,
    array_agg(s2.session_id) as concurrent_sessions
from landing_page_sessions as s1
inner join app_sessions as s2
   on s1.end_time > s2.start_time
   and s1.start_time < s2.end_time
group by 1

Può applicare la stessa tecnica di binned range join dopo aver espanso sia landing_page_sessions sia app_sessions in modo che contengano 1 riga per sessione per ogni bin in cui la sessione ricade (come fatto sopra con queries).

Quando conviene usare questa ottimizzazione?

Come primo passo, verifichi che il range join sia davvero un collo di bottiglia usando il query profile di Snowflake per controllare che sia tra i nodi più costosi nell'esecuzione della query. Aggiungere l'ottimizzazione binned range join, infatti, rende le query più difficili da leggere e mantenere.

La tecnica di binned range join funziona solo per point in interval e interval overlap range join su tipi numerici. Non funziona invece per gli altri tipi di non-equi join, anche se è sempre possibile applicare lo stesso principio: aggiungere un vincolo di equi-join ovunque possibile per ridurre l'esplosione di righe.

Se il dataset "di destra", quello con i tempi di start ed end, presenta una distribuzione relativamente piatta delle durate degli intervalli, questa tecnica risulterà meno efficace.

Note

  1. Il dato si riferisce a una singola query, quindi va preso con le dovute cautele. I risultati possono variare in base a molti fattori.

  2. L'approccio si ispira al post di Simeon Pilgrim del 2016 (quando Snowflake era ancora snowflake.net!). L'ho usato con ottimi risultati fino a quando non ho implementato l'approccio più generico basato sul binning.

  3. Il range join con la tabella hours sarà molto più rapido del range join con la tabella seconds, perché la tabella intermedia sarà circa 3600 volte più piccola.

  4. L'approccio si ispira a Databricks. Non scendono nel dettaglio dell'implementazione del loro algoritmo, ma presumo funzioni in modo analogo.

  5. In via opzionale, può creare una funzione get_bin_number per evitare di duplicare lo stesso calcolo in tutta la query:

create or replace function get_bin_number(timestamp timestamp_tz, bin_size_s integer)
  returns integer
  as
  $$
    floor(date_part(epoch_second, timestamp) / bin_size_s)
  $$
  1. Snowflake non permette di passare valori calcolati al generatore, quindi è stato necessario procedere in due fasi. A breve renderemo open source alcune macro dbt per astrarre questo processo.

  2. La query completa di esempio per l'ottimizzazione binned range join:

create or replace function get_bin_number(timestamp timestamp_tz, bin_size_s integer)
  returns integer
  as
  $$
    floor(date_part(epoch_second, timestamp) / bin_size_s)
  $$
;

set bin_size_s = 60;

with
metadata as (
    select
        -- Ottieni l'intervallo di tempo coperto dalla query
        min(timestamp) as start_time,

Espandi codice

Ian Whitestone·Co-founder & CEO di SELECT

Ian è Co-founder & CEO di SELECT, piattaforma SaaS per la gestione e l'ottimizzazione dei costi di Snowflake. Prima di fondare SELECT, ha guidato per 6 anni team full stack di data science ed engineering in Shopify e Capital One. In Shopify si è occupato dell'ottimizzazione del data warehouse e dell'aumento della cost observability.