Chargement incrémentiel dans Query Service
Le modèle de conception de chargement incrémentiel est une solution de gestion des données. Le modèle ne traite que les informations du jeu de données qui ont été créées ou modifiées depuis la dernière exécution du chargement.
Le chargement incrémentiel utilise différentes fonctionnalités fournies par Adobe Experience Platform Query Service, telles que les blocs anonymes et les instantanés. Ce modèle de conception augmente l’efficacité du traitement, car toutes les données déjà traitées à partir de la source sont ignorées. Il peut être utilisé avec le traitement des données en streaming et par lots.
Ce document fournit une série d’instructions pour créer un modèle de conception pour le traitement incrémentiel. Ces étapes peuvent être utilisées comme modèle pour créer vos propres requêtes de chargement incrémentiel de données.
Prise en main
Les exemples SQL de ce document requièrent une connaissance des fonctions d’instantanés et de blocs anonymes. Nous vous recommandons de lire la documentation sur l’exemple de requêtes en blocs anonymes et la documentation sur la clause d’instantané.
Pour vous familiariser avec la terminologie utilisée dans ce guide, constulez le guide de syntaxe SQL.
Chargement incrémentiel des données
Les étapes ci-dessous montrent comment créer et charger incrémentiellement des données à l’aide d’instantanés et de la fonction de bloc anonyme. Le modèle de conception peut être utilisé comme modèle pour votre propre séquence de requêtes.
-
Créez une table
checkpoint_log
pour effectuer le suivi de l’instantané le plus récent utilisé pour traiter les données avec succès. La table de suivi (checkpoint_log
dans cet exemple) doit d’abord être initialisé ennull
afin de traiter de manière incrémentielle un jeu de données.code language-sql DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false;
-
Renseignez la table
checkpoint_log
avec un enregistrement vide pour le jeu de données qui nécessite un traitement incrémentiel.DIM_TABLE_ABC
est le jeu de données à traiter dans l’exemple ci-dessous. Lors du premier traitement deDIM_TABLE_ABC
, l’last_snapshot_id
est initialisé en tant quenull
. Cela vous permet de traiter l’ensemble du jeu de données la première fois et de manière incrémentielle par la suite.code language-sql INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp;
-
Ensuite, initialisez
DIM_TABLE_ABC_Incremental
pour contenir la sortie traitée deDIM_TABLE_ABC
. Le bloc anonyme dans la section d’exécution obligatoire de l’exemple SQL ci-dessous, comme décrit dans les étapes 1 à 4, est exécuté de manière séquentielle pour traiter les données de manière incrémentielle.- Définissez l’
from_snapshot_id
qui indique l’endroit où commence le traitement. L’from_snapshot_id
dans l’exemple est interrogé à partir de la tablecheckpoint_log
à utiliser avecDIM_TABLE_ABC
. Lors de l’exécution initiale, l’ID de l’instantané estnull
, ce qui signifie que l’ensemble du jeu de données sera traité. - Définissez l’
to_snapshot_id
comme ID d’instantané actuel de la table source (DIM_TABLE_ABC
). Dans l’exemple, cette requête provient de la table des métadonnées de la table source. - Utilisez le mot-clé
CREATE
pour créerDIM_TABLE_ABC_Incremenal
comme table de destination. La table de destination conserve les données traitées du jeu de données source (DIM_TABLE_ABC
). Cela permet aux données traitées de la table source entrefrom_snapshot_id
etto_snapshot_id
, d’être ajoutées de manière incrémentielle à la table de destination. - Mettez à jour la table
checkpoint_log
avec l’to_snapshot_id
des données source queDIM_TABLE_ABC
a traité avec succès. - Si l’une des requêtes exécutées de manière séquentielle du bloc anonyme échoue, la section d’exception facultative est exécutée. Cela renvoie une erreur et met fin au processus.
note note NOTE L’élément history_meta('source table name')
est une méthode pratique utilisée pour accéder à l’instantané disponible dans un jeu de données.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
- Définissez l’
-
Utilisez la logique de chargement incrémentiel des données dans l’exemple de bloc anonyme ci-dessous pour permettre le traitement et l’ajout régulier de toutes les nouvelles données du jeu de données source (depuis la date et l’heure les plus récentes) à la table de destination. Dans l’exemple, les données modifiées en
DIM_TABLE_ABC
seront traitées et ajoutées àDIM_TABLE_ABC_incremental
.note note NOTE _ID
est la clé primaire dansDIM_TABLE_ABC_Incremental
etSELECT history_meta('DIM_TABLE_ABC')
.code language-sql $$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Cette logique peut être appliquée à n’importe quelle table pour effectuer des chargements incrémentiels.
Instantanés expirés
Pour résoudre le problème d’un ID d’instantané expiré, insérez la commande suivante au début du bloc anonyme. La ligne de code suivante remplace l’@from_snapshot_id
par l’snapshot_id
disponible en premier à partir des métadonnées.
SET resolve_fallback_snapshot_on_failure=true;
L’ensemble du bloc de code se présente comme suit :
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Étapes suivantes
En lisant ce document, vous devriez mieux comprendre comment utiliser les fonctions d’instantané et de bloc anonyme pour effectuer des chargements incrémentiels et appliquer cette logique à vos propres requêtes spécifiques. Pour des directives générales sur l’exécution de requêtes, consultez le guide sur l’exécution de requêtes dans Query Service.