Inkrementell inläsning i frågetjänsten
Designmönstret för inkrementell belastning är en lösning för datahantering. Mönstret bearbetar bara information i datauppsättningen som har skapats eller ändrats sedan den senaste inläsningen.
Inkrementell belastning använder olika funktioner i Adobe Experience Platform Query Service, till exempel anonyma block och ögonblicksbilder. Det här designmönstret ökar bearbetningseffektiviteten eftersom data som redan har bearbetats från källan hoppas över. Den kan användas med både direktuppspelning och batchdatabearbetning.
Det här dokumentet innehåller en serie instruktioner om hur du skapar ett designmönster för stegvis bearbetning. Dessa steg kan användas som mall för att skapa egna inkrementella datainläsningsfrågor.
Komma igång
SQL-exemplen i det här dokumentet kräver att du har en förståelse för de anonyma funktionerna för block och ögonblicksbilder. Vi rekommenderar att du läser dokumentationen för exempel på anonyma blockfrågor och även dokumentationen för snapshot-satsen.
Om du vill ha vägledning om terminologi som används i den här handboken läser du i SQL-syntaxguiden.
Läs in data inkrementellt
Stegen nedan visar hur du skapar och läser in data stegvis med hjälp av ögonblicksbilder och den anonyma blockfunktionen. Designmönstret kan användas som en mall för en egen frågesekvens.
-
Skapa en
checkpoint_log
-tabell för att spåra den senaste ögonblicksbilden som användes för att bearbeta data. Spårningstabellen (checkpoint_log
i det här exemplet) måste först initieras tillnull
för att en datauppsättning ska kunna bearbetas stegvis.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false;
-
Fyll i tabellen
checkpoint_log
med en tom post för den datauppsättning som behöver inkrementell bearbetning.DIM_TABLE_ABC
är den datauppsättning som ska bearbetas i exemplet nedan. Första gångenDIM_TABLE_ABC
bearbetas initieraslast_snapshot_id
somnull
. Detta gör att du kan bearbeta hela datauppsättningen vid första tillfället och stegvis efter detta.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp;
-
Initiera sedan
DIM_TABLE_ABC_Incremental
så att den innehåller bearbetade utdata frånDIM_TABLE_ABC
. Det anonyma blocket i körningsavsnittet required i SQL-exemplet nedan, som beskrivs i steg 1 till 4, körs sekventiellt för att bearbeta data stegvis.- Ange
from_snapshot_id
som anger var bearbetningen börjar.from_snapshot_id
i exemplet frågas från tabellencheckpoint_log
för användning medDIM_TABLE_ABC
. Vid den första körningen blir ögonblicksbilds-IDnull
, vilket innebär att hela datauppsättningen kommer att bearbetas. - Ange
to_snapshot_id
som aktuellt ID för ögonblicksbild för källtabellen (DIM_TABLE_ABC
). I exemplet hämtas detta från källtabellens metadatatabell. - Använd nyckelordet
CREATE
för att skapaDIM_TABLE_ABC_Incremenal
som måltabell. Måltabellen innehåller bearbetade data från källdatauppsättningen (DIM_TABLE_ABC
). Detta gör att bearbetade data från källtabellen mellanfrom_snapshot_id
ochto_snapshot_id
kan läggas till stegvis i måltabellen. - Uppdatera tabellen
checkpoint_log
medto_snapshot_id
för de källdata somDIM_TABLE_ABC
har bearbetat. - Om någon av de sekventiellt körda frågorna i det anonyma blocket misslyckas körs valfria-avsnittet. Detta returnerar ett fel och avslutar processen.
note note NOTE history_meta('source table name')
är en praktisk metod som används för att få åtkomst till tillgängliga ögonblicksbilder i en datauppsättning.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
- Ange
-
Använd logiken för inkrementell datainläsning i exemplet med anonyma block nedan för att tillåta att nya data från källdatauppsättningen (sedan den senaste tidsstämpeln) bearbetas och läggs till i måltabellen vid en vanlig gräns. I exemplet kommer dataändringar i
DIM_TABLE_ABC
att bearbetas och läggas till iDIM_TABLE_ABC_incremental
.note note NOTE _ID
är primärnyckeln i bådeDIM_TABLE_ABC_Incremental
ochSELECT history_meta('DIM_TABLE_ABC')
.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Denna logik kan tillämpas på alla tabeller för att utföra inkrementella inläsningar.
Utgångna ögonblicksbilder
För att lösa problemet med att ett ögonblicksbild-ID har gått ut infogar du följande kommando i början av det anonyma blocket. Följande kodrad åsidosätter @from_snapshot_id
med den tidigaste tillgängliga snapshot_id
från metadata.
SET resolve_fallback_snapshot_on_failure=true;
Hela kodblocket ser ut så här:
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Nästa steg
Genom att läsa det här dokumentet bör du få en bättre förståelse för hur du använder anonyma funktioner för block och ögonblicksbilder för att utföra inkrementella inläsningar och kan använda den här logiken för dina egna specifika frågor. Allmän vägledning om frågekörning finns i guiden om frågekörning i frågetjänsten.