Incremental load in Query Service
The incremental load design pattern is a solution for managing data. The pattern only processes information in the dataset that has been created or modified since the last load execution.
Incremental load uses various features provided by Adobe Experience Platform Query Service such as anonymous block and snapshots. This design pattern increases processing efficiency as any data already processed from the source is skipped. It can be used with both streaming and batch data processing.
This document provides a series of instructions to build a design pattern for incremental processing. These steps can be used as a template to create your own incremental data load queries.
Introduzione
The SQL examples throughout this document require you to have an understanding of the anonymous block and snapshot capabilities. It is recommended that you read the sample anonymous block queries documentation and also the snapshot clause documentation.
For guidance on any terminology used within this guide, refer to the SQL syntax guide.
Incrementally load data
The steps below demonstrate how to create and incrementally load data using snapshots and the anonymous block feature. The design pattern can be used as a template for your own sequence of queries.
-
Create a
checkpoint_logtable to track the most recent snapshot used to process data successfully. The tracking table (checkpoint_login this example) must first be initialized tonullin order to incrementally process a dataset.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false; -
Populate the
checkpoint_logtable with one empty record for the dataset that needs incremental processing.DIM_TABLE_ABCis the dataset to be processed in the example below. On the first occasion of processingDIM_TABLE_ABC, thelast_snapshot_idis initialized asnull. This allows you to process the entire dataset on the first occasion and incrementally thereafter.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp; -
Next, initialize
DIM_TABLE_ABC_Incrementalto contain processed output fromDIM_TABLE_ABC. The anonymous block in the required execution section of the SQL example below, as described in steps one to four, is executed sequentially to process data incrementally.- Set the
from_snapshot_idwhich indicates where the processing starts from. Viene eseguita una query perfrom_snapshot_idnell'esempio dalla tabellacheckpoint_logper l'utilizzo conDIM_TABLE_ABC. All'esecuzione iniziale, l'ID snapshot sarànull, il che significa che l'intero set di dati verrà elaborato. - Imposta
to_snapshot_idcome ID snapshot corrente della tabella di origine (DIM_TABLE_ABC). Nell’esempio, questa viene eseguita dalla tabella dei metadati della tabella di origine. - Utilizzare la parola chiave
CREATEper creareDIM_TABLE_ABC_Incremenalcome tabella di destinazione. La tabella di destinazione mantiene i dati elaborati dal set di dati di origine (DIM_TABLE_ABC). Ciò consente di aggiungere in modo incrementale alla tabella di destinazione i dati elaborati dalla tabella di origine trafrom_snapshot_ideto_snapshot_id. - Aggiornare la tabella
checkpoint_logconto_snapshot_idper i dati di origine elaborati correttamente daDIM_TABLE_ABC. - Se una delle query eseguite in sequenza del blocco anonimo non riesce, viene eseguita la sezione dell'eccezione optional. Questo restituisce un errore e termina il processo.
note note NOTE history_meta('source table name')è un metodo pratico utilizzato per ottenere l'accesso allo snapshot disponibile in un set di dati.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHERS THEN SELECT 'ERROR'; END $$; - Set the
-
Utilizza la logica di caricamento dati incrementale nell’esempio di blocco anonimo seguente per consentire a tutti i nuovi dati dal set di dati di origine (dal timestamp più recente) di essere elaborati e aggiunti alla tabella di destinazione a una cadenza regolare. Nell'esempio, le modifiche dei dati a
DIM_TABLE_ABCverranno elaborate e aggiunte aDIM_TABLE_ABC_incremental.note note NOTE _IDè la chiave primaria inDIM_TABLE_ABC_IncrementaleSELECT history_meta('DIM_TABLE_ABC').code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHERS THEN SELECT 'ERROR'; END $$;
Questa logica può essere applicata a qualsiasi tabella per eseguire caricamenti incrementali.
Snapshot scaduti
Per risolvere il problema di un ID di snapshot scaduto, inserisci il seguente comando all’inizio del blocco anonimo. La seguente riga di codice sostituisce @from_snapshot_id con il primo snapshot_id disponibile dai metadati.
SET resolve_fallback_snapshot_on_failure=true;
L’aspetto dell’intero blocco di codice è il seguente:
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHERS THEN
SELECT 'ERROR';
END
$$;
Passaggi successivi
Una volta letto questo documento, sarai in grado di comprendere meglio come utilizzare le funzioni di blocco e istantanea anonime per eseguire caricamenti incrementali e di applicare questa logica alle tue query specifiche. Per informazioni generali sull'esecuzione delle query, leggere la guida sull'esecuzione delle query in Query Service.