Incremental load in Query Service

The incremental load design pattern is a solution for managing data. The pattern only processes information in the dataset that has been created or modified since the last load execution.

Incremental load uses various features provided by Adobe Experience Platform Query Service such as anonymous block and snapshots. This design pattern increases processing efficiency as any data already processed from the source is skipped. It can be used with both streaming and batch data processing.

This document provides a series of instructions to build a design pattern for incremental processing. These steps can be used as a template to create your own incremental data load queries.

Introduzione

The SQL examples throughout this document require you to have an understanding of the anonymous block and snapshot capabilities. It is recommended that you read the sample anonymous block queries documentation and also the snapshot clause documentation.

For guidance on any terminology used within this guide, refer to the SQL syntax guide.

Incrementally load data

The steps below demonstrate how to create and incrementally load data using snapshots and the anonymous block feature. The design pattern can be used as a template for your own sequence of queries.

  1. Create a checkpoint_log table to track the most recent snapshot used to process data successfully. The tracking table (checkpoint_log in this example) must first be initialized to null in order to incrementally process a dataset.

    code language-sql
    DROP TABLE IF EXISTS checkpoint_log;
    CREATE TABLE  checkpoint_log AS
    SELECT
       cast(NULL AS string) process_name,
       cast(NULL AS string) process_status,
       cast(NULL AS string) last_snapshot_id,
       cast(NULL AS TIMESTAMP) process_timestamp
       WHERE false;
    
  2. Populate the checkpoint_log table with one empty record for the dataset that needs incremental processing. DIM_TABLE_ABC is the dataset to be processed in the example below. On the first occasion of processing DIM_TABLE_ABC, the last_snapshot_id is initialized as null. This allows you to process the entire dataset on the first occasion and incrementally thereafter.

    code language-sql
    INSERT INTO
       checkpoint_log
       SELECT
          'DIM_TABLE_ABC' process_name,
          'SUCCESSFUL' process_status,
          cast(NULL AS string) last_snapshot_id,
          CURRENT_TIMESTAMP process_timestamp;
    
  3. Next, initialize DIM_TABLE_ABC_Incremental to contain processed output from DIM_TABLE_ABC. The anonymous block in the required execution section of the SQL example below, as described in steps one to four, is executed sequentially to process data incrementally.

    1. Set the from_snapshot_id which indicates where the processing starts from. Viene eseguita una query per from_snapshot_id nell'esempio dalla tabella checkpoint_log per l'utilizzo con DIM_TABLE_ABC. All'esecuzione iniziale, l'ID snapshot sarà null, il che significa che l'intero set di dati verrà elaborato.
    2. Imposta to_snapshot_id come ID snapshot corrente della tabella di origine (DIM_TABLE_ABC). Nell’esempio, questa viene eseguita dalla tabella dei metadati della tabella di origine.
    3. Utilizzare la parola chiave CREATE per creare DIM_TABLE_ABC_Incremenal come tabella di destinazione. La tabella di destinazione mantiene i dati elaborati dal set di dati di origine (DIM_TABLE_ABC). Ciò consente di aggiungere in modo incrementale alla tabella di destinazione i dati elaborati dalla tabella di origine tra from_snapshot_id e to_snapshot_id.
    4. Aggiornare la tabella checkpoint_log con to_snapshot_id per i dati di origine elaborati correttamente da DIM_TABLE_ABC.
    5. Se una delle query eseguite in sequenza del blocco anonimo non riesce, viene eseguita la sezione dell'eccezione optional. Questo restituisce un errore e termina il processo.
    note note
    NOTE
    history_meta('source table name') è un metodo pratico utilizzato per ottenere l'accesso allo snapshot disponibile in un set di dati.
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        CREATE TABLE DIM_TABLE_ABC_Incremental AS
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ;
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHERS THEN
        SELECT 'ERROR';
    END
    $$;
    
  4. Utilizza la logica di caricamento dati incrementale nell’esempio di blocco anonimo seguente per consentire a tutti i nuovi dati dal set di dati di origine (dal timestamp più recente) di essere elaborati e aggiunti alla tabella di destinazione a una cadenza regolare. Nell'esempio, le modifiche dei dati a DIM_TABLE_ABC verranno elaborate e aggiunte a DIM_TABLE_ABC_incremental.

    note note
    NOTE
    _ID è la chiave primaria in DIM_TABLE_ABC_Incremental e SELECT history_meta('DIM_TABLE_ABC').
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        INSERT INTO DIM_TABLE_ABC_Incremental
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHERS THEN
        SELECT 'ERROR';
    END
    $$;
    

Questa logica può essere applicata a qualsiasi tabella per eseguire caricamenti incrementali.

Snapshot scaduti

Per risolvere il problema di un ID di snapshot scaduto, inserisci il seguente comando all’inizio del blocco anonimo. La seguente riga di codice sostituisce @from_snapshot_id con il primo snapshot_id disponibile dai metadati.

SET resolve_fallback_snapshot_on_failure=true;

L’aspetto dell’intero blocco di codice è il seguente:

$$ BEGIN
    SET resolve_fallback_snapshot_on_failure=true;
    SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                            (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                on a.process_timestamp=b.process_timestamp;
    SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
    SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
    INSERT INTO DIM_TABLE_ABC_Incremental
     SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);

Insert Into
   checkpoint_log
   SELECT
       'DIM_TABLE_ABC' process_name,
       'SUCCESSFUL' process_status,
      cast( @to_snapshot_id AS string) last_snapshot_id,
      cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
  WHEN OTHERS THEN
    SELECT 'ERROR';
END
$$;

Passaggi successivi

Una volta letto questo documento, sarai in grado di comprendere meglio come utilizzare le funzioni di blocco e istantanea anonime per eseguire caricamenti incrementali e di applicare questa logica alle tue query specifiche. Per informazioni generali sull'esecuzione delle query, leggere la guida sull'esecuzione delle query in Query Service.

recommendation-more-help
ccf2b369-4031-483f-af63-a93b5ae5e3fb