Le modèle de conception de chargement incrémentiel est une solution de gestion des données. Le modèle ne traite que les informations du jeu de données qui ont été créées ou modifiées depuis la dernière exécution du chargement.
Le chargement incrémentiel utilise différentes fonctionnalités fournies par Adobe Experience Platform Query Service, telles que les blocs anonymes et les instantanés. Ce modèle de conception augmente l’efficacité du traitement, car toutes les données déjà traitées à partir de la source sont ignorées. Il peut être utilisé avec le traitement des données en streaming et par lots.
Ce document fournit une série d’instructions pour créer un modèle de conception pour le traitement incrémentiel. Ces étapes peuvent être utilisées comme modèle pour créer vos propres requêtes de chargement incrémentiel de données.
Les exemples SQL de ce document requièrent une connaissance des fonctions d’instantanés et de blocs anonymes. Nous vous recommandons de lire la documentation sur l’exemple de requêtes en blocs anonymes et la documentation sur la clause d’instantané.
Pour vous familiariser avec la terminologie utilisée dans ce guide, constulez le guide de syntaxe SQL.
Les étapes ci-dessous montrent comment créer et charger incrémentiellement des données à l’aide d’instantanés et de la fonction de bloc anonyme. Le modèle de conception peut être utilisé comme modèle pour votre propre séquence de requêtes.
Créez une table checkpoint_log
pour effectuer le suivi de l’instantané le plus récent utilisé pour traiter les données avec succès. La table de suivi (checkpoint_log
dans cet exemple) doit d’abord être initialisé en null
afin de traiter de manière incrémentielle un jeu de données.
DROP TABLE IF EXISTS checkpoint_log;
CREATE TABLE checkpoint_log AS
SELECT
cast(NULL AS string) process_name,
cast(NULL AS string) process_status,
cast(NULL AS string) last_snapshot_id,
cast(NULL AS TIMESTAMP) process_timestamp
WHERE false;
Renseignez la table checkpoint_log
avec un enregistrement vide pour le jeu de données qui nécessite un traitement incrémentiel. DIM_TABLE_ABC
est le jeu de données à traiter dans l’exemple ci-dessous. Lors du premier traitement de DIM_TABLE_ABC
, l’last_snapshot_id
est initialisé en tant que null
. Cela vous permet de traiter l’ensemble du jeu de données la première fois et de manière incrémentielle par la suite.
INSERT INTO
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast(NULL AS string) last_snapshot_id,
CURRENT_TIMESTAMP process_timestamp;
Ensuite, initialisez DIM_TABLE_ABC_Incremental
pour contenir la sortie traitée de DIM_TABLE_ABC
. Le bloc anonyme dans la section d’exécution obligatoire de l’exemple SQL ci-dessous, comme décrit dans les étapes 1 à 4, est exécuté de manière séquentielle pour traiter les données de manière incrémentielle.
from_snapshot_id
qui indique l’endroit où commence le traitement. L’from_snapshot_id
dans l’exemple est interrogé à partir de la table checkpoint_log
à utiliser avec DIM_TABLE_ABC
. Lors de l’exécution initiale, l’ID de l’instantané est null
, ce qui signifie que l’ensemble du jeu de données sera traité.to_snapshot_id
comme ID d’instantané actuel de la table source (DIM_TABLE_ABC
). Dans l’exemple, cette requête provient de la table des métadonnées de la table source.CREATE
pour créer DIM_TABLE_ABC_Incremenal
comme table de destination. La table de destination conserve les données traitées du jeu de données source (DIM_TABLE_ABC
). Cela permet aux données traitées de la table source entre from_snapshot_id
et to_snapshot_id
, d’être ajoutées de manière incrémentielle à la table de destination.checkpoint_log
avec l’to_snapshot_id
des données source que DIM_TABLE_ABC
a traité avec succès.L’élément history_meta('source table name')
est une méthode pratique utilisée pour accéder à l’instantané disponible dans un jeu de données.
$$ BEGIN
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
ON a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
CREATE TABLE DIM_TABLE_ABC_Incremental AS
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ;
INSERT INTO
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Utilisez la logique de chargement incrémentiel des données dans l’exemple de bloc anonyme ci-dessous pour permettre le traitement et l’ajout régulier de toutes les nouvelles données du jeu de données source (depuis la date et l’heure les plus récentes) à la table de destination. Dans l’exemple, les données modifiées en DIM_TABLE_ABC
seront traitées et ajoutées à DIM_TABLE_ABC_incremental
.
_ID
est la clé primaire dans DIM_TABLE_ABC_Incremental
et SELECT history_meta('DIM_TABLE_ABC')
.
$$ BEGIN
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
ON a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
INSERT INTO
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Cette logique peut être appliquée à n’importe quelle table pour effectuer des chargements incrémentiels.
Les métadonnées d’instantané expirent après deux jours. Un instantané expiré invalide la logique du script fourni ci-dessus.
Pour résoudre le problème d’un ID d’instantané expiré, insérez la commande suivante au début du bloc anonyme. La ligne de code suivante remplace l’@from_snapshot_id
par l’snapshot_id
disponible en premier à partir des métadonnées.
SET resolve_fallback_snapshot_on_failure=true;
L’ensemble du bloc de code se présente comme suit :
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
En lisant ce document, vous devriez mieux comprendre comment utiliser les fonctions d’instantané et de bloc anonyme pour effectuer des chargements incrémentiels et appliquer cette logique à vos propres requêtes spécifiques. Pour des directives générales sur l’exécution de requêtes, consultez le guide sur l’exécution de requêtes dans Query Service.