Inkrementell inläsning i frågetjänsten
Designmönstret för inkrementell belastning är en lösning för datahantering. Mönstret bearbetar bara information i datauppsättningen som har skapats eller ändrats sedan den senaste inläsningen.
Inkrementell belastning använder olika funktioner i Adobe Experience Platform Query Service, till exempel anonyma block och ögonblicksbilder. Det här designmönstret ökar bearbetningseffektiviteten eftersom data som redan har bearbetats från källan hoppas över. Den kan användas med både direktuppspelning och batchdatabearbetning.
Det här dokumentet innehåller en serie instruktioner om hur du skapar ett designmönster för stegvis bearbetning. Dessa steg kan användas som mall för att skapa egna inkrementella datainläsningsfrågor.
Komma igång
SQL-exemplen i det här dokumentet kräver att du har en förståelse för de anonyma funktionerna för block och ögonblicksbilder. Vi rekommenderar att du läser dokumentationen för exempel på anonyma blockfrågor och även dokumentationen för snapshot-satsen.
Om du vill ha vägledning om terminologi som används i den här handboken läser du i SQL-syntaxguiden.
Läs in data inkrementellt
Stegen nedan visar hur du skapar och läser in data stegvis med hjälp av ögonblicksbilder och den anonyma blockfunktionen. Designmönstret kan användas som en mall för en egen frågesekvens.
- 
                  
Skapa en
checkpoint_log-tabell för att spåra den senaste ögonblicksbilden som användes för att bearbeta data. Spårningstabellen (checkpoint_logi det här exemplet) måste först initieras tillnullför att en datauppsättning ska kunna bearbetas stegvis.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false; - 
                  
Fyll i tabellen
checkpoint_logmed en tom post för den datauppsättning som behöver inkrementell bearbetning.DIM_TABLE_ABCär den datauppsättning som ska bearbetas i exemplet nedan. Första gångenDIM_TABLE_ABCbearbetas initieraslast_snapshot_idsomnull. Detta gör att du kan bearbeta hela datauppsättningen vid första tillfället och stegvis efter detta.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp; - 
                  
Initiera sedan
DIM_TABLE_ABC_Incrementalså att den innehåller bearbetade utdata frånDIM_TABLE_ABC. Det anonyma blocket i körningsavsnittet required i SQL-exemplet nedan, som beskrivs i steg 1 till 4, körs sekventiellt för att bearbeta data stegvis.- Ange 
from_snapshot_idsom anger var bearbetningen börjar.from_snapshot_idi exemplet frågas från tabellencheckpoint_logför användning medDIM_TABLE_ABC. Vid den första körningen blir ögonblicksbilds-IDnull, vilket innebär att hela datauppsättningen kommer att bearbetas. - Ange 
to_snapshot_idsom aktuellt ID för ögonblicksbild för källtabellen (DIM_TABLE_ABC). I exemplet hämtas detta från källtabellens metadatatabell. - Använd nyckelordet 
CREATEför att skapaDIM_TABLE_ABC_Incremenalsom måltabell. Måltabellen innehåller bearbetade data från källdatauppsättningen (DIM_TABLE_ABC). Detta gör att bearbetade data från källtabellen mellanfrom_snapshot_idochto_snapshot_idkan läggas till stegvis i måltabellen. - Uppdatera tabellen 
checkpoint_logmedto_snapshot_idför de källdata somDIM_TABLE_ABChar bearbetat. - Om någon av de sekventiellt körda frågorna i det anonyma blocket misslyckas körs valfria-avsnittet. Detta returnerar ett fel och avslutar processen.
 
note note NOTE history_meta('source table name')är en praktisk metod som används för att få åtkomst till tillgängliga ögonblicksbilder i en datauppsättning.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$; - Ange 
 - 
                  
Använd logiken för inkrementell datainläsning i exemplet med anonyma block nedan för att tillåta att nya data från källdatauppsättningen (sedan den senaste tidsstämpeln) bearbetas och läggs till i måltabellen vid en vanlig gräns. I exemplet kommer dataändringar i
DIM_TABLE_ABCatt bearbetas och läggas till iDIM_TABLE_ABC_incremental.note note NOTE _IDär primärnyckeln i bådeDIM_TABLE_ABC_IncrementalochSELECT history_meta('DIM_TABLE_ABC').code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$; 
Denna logik kan tillämpas på alla tabeller för att utföra inkrementella inläsningar.
Utgångna ögonblicksbilder
För att lösa problemet med att ett ögonblicksbild-ID har gått ut infogar du följande kommando i början av det anonyma blocket. Följande kodrad åsidosätter @from_snapshot_id med den tidigaste tillgängliga snapshot_id från metadata.
SET resolve_fallback_snapshot_on_failure=true;
            Hela kodblocket ser ut så här:
$$ BEGIN
    SET resolve_fallback_snapshot_on_failure=true;
    SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                            (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                on a.process_timestamp=b.process_timestamp;
    SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
    SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
    INSERT INTO DIM_TABLE_ABC_Incremental
     SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
   checkpoint_log
   SELECT
       'DIM_TABLE_ABC' process_name,
       'SUCCESSFUL' process_status,
      cast( @to_snapshot_id AS string) last_snapshot_id,
      cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
  WHEN OTHER THEN
    SELECT 'ERROR';
END
$$;
            Nästa steg
Genom att läsa det här dokumentet bör du få en bättre förståelse för hur du använder anonyma funktioner för block och ögonblicksbilder för att utföra inkrementella inläsningar och kan använda den här logiken för dina egna specifika frågor. Allmän vägledning om frågekörning finns i guiden om frågekörning i frågetjänsten.