Inkrementelles Laden im Abfrage-Service
Das Designmuster für inkrementelles Laden ist eine Lösung für die Datenverwaltung. Das Muster verarbeitet nur Informationen im Datensatz, die seit der letzten Ladeausführung erstellt oder geändert wurden.
Inkrementelles Laden verwendet verschiedene Funktionen, die der Adobe Experience Platform-Abfrage-Service bereitstellt, wie anonyme Blöcke und Momentaufnahmen. Dieses Designmuster erhöht die Verarbeitungseffizienz, da alle bereits verarbeiteten Daten aus der Quelle übersprungen werden. Es kann sowohl bei der Streaming- als auch bei der Batch-Datenverarbeitung verwendet werden.
Dieses Dokument enthält eine Reihe von Anweisungen zum Erstellen eines Designmusters für die inkrementelle Verarbeitung. Diese Schritte können als Vorlage für die Erstellung Ihrer eigenen inkrementellen Datenladeabfragen verwendet werden.
Erste Schritte
Die SQL-Beispiele in diesem Dokument erfordern ein Verständnis der Funktionen anonymer Blöcke und der Momentaufnahme. Es wird empfohlen, die Dokumentationen Beispielabfragen für anonyme Blöcke und Momentaufnahme-Klausel zu lesen.
Eine Erklärung zu den in diesem Handbuch verwendeten Begriffen finden Sie im Handbuch zur SQL-Syntax.
Inkrementelles Laden von Daten
Die folgenden Schritte zeigen, wie Sie Daten mithilfe von Momentaufnahmen und der Funktion für anonyme Blöcke erstellen und inkrementell laden können. Das Designmuster kann als Vorlage für Ihre eigene Abfolge von Abfragen verwendet werden.
-
Erstellen Sie eine
checkpoint_log
-Tabelle, um die letzte Momentaufnahme festzuhalten, die zur erfolgreichen Verarbeitung von Daten verwendet wurde. Die Tracking-Tabelle (checkpoint_log
in diesem Beispiel) muss zuerst aufnull
initialisiert werden, um einen Datensatz inkrementell zu verarbeiten.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false;
-
Füllen Sie die
checkpoint_log
-Tabelle mit einer leeren Eingabe für den Datensatz, wodurch eine inkrementelle Verarbeitung erfordert wird.DIM_TABLE_ABC
ist der Datensatz, der im folgenden Beispiel verarbeitet werden soll. Bei der erstmaligen Verarbeitung vonDIM_TABLE_ABC
istlast_snapshot_id
alsnull
initialisiert. Auf diese Weise können Sie den gesamten Datensatz beim ersten Mal und danach inkrementell verarbeiten.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp;
-
Als Nächstes initialisieren Sie
DIM_TABLE_ABC_Incremental
, um verarbeitete Ausgaben vonDIM_TABLE_ABC
zu enthalten. Der anonyme Block im erforderlichen Ausführungsabschnitt des folgenden SQL-Beispiels wird, wie in den Schritten 1 bis 4 beschrieben, sequenziell ausgeführt, um Daten inkrementell zu verarbeiten.- Legen Sie die
from_snapshot_id
fest, die angibt, wo die Verarbeitung beginnt. Diefrom_snapshot_id
im Beispiel wird aus dercheckpoint_log
-Tabelle zur Verwendung mitDIM_TABLE_ABC
abgefragt. Beim ersten Ausführen ist die Momentaufnahme-IDnull
, was bedeutet, dass der gesamte Datensatz verarbeitet wird. - Legen Sie die
to_snapshot_id
als aktuelle Momentaufnahme-ID der Quellentabelle fest (DIM_TABLE_ABC
). Im Beispiel wird dies aus der Metadatentabelle der Quelltabelle abgefragt. - Verwenden Sie das
CREATE
-Keyword, umDIM_TABLE_ABC_Incremenal
als Zieltabelle zu erstellen. Die Zieltabelle bewahrt die verarbeiteten Daten aus dem Quelldatensatz auf (DIM_TABLE_ABC
). Dadurch können die verarbeiteten Daten aus der Quelltabelle zwischenfrom_snapshot_id
undto_snapshot_id
inkrementell an die Zieltabelle angehängt werden. - Aktualisieren Sie die
checkpoint_log
-Tabelle mit derto_snapshot_id
für die Quelldaten, dieDIM_TABLE_ABC
erfolgreich verarbeitet hat. - Wenn eine der sequenziell ausgeführten Abfragen des anonymen Blocks fehlschlägt, wird der optionale Ausnahmeabschnitt ausgeführt. Dadurch wird ein Fehler zurückgegeben und der Prozess beendet.
note note NOTE history_meta('source table name')
ist eine praktische Methode, um auf verfügbare Momentaufnahmen in einem Datensatz zuzugreifen.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
- Legen Sie die
-
Verwenden Sie die inkrementelle Datenladelogik im folgenden Beispiel für anonyme Blöcke, um zu ermöglichen, dass neue Daten (seit dem letzten Zeitstempel) aus dem Quelldatensatz verarbeitet und regelmäßig an die Zieltabelle angehängt werden. Im Beispiel werden Datenänderungen an
DIM_TABLE_ABC
verarbeitet und anDIM_TABLE_ABC_incremental
angehängt.note note NOTE _ID
ist der Primärschlüssel sowohl inDIM_TABLE_ABC_Incremental
als auch inSELECT history_meta('DIM_TABLE_ABC')
.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Diese Logik kann auf jede Tabelle angewendet werden, um inkrementelles Laden durchzuführen.
Abgelaufene Momentaufnahmen
Um das Problem einer abgelaufenen Momentaufnahme-ID zu beheben, fügen Sie den folgenden Befehl am Anfang des anonymen Blocks ein. Die folgende Codezeile überschreibt die @from_snapshot_id
mit der frühesten verfügbaren snapshot_id
aus Metadaten.
SET resolve_fallback_snapshot_on_failure=true;
Der gesamte Codeblock sieht wie folgt aus:
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Nächste Schritte
Durch dieses Dokument sollten Sie besser verstehen, wie Sie Funktionen für anonyme Blöcke und Momentaufnahmen verwenden können, um inkrementelles Laden durchzuführen, und diese Logik auf Ihre eigenen spezifischen Abfragen anwenden können. Allgemeine Hinweise zur Ausführung von Abfragen finden Sie im Handbuch zum Ausführen von Abfragen in Abfrage-Service.