Carga incremental no Serviço de consulta
- Tópicos:
- Consultas
Criado para:
- Usuário
- Desenvolvedor
O padrão de design de carga incremental é uma solução para gerenciamento de dados. O padrão processa apenas as informações no conjunto de dados que foi criado ou modificado desde a última execução de carregamento.
A carga incremental usa vários recursos fornecidos pelo Serviço de consulta da Adobe Experience Platform, como blocos anônimos e instantâneos. Esse padrão de design aumenta a eficiência do processamento, pois quaisquer dados já processados da origem são ignorados. Ele pode ser usado com processamento de dados em lote e de transmissão.
Este documento fornece uma série de instruções para criar um padrão de design para processamento incremental. Essas etapas podem ser usadas como um modelo para criar suas próprias consultas de carregamento de dados incrementais.
Introdução
Os exemplos de SQL neste documento exigem que você tenha uma compreensão dos recursos de bloco e instantâneo anônimos. É recomendável que você leia a documentação das consultas de exemplo de bloco anônimo e também a documentação da cláusula de instantâneo.
Para orientação sobre qualquer terminologia usada neste guia, consulte o guia de sintaxe SQL.
Carregar dados incrementalmente
As etapas abaixo demonstram como criar e carregar dados de forma incremental usando instantâneos e o recurso de bloqueio anônimo. O padrão de design pode ser usado como um modelo para sua própria sequência de consultas.
-
Crie uma tabela
checkpoint_log
para rastrear o instantâneo mais recente usado para processar dados com êxito. A tabela de rastreamento (checkpoint_log
neste exemplo) deve primeiro ser inicializada emnull
para processar de forma incremental um conjunto de dados.DROP TABLE IF EXISTS checkpoint_log; CREATE TABLE checkpoint_log AS SELECT cast(NULL AS string) process_name, cast(NULL AS string) process_status, cast(NULL AS string) last_snapshot_id, cast(NULL AS TIMESTAMP) process_timestamp WHERE false;
-
Preencha a tabela
checkpoint_log
com um registro vazio para o conjunto de dados que precisa de processamento incremental.DIM_TABLE_ABC
é o conjunto de dados a ser processado no exemplo abaixo. Na primeira ocasião do processamento deDIM_TABLE_ABC
,last_snapshot_id
é inicializado comonull
. Isso permite processar todo o conjunto de dados na primeira vez e de forma incremental posteriormente.INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast(NULL AS string) last_snapshot_id, CURRENT_TIMESTAMP process_timestamp;
-
Em seguida, inicialize
DIM_TABLE_ABC_Incremental
para conter saída processada deDIM_TABLE_ABC
. O bloco anônimo na seção de execução required do exemplo SQL abaixo, conforme descrito nas etapas de um a quatro, é executado sequencialmente para processar dados de forma incremental.- Defina o
from_snapshot_id
que indica de onde o processamento começa. Ofrom_snapshot_id
no exemplo é consultado a partir da tabelacheckpoint_log
para uso comDIM_TABLE_ABC
. Na execução inicial, a ID do instantâneo seránull
, o que significa que todo o conjunto de dados será processado. - Defina
to_snapshot_id
como a ID de instantâneo atual da tabela de origem (DIM_TABLE_ABC
). No exemplo, isso é consultado a partir da tabela de metadados da tabela de origem. - Use a palavra-chave
CREATE
para criarDIM_TABLE_ABC_Incremenal
como a tabela de destino. A tabela de destino mantém os dados processados do conjunto de dados de origem (DIM_TABLE_ABC
). Isso permite que os dados processados da tabela de origem entrefrom_snapshot_id
eto_snapshot_id
sejam acrescentados de forma incremental à tabela de destino. - Atualize a tabela
checkpoint_log
comto_snapshot_id
para os dados de origem queDIM_TABLE_ABC
processou com êxito. - Se qualquer uma das consultas executadas sequencialmente do bloco anônimo falhar, a seção de exceção opcional será executada. Isso retorna um erro e encerra o processo.
NOTEOhistory_meta('source table name')
é um método conveniente usado para obter acesso ao instantâneo disponível em um conjunto de dados.$$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; CREATE TABLE DIM_TABLE_ABC_Incremental AS SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ; INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
- Defina o
-
Use a lógica de carregamento de dados incrementais no exemplo de bloco anônimo abaixo para permitir que quaisquer novos dados do conjunto de dados de origem (desde o carimbo de data e hora mais recente) sejam processados e anexados à tabela de destino em uma cadência regular. No exemplo, as alterações de dados em
DIM_TABLE_ABC
serão processadas e anexadas aDIM_TABLE_ABC_incremental
.NOTE_ID
é a Chave Primária emDIM_TABLE_ABC_Incremental
eSELECT history_meta('DIM_TABLE_ABC')
.$$ BEGIN SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b ON a.process_timestamp=b.process_timestamp; SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true; SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP; INSERT INTO DIM_TABLE_ABC_Incremental SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id); INSERT INTO checkpoint_log SELECT 'DIM_TABLE_ABC' process_name, 'SUCCESSFUL' process_status, cast( @to_snapshot_id AS string) last_snapshot_id, cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp; EXCEPTION WHEN OTHER THEN SELECT 'ERROR'; END $$;
Essa lógica pode ser aplicada a qualquer tabela para executar cargas incrementais.
Instantâneos expirados
Para resolver o problema de uma ID de snapshot expirada, insira o seguinte comando no início do bloco anônimo. A linha de código a seguir substitui o @from_snapshot_id
pelo snapshot_id
mais antigo disponível nos metadados.
SET resolve_fallback_snapshot_on_failure=true;
O bloco de código inteiro tem a seguinte aparência:
$$ BEGIN
SET resolve_fallback_snapshot_on_failure=true;
SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
(SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
on a.process_timestamp=b.process_timestamp;
SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE is_current = true;
SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
INSERT INTO DIM_TABLE_ABC_Incremental
SELECT * FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
Insert Into
checkpoint_log
SELECT
'DIM_TABLE_ABC' process_name,
'SUCCESSFUL' process_status,
cast( @to_snapshot_id AS string) last_snapshot_id,
cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
WHEN OTHER THEN
SELECT 'ERROR';
END
$$;
Próximas etapas
Ao ler este documento, você deve entender melhor como usar os recursos de instantâneos e blocos anônimos para executar cargas incrementais e pode aplicar essa lógica às suas próprias consultas específicas. Para orientação geral sobre execução de consulta, leia o guia sobre execução de consulta no Serviço de Consulta.