Chargement incrémentiel dans Query Service
Le modèle de conception de chargement incrémentiel est une solution de gestion des données. Le modèle ne traite que les informations du jeu de données qui ont été créées ou modifiées depuis la dernière exécution du chargement.
Le chargement incrémentiel utilise différentes fonctionnalités fournies par Adobe Experience Platform Query Service, telles que les blocs anonymes et les instantanés. Ce modèle de conception augmente l’efficacité du traitement, car toutes les données déjà traitées à partir de la source sont ignorées. Il peut être utilisé avec le traitement des données en streaming et par lots.
Ce document fournit une série d’instructions pour créer un modèle de conception pour le traitement incrémentiel. Ces étapes peuvent être utilisées comme modèle pour créer vos propres requêtes de chargement incrémentiel de données.
Prise en main
Les exemples SQL de ce document requièrent une connaissance des fonctions d’instantanés et de blocs anonymes. Nous vous recommandons de lire la documentation sur l’exemple de requêtes en blocs anonymes et la documentation sur la clause d’instantané.
Pour vous familiariser avec la terminologie utilisée dans ce guide, constulez le guide de syntaxe SQL.
Chargement incrémentiel des données
Les étapes ci-dessous montrent comment créer et charger incrémentiellement des données à l’aide d’instantanés et de la fonction de bloc anonyme. Le modèle de conception peut être utilisé comme modèle pour votre propre séquence de requêtes.
-
Créez une table
checkpoint_logpour effectuer le suivi de l’instantané le plus récent utilisé pour traiter les données avec succès. La table de suivi (checkpoint_logdans cet exemple) doit d’abord être initialisé ennullafin de traiter de manière incrémentielle un jeu de données.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false; -
Renseignez la table
checkpoint_logavec un enregistrement vide pour le jeu de données qui nécessite un traitement incrémentiel.DIM_TABLE_ABCest le jeu de données à traiter dans l’exemple ci-dessous. Lors du premier traitement deDIM_TABLE_ABC, l’last_snapshot_idest initialisé en tant quenull. Cela vous permet de traiter l’ensemble du jeu de données la première fois et de manière incrémentielle par la suite.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp; -
Ensuite, initialisez
DIM_TABLE_ABC_Incrementalpour contenir la sortie traitée deDIM_TABLE_ABC. Le bloc anonyme dans la section d’exécution obligatoire de l’exemple SQL ci-dessous, comme décrit dans les étapes 1 à 4, est exécuté de manière séquentielle pour traiter les données de manière incrémentielle.- Définissez l’
from_snapshot_idqui indique l’endroit où commence le traitement. L’from_snapshot_iddans l’exemple est interrogé à partir de la tablecheckpoint_logà utiliser avecDIM_TABLE_ABC. Lors de l’exécution initiale, l’ID de l’instantané estnull, ce qui signifie que l’ensemble du jeu de données sera traité. - Définissez l’
to_snapshot_idcomme ID d’instantané actuel de la table source (DIM_TABLE_ABC). Dans l’exemple, cette requête provient de la table des métadonnées de la table source. - Utilisez le mot-clé
CREATEpour créerDIM_TABLE_ABC_Incremenalcomme table de destination. La table de destination conserve les données traitées du jeu de données source (DIM_TABLE_ABC). Cela permet aux données traitées de la table source entrefrom_snapshot_idetto_snapshot_id, d’être ajoutées de manière incrémentielle à la table de destination. - Mettez à jour la table
checkpoint_logavec l’to_snapshot_iddes données source queDIM_TABLE_ABCa traité avec succès. - Si l’une des requêtes exécutées de manière séquentielle du bloc anonyme échoue, la section d’exception facultative est exécutée. Cela renvoie une erreur et met fin au processus.
note note NOTE L’élément history_meta('source table name')est une méthode pratique utilisée pour accéder à l’instantané disponible dans un jeu de données.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$; - Définissez l’
-
Utilisez la logique de chargement incrémentiel des données dans l’exemple de bloc anonyme ci-dessous pour permettre le traitement et l’ajout régulier de toutes les nouvelles données du jeu de données source (depuis la date et l’heure les plus récentes) à la table de destination. Dans l’exemple, les données modifiées en
DIM_TABLE_ABCseront traitées et ajoutées àDIM_TABLE_ABC_incremental.note note NOTE _IDest la clé primaire dansDIM_TABLE_ABC_IncrementaletSELECT history_meta('DIM_TABLE_ABC').code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Cette logique peut être appliquée à n’importe quelle table pour effectuer des chargements incrémentiels.
Instantanés expirés
Pour résoudre le problème d’un ID d’instantané expiré, insérez la commande suivante au début du bloc anonyme. La ligne de code suivante remplace l’@from_snapshot_id par l’snapshot_id disponible en premier à partir des métadonnées.
SET resolve_fallback_snapshot_on_failure=true;
L’ensemble du bloc de code se présente comme suit :
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Étapes suivantes
En lisant ce document, vous devriez mieux comprendre comment utiliser les fonctions d’instantané et de bloc anonyme pour effectuer des chargements incrémentiels et appliquer cette logique à vos propres requêtes spécifiques. Pour des directives générales sur l’exécution de requêtes, consultez le guide sur l’exécution de requêtes dans Query Service.