Incremental load in Query Service
The incremental load design pattern is a solution for managing data. The pattern processes only the information in the dataset that has been created or modified since the last load execution.
Incremental load uses various features provided by Adobe Experience Platform Query Service, such as anonymous blocks and snapshots. This design pattern increases processing efficiency because any data already processed from the source is skipped. It can be used with both streaming and batch data processing.
This document provides a series of instructions for building a design pattern for incremental processing. The steps can be used as a template to create your own incremental data load queries.
Getting started
The SQL examples throughout this document require you to have an understanding of the anonymous block and snapshot capabilities. It is recommended that you read the sample anonymous block queries documentation and also the snapshot clause documentation.
For guidance on any terminology used within this guide, refer to the SQL syntax guide.
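As a quick refresher before the steps below, the snapshot clause lets a query read only the data written between two snapshot IDs, and `history_meta` exposes the snapshot IDs available on a dataset. The following is a minimal, illustrative sketch; the dataset name and snapshot variables are hypothetical:

```sql
-- List the snapshots available on a hypothetical dataset.
SELECT snapshot_id, is_current FROM (SELECT history_meta('MY_DATASET'));

-- Read only the data written between two of those snapshot IDs.
SELECT * FROM MY_DATASET SNAPSHOT BETWEEN @start_snapshot_id AND @end_snapshot_id;
```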
Incrementally load data
The steps below demonstrate how to create and incrementally load data using snapshots and the anonymous block feature. The design pattern can be used as a template for your own sequence of queries.
- Create a `checkpoint_log` table to track the most recent snapshot used to process data successfully. The tracking table (`checkpoint_log` in this example) must first be initialized to `null` in order to incrementally process a dataset.

  ```sql
  DROP TABLE IF EXISTS checkpoint_log;
  CREATE TABLE checkpoint_log AS
  SELECT
     cast(NULL AS string) process_name,
     cast(NULL AS string) process_status,
     cast(NULL AS string) last_snapshot_id,
     cast(NULL AS TIMESTAMP) process_timestamp
  WHERE false;
  ```
- Populate the `checkpoint_log` table with one empty record for the dataset that needs incremental processing. `DIM_TABLE_ABC` is the dataset to be processed in the example below. The first time `DIM_TABLE_ABC` is processed, `last_snapshot_id` is initialized as `null`. This allows you to process the entire dataset on the first occasion and incrementally thereafter.

  ```sql
  INSERT INTO checkpoint_log
  SELECT
     'DIM_TABLE_ABC' process_name,
     'SUCCESSFUL' process_status,
     cast(NULL AS string) last_snapshot_id,
     CURRENT_TIMESTAMP process_timestamp;
  ```
- Next, initialize `DIM_TABLE_ABC_Incremental` to contain processed output from `DIM_TABLE_ABC`. The anonymous block in the SQL example below is executed sequentially to process data incrementally, as described in steps one to four:

  1. Set the `from_snapshot_id`, which indicates where processing starts. In the example, the `from_snapshot_id` for `DIM_TABLE_ABC` is queried from the `checkpoint_log` table. On the first run, the snapshot ID is `null`, which means the entire dataset is processed.
  2. Set the `to_snapshot_id` to the current snapshot ID of the source table (`DIM_TABLE_ABC`). In the example, this is queried from the source table's metadata table.
  3. Use the `CREATE` keyword to create `DIM_TABLE_ABC_Incremental` as the target table. The target table persists the processed data from the source dataset (`DIM_TABLE_ABC`). This allows the data processed from the source table between `from_snapshot_id` and `to_snapshot_id` to be incrementally appended to the target table.
  4. Update the `checkpoint_log` table with the `to_snapshot_id` for the source data that `DIM_TABLE_ABC` processed successfully.

  If any of the sequentially executed queries in the anonymous block fail, the optional exception section is executed. This returns an error and ends the process.

  NOTE: `history_meta('source table name')` is a convenient method used to access the snapshots available in a dataset.

  ```sql
  $$
  BEGIN
      SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD')
          FROM checkpoint_log a
          JOIN (SELECT MAX(process_timestamp) process_timestamp
                FROM checkpoint_log
                WHERE process_name = 'DIM_TABLE_ABC'
                  AND process_status = 'SUCCESSFUL') b
            ON a.process_timestamp = b.process_timestamp;

      SET @to_snapshot_id = SELECT snapshot_id
          FROM (SELECT history_meta('DIM_TABLE_ABC'))
          WHERE is_current = true;

      SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;

      CREATE TABLE DIM_TABLE_ABC_Incremental AS
          SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id;

      INSERT INTO checkpoint_log
          SELECT
              'DIM_TABLE_ABC' process_name,
              'SUCCESSFUL' process_status,
              cast(@to_snapshot_id AS string) last_snapshot_id,
              cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp;
  EXCEPTION
      WHEN OTHERS THEN SELECT 'ERROR';
  END
  $$;
  ```
- Use the incremental data load logic in the anonymous block example below to allow any new data in the source dataset (since the most recent timestamp) to be processed and appended to the target table at a regular cadence. In the example, data changes in `DIM_TABLE_ABC` are processed and appended to `DIM_TABLE_ABC_Incremental`.

  NOTE: `_id` is the primary key in `DIM_TABLE_ABC_Incremental` and `SELECT history_meta('DIM_TABLE_ABC')`.

  ```sql
  $$
  BEGIN
      SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD')
          FROM checkpoint_log a
          JOIN (SELECT MAX(process_timestamp) process_timestamp
                FROM checkpoint_log
                WHERE process_name = 'DIM_TABLE_ABC'
                  AND process_status = 'SUCCESSFUL') b
            ON a.process_timestamp = b.process_timestamp;

      SET @to_snapshot_id = SELECT snapshot_id
          FROM (SELECT history_meta('DIM_TABLE_ABC'))
          WHERE is_current = true;

      SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;

      INSERT INTO DIM_TABLE_ABC_Incremental
          SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id
          WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id = a._id);

      INSERT INTO checkpoint_log
          SELECT
              'DIM_TABLE_ABC' process_name,
              'SUCCESSFUL' process_status,
              cast(@to_snapshot_id AS string) last_snapshot_id,
              cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp;
  EXCEPTION
      WHEN OTHERS THEN SELECT 'ERROR';
  END
  $$;
  ```
This logic can be applied to any table to perform incremental loads.
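Between runs, you can check the state of the tracking table. The query below is a verification sketch, not part of the required sequence; it returns the most recent successful snapshot recorded for a given process, using the same lookup the anonymous blocks above embed:

```sql
-- Fetch the most recent successful checkpoint for DIM_TABLE_ABC.
-- A NULL last_snapshot_id means the next run will process the full dataset.
SELECT a.process_name, a.last_snapshot_id, a.process_timestamp
FROM checkpoint_log a
JOIN (SELECT MAX(process_timestamp) process_timestamp
      FROM checkpoint_log
      WHERE process_name = 'DIM_TABLE_ABC'
        AND process_status = 'SUCCESSFUL') b
  ON a.process_timestamp = b.process_timestamp;
```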
Expired snapshots
To resolve the problem of an expired snapshot ID, insert the following command at the beginning of the anonymous block. The line below overrides `@from_snapshot_id` with the earliest available `snapshot_id` from the metadata.

```sql
SET resolve_fallback_snapshot_on_failure=true;
```

The entire code block is seen below:
```sql
$$
BEGIN
    SET resolve_fallback_snapshot_on_failure=true;

    SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD')
        FROM checkpoint_log a
        JOIN (SELECT MAX(process_timestamp) process_timestamp
              FROM checkpoint_log
              WHERE process_name = 'DIM_TABLE_ABC'
                AND process_status = 'SUCCESSFUL') b
          ON a.process_timestamp = b.process_timestamp;

    SET @to_snapshot_id = SELECT snapshot_id
        FROM (SELECT history_meta('DIM_TABLE_ABC'))
        WHERE is_current = true;

    SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;

    INSERT INTO DIM_TABLE_ABC_Incremental
        SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id
        WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id = a._id);

    INSERT INTO checkpoint_log
        SELECT
            'DIM_TABLE_ABC' process_name,
            'SUCCESSFUL' process_status,
            cast(@to_snapshot_id AS string) last_snapshot_id,
            cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
    WHEN OTHERS THEN SELECT 'ERROR';
END
$$;
```
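To diagnose an expired snapshot before rerunning the block, you can compare the checkpointed ID against the snapshots still available on the source table. This diagnostic sketch relies on the `history_meta` helper described above:

```sql
-- Snapshots currently available on the source table. If the
-- last_snapshot_id stored in checkpoint_log no longer appears here,
-- resolve_fallback_snapshot_on_failure lets the block fall back to the
-- earliest available snapshot instead of failing.
SELECT snapshot_id, is_current
FROM (SELECT history_meta('DIM_TABLE_ABC'));
```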
Next steps
By reading this document, you should now have a better understanding of how to use the anonymous block and snapshot features to perform incremental loads, and you can apply this logic to your own queries. For general guidance on executing queries, read the guide on running queries in Query Service.