Incrementele belasting in Query-service
Het patroon van het stijgende ladingsontwerp is een oplossing voor het beheren van gegevens. Het patroon verwerkt slechts informatie in de dataset die sinds de laatste ladingsuitvoering is gecreeerd of gewijzigd.
De stijgende lading gebruikt diverse eigenschappen die door de Dienst van de Vraag van Adobe Experience Platform zoals anonieme blok en momentopnamen worden verstrekt. Dit ontwerppatroon verhoogt de verwerkingsefficiëntie omdat alle gegevens die al uit de bron zijn verwerkt, worden overgeslagen. Deze kan zowel bij streaming- als batchgegevensverwerking worden gebruikt.
Dit document bevat een aantal instructies voor het maken van een ontwerppatroon voor incrementele verwerking. Deze stappen kunnen als malplaatje worden gebruikt om uw eigen stijgende vragen van de gegevenslading tot stand te brengen.
Aan de slag
De SQL-voorbeelden in dit document vereisen dat u een goed inzicht hebt in de anonieme mogelijkheden voor blokken en momentopnamen. Het wordt geadviseerd dat u de steekproef anonieme blokvragendocumentatie en ook de momentopnameclausuledocumentatie leest.
Voor begeleiding op om het even welke die terminologie binnen deze gids wordt gebruikt, verwijs naar de SQL syntaxisgids.
Gegevens stapsgewijs laden
In de onderstaande stappen wordt getoond hoe u gegevens kunt maken en incrementeel kunt laden met behulp van momentopnamen en de anonieme blokfunctie. Het ontwerppatroon kan als malplaatje voor uw eigen opeenvolging van vragen worden gebruikt.
-
Maak een
checkpoint_log
-tabel waarin de meest recente opname wordt bijgehouden die is gebruikt om gegevens te verwerken. De volgende lijst (checkpoint_log
in dit voorbeeld) moet eerst aannull
worden geïnitialiseerd om een dataset stapsgewijs te verwerken.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false;
-
Vul de
checkpoint_log
lijst met één leeg verslag voor de dataset die stijgende verwerking vereist.DIM_TABLE_ABC
is de gegevensset die in het onderstaande voorbeeld moet worden verwerkt. Bij de eerste keer datDIM_TABLE_ABC
wordt verwerkt, wordtlast_snapshot_id
geïnitialiseerd alsnull
. Dit staat u toe om de volledige dataset bij de eerste gelegenheid en incrementeel daarna te verwerken.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp;
-
Start vervolgens
DIM_TABLE_ABC_Incremental
om verwerkte uitvoer vanDIM_TABLE_ABC
te bevatten. Het anonieme blok in de vereiste uitvoeringssectie van het SQL hieronder voorbeeld, zoals die in stap één tot vier wordt beschreven, wordt opeenvolgend uitgevoerd om gegevens incrementeel te verwerken.- Stel de
from_snapshot_id
in die aangeeft vanaf welk punt de verwerking begint. Defrom_snapshot_id
in het voorbeeld wordt vanuit decheckpoint_log
-tabel opgevraagd voor gebruik metDIM_TABLE_ABC
. Bij de eerste uitvoering zal de opname-idnull
zijn, wat betekent dat de volledige gegevensset wordt verwerkt. - Stel de
to_snapshot_id
in als de huidige opname-id van de brontabel (DIM_TABLE_ABC
). In het voorbeeld wordt dit gevraagd vanuit de metagegevenstabel van de brontabel. - Gebruik het trefwoord
CREATE
omDIM_TABLE_ABC_Incremenal
te maken als de doeltabel. De bestemmingslijst voortduurt de verwerkte gegevens van de brondataset (DIM_TABLE_ABC
). Hierdoor kunnen de verwerkte gegevens van de brontabel tussenfrom_snapshot_id
ento_snapshot_id
incrementeel aan de doeltabel worden toegevoegd. - Werk de
checkpoint_log
tabel bij met deto_snapshot_id
voor de brongegevens dieDIM_TABLE_ABC
correct heeft verwerkt. - Als om het even welke opeenvolgend uitgevoerde vragen van het anonieme blok ontbreken, wordt de facultatieve uitzonderingssectie uitgevoerd. Dit retourneert een fout en beëindigt het proces.
note note NOTE history_meta('source table name')
is een geschikte methode die wordt gebruikt om toegang tot beschikbare momentopname in een dataset te krijgen.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
- Stel de
-
Gebruik de stijgende logica van de gegevenslading in het anonieme blokvoorbeeld hieronder om het even welke nieuwe gegevens van de brondataset (sinds meest recente timestamp) toe te staan om aan de bestemmingstabel bij een regelmatige kadentie worden verwerkt en worden toegevoegd. In het voorbeeld worden gegevenswijzigingen in
DIM_TABLE_ABC
verwerkt en toegevoegd aanDIM_TABLE_ABC_incremental
.note note NOTE _ID
is de primaire sleutel in zowelDIM_TABLE_ABC_Incremental
alsSELECT history_meta('DIM_TABLE_ABC')
.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Deze logica kan op om het even welke lijst worden toegepast om stijgende lasten uit te voeren.
Verlopen momentopnamen
Om de kwestie van een verlopen momentopname identiteitskaart op te lossen, neem het volgende bevel aan het begin van het anonieme blok op. De volgende coderegel overschrijft de @from_snapshot_id
met de oudste beschikbare snapshot_id
op basis van metagegevens.
SET resolve_fallback_snapshot_on_failure=true;
Het volledige codeblok ziet er als volgt uit:
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Volgende stappen
Door dit document te lezen, zou u een beter inzicht in moeten hebben hoe te om anonieme blok en momentopnamefuncties te gebruiken om stijgende ladingen uit te voeren en kan deze logica op uw eigen specifieke vragen toepassen. Voor algemene begeleiding bij vraaguitvoering, te lezen gelieve de gids op vraaguitvoering in de Dienst van de Vraag.