Carico incrementale in Query Service

Il modello di progettazione del carico incrementale è una soluzione per la gestione dei dati. Il modello elabora solo le informazioni nel set di dati che è stato creato o modificato dopo l’ultima esecuzione del caricamento.

Il caricamento incrementale utilizza varie funzioni fornite da Adobe Experience Platform Query Service, ad esempio blocchi anonimi e snapshot. Questo modello di progettazione aumenta l’efficienza di elaborazione quando tutti i dati già elaborati dall’origine vengono ignorati. Può essere utilizzato sia con l’elaborazione dei dati in streaming che in batch.

Questo documento fornisce una serie di istruzioni per creare un modello di progettazione per l’elaborazione incrementale. Questi passaggi possono essere utilizzati come modello per creare query di caricamento dati incrementali personalizzate.

Introduzione

Gli esempi SQL illustrati in questo documento richiedono una conoscenza approfondita delle funzionalità di blocco anonimo e snapshot. Si consiglia di leggere la documentazione query di blocco anonime di esempio e la documentazione clausola snapshot.

Per informazioni su qualsiasi terminologia utilizzata in questa guida, fare riferimento alla guida alla sintassi SQL.

Caricamento incrementale dei dati

I passaggi seguenti mostrano come creare e caricare i dati in modo incrementale utilizzando gli snapshot e la funzione di blocco anonimo. Il modello struttura può essere utilizzato come modello per la propria sequenza di query.

  1. Creare una tabella checkpoint_log per tenere traccia dello snapshot più recente utilizzato per elaborare correttamente i dati. La tabella di rilevamento (checkpoint_log in questo esempio) deve prima essere inizializzata in null per elaborare in modo incrementale un set di dati.

    code language-sql
    DROP TABLE IF EXISTS checkpoint_log;
    CREATE TABLE  checkpoint_log AS
    SELECT
       cast(NULL AS string) process_name,
       cast(NULL AS string) process_status,
       cast(NULL AS string) last_snapshot_id,
       cast(NULL AS TIMESTAMP) process_timestamp
       WHERE false;
    
  2. Popolare la tabella checkpoint_log con un record vuoto per il set di dati che richiede l'elaborazione incrementale. DIM_TABLE_ABC è il set di dati da elaborare nell'esempio seguente. Alla prima elaborazione di DIM_TABLE_ABC, last_snapshot_id è inizializzato come null. Questo consente di elaborare l’intero set di dati la prima volta e in modo incrementale in seguito.

    code language-sql
    INSERT INTO
       checkpoint_log
       SELECT
          'DIM_TABLE_ABC' process_name,
          'SUCCESSFUL' process_status,
          cast(NULL AS string) last_snapshot_id,
          CURRENT_TIMESTAMP process_timestamp;
    
  3. Inizializzare quindi DIM_TABLE_ABC_Incremental per contenere l'output elaborato da DIM_TABLE_ABC. Il blocco anonimo nella sezione di esecuzione required dell'esempio SQL seguente, come descritto nei passaggi da uno a quattro, viene eseguito in sequenza per elaborare i dati in modo incrementale.

    1. Imposta from_snapshot_id che indica da dove inizia l'elaborazione. Viene eseguita una query per from_snapshot_id nell'esempio dalla tabella checkpoint_log per l'utilizzo con DIM_TABLE_ABC. All'esecuzione iniziale, l'ID snapshot sarà null, il che significa che l'intero set di dati verrà elaborato.
    2. Imposta to_snapshot_id come ID snapshot corrente della tabella di origine (DIM_TABLE_ABC). Nell’esempio, questa viene eseguita dalla tabella dei metadati della tabella di origine.
    3. Utilizzare la parola chiave CREATE per creare DIM_TABLE_ABC_Incremenal come tabella di destinazione. La tabella di destinazione mantiene i dati elaborati dal set di dati di origine (DIM_TABLE_ABC). Ciò consente di aggiungere in modo incrementale alla tabella di destinazione i dati elaborati dalla tabella di origine tra from_snapshot_id e to_snapshot_id.
    4. Aggiornare la tabella checkpoint_log con to_snapshot_id per i dati di origine elaborati correttamente da DIM_TABLE_ABC.
    5. Se una delle query eseguite in sequenza del blocco anonimo non riesce, viene eseguita la sezione dell'eccezione optional. Questo restituisce un errore e termina il processo.
    note note
    NOTE
    history_meta('source table name') è un metodo pratico utilizzato per ottenere l'accesso allo snapshot disponibile in un set di dati.
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        CREATE TABLE DIM_TABLE_ABC_Incremental AS
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ;
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHER THEN
        SELECT 'ERROR';
    END
    $$;
    
  4. Utilizza la logica di caricamento dati incrementale nell’esempio di blocco anonimo seguente per consentire a tutti i nuovi dati dal set di dati di origine (dal timestamp più recente) di essere elaborati e aggiunti alla tabella di destinazione a una cadenza regolare. Nell'esempio, le modifiche dei dati a DIM_TABLE_ABC verranno elaborate e aggiunte a DIM_TABLE_ABC_incremental.

    note note
    NOTE
    _ID è la chiave primaria in DIM_TABLE_ABC_Incremental e SELECT history_meta('DIM_TABLE_ABC').
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        INSERT INTO DIM_TABLE_ABC_Incremental
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHER THEN
        SELECT 'ERROR';
    END
    $$;
    

Questa logica può essere applicata a qualsiasi tabella per eseguire caricamenti incrementali.

Snapshot scaduti

Per risolvere il problema di un ID di snapshot scaduto, inserisci il seguente comando all’inizio del blocco anonimo. La seguente riga di codice sostituisce @from_snapshot_id con il primo snapshot_id disponibile dai metadati.

SET resolve_fallback_snapshot_on_failure=true;

L’aspetto dell’intero blocco di codice è il seguente:

$$ BEGIN
    SET resolve_fallback_snapshot_on_failure=true;
    SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                            (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                on a.process_timestamp=b.process_timestamp;
    SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
    SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
    INSERT INTO DIM_TABLE_ABC_Incremental
     SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);

Insert Into
   checkpoint_log
   SELECT
       'DIM_TABLE_ABC' process_name,
       'SUCCESSFUL' process_status,
      cast( @to_snapshot_id AS string) last_snapshot_id,
      cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
  WHEN OTHER THEN
    SELECT 'ERROR';
END
$$;

Passaggi successivi

Una volta letto questo documento, sarai in grado di comprendere meglio come utilizzare le funzioni di blocco e istantanea anonime per eseguire caricamenti incrementali e di applicare questa logica alle tue query specifiche. Per informazioni generali sull'esecuzione delle query, leggere la guida sull'esecuzione delle query in Query Service.

recommendation-more-help
ccf2b369-4031-483f-af63-a93b5ae5e3fb