查询服务中的增量加载

增量式负载设计模式是一种用于管理数据的解决方案。 该模式仅处理自上次加载执行以来创建或修改的数据集中的信息。

增量加载使用Adobe Experience Platform查询服务提供的各种功能,例如匿名块和快照。 此设计模式提高了处理效率,因为已跳过从源处理的任何数据。 它可以用于流数据处理和批量数据处理。

本文档提供了一系列说明,说明如何构建用于增量处理的设计模式。 这些步骤可用作创建自己的增量数据加载查询的模板。

快速入门

本文档中的SQL示例要求您了解匿名块和快照功能。 建议您阅读示例匿名块查询文档以及快照子句文档。

有关本指南中使用的任何术语的指导,请参阅SQL语法指南

增量加载数据

以下步骤演示了如何使用快照和匿名块功能创建和增量加载数据。 设计模式可用作您自己的查询序列的模板。

  1. 创建checkpoint_log表以跟踪用于成功处理数据的最新快照。 跟踪表(checkpoint_log,在本例中)必须首先初始化为null,以便逐步处理数据集。

    code language-sql
    DROP TABLE IF EXISTS checkpoint_log;
    CREATE TABLE  checkpoint_log AS
    SELECT
       cast(NULL AS string) process_name,
       cast(NULL AS string) process_status,
       cast(NULL AS string) last_snapshot_id,
       cast(NULL AS TIMESTAMP) process_timestamp
       WHERE false;
    
  2. 为需要增量处理的数据集填充一个空记录的checkpoint_log表。 DIM_TABLE_ABC是以下示例中要处理的数据集。 在首次处理DIM_TABLE_ABC时,last_snapshot_id被初始化为null。 这样,您就可以在第一次处理整个数据集时,并在其后逐步处理整个数据集。

    code language-sql
    INSERT INTO
       checkpoint_log
       SELECT
          'DIM_TABLE_ABC' process_name,
          'SUCCESSFUL' process_status,
          cast(NULL AS string) last_snapshot_id,
          CURRENT_TIMESTAMP process_timestamp;
    
  3. 接下来,初始化DIM_TABLE_ABC_Incremental以包含来自DIM_TABLE_ABC的已处理输出。 以下SQL示例的​ required ​执行部分中的匿名块(如步骤1到步骤4中所述)按顺序执行以增量方式处理数据。

    1. 设置from_snapshot_id以指示处理从何处开始。 已从checkpoint_log表中查询示例中的from_snapshot_id以与DIM_TABLE_ABC一起使用。 在初始运行时,快照ID将为null,这意味着将处理整个数据集。
    2. to_snapshot_id设置为源表(DIM_TABLE_ABC)的当前快照标识。 在本例中,这是从源表的元数据表中查询的。
    3. 使用CREATE关键字创建DIM_TABLE_ABC_Incremenal作为目标表。 目标表保留源数据集(DIM_TABLE_ABC)中已处理的数据。 这允许将来自from_snapshot_idto_snapshot_id之间的源表的已处理数据增量附加到目标表。
    4. 使用to_snapshot_id更新checkpoint_log表,以获取DIM_TABLE_ABC已成功处理的源数据。
    5. 如果匿名块的任何按顺序执行的查询失败,则执行​ optional ​异常部分。 这将返回错误并结束进程。
    note note
    NOTE
    history_meta('source table name')是一种用于获取数据集中可用快照的访问权限的简便方法。
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        CREATE TABLE DIM_TABLE_ABC_Incremental AS
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id ;
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHER THEN
        SELECT 'ERROR';
    END
    $$;
    
  4. 在下面的匿名块示例中使用增量数据加载逻辑,以允许定期处理源数据集中的任何新数据(自最近的时间戳以来)并将其附加到目标表中。 在此示例中,将处理对DIM_TABLE_ABC的数据更改并将其附加到DIM_TABLE_ABC_incremental

    note note
    NOTE
    _IDDIM_TABLE_ABC_IncrementalSELECT history_meta('DIM_TABLE_ABC')中的主键。
    code language-sql
    $$ BEGIN
        SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a join
                                (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                    WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                    ON a.process_timestamp=b.process_timestamp;
        SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
        SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
        INSERT INTO DIM_TABLE_ABC_Incremental
         SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);
    
    INSERT INTO
       checkpoint_log
       SELECT
           'DIM_TABLE_ABC' process_name,
           'SUCCESSFUL' process_status,
          cast( @to_snapshot_id AS string) last_snapshot_id,
          cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
    
    EXCEPTION
      WHEN OTHER THEN
        SELECT 'ERROR';
    END
    $$;
    

此逻辑可以应用于任何表以执行增量加载。

过期的快照

要解决快照ID过期的问题,请在匿名块的开头插入以下命令。 以下代码行使用元数据中最早可用的snapshot_id覆盖@from_snapshot_id

SET resolve_fallback_snapshot_on_failure=true;

整个代码块如下所示:

$$ BEGIN
    SET resolve_fallback_snapshot_on_failure=true;
    SET @from_snapshot_id = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN
                            (SELECT MAX(process_timestamp)process_timestamp FROM checkpoint_log
                                WHERE process_name = 'DIM_TABLE_ABC' AND process_status = 'SUCCESSFUL' )b
                                on a.process_timestamp=b.process_timestamp;
    SET @to_snapshot_id = SELECT snapshot_id FROM (SELECT history_meta('DIM_TABLE_ABC')) WHERE  is_current = true;
    SET @last_updated_timestamp= SELECT CURRENT_TIMESTAMP;
    INSERT INTO DIM_TABLE_ABC_Incremental
     SELECT  *  FROM DIM_TABLE_ABC SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id WHERE NOT EXISTS (SELECT _id FROM DIM_TABLE_ABC_Incremental a WHERE _id=a._id);

Insert Into
   checkpoint_log
   SELECT
       'DIM_TABLE_ABC' process_name,
       'SUCCESSFUL' process_status,
      cast( @to_snapshot_id AS string) last_snapshot_id,
      cast( @last_updated_timestamp AS TIMESTAMP) process_timestamp;
EXCEPTION
  WHEN OTHER THEN
    SELECT 'ERROR';
END
$$;

后续步骤

通过阅读本文档,您应该更好地了解如何使用匿名块和快照功能执行增量加载,并且可以将此逻辑应用于您自己的特定查询。 有关查询执行的一般指导,请阅读查询服务🔗中查询执行的指南。

recommendation-more-help
ccf2b369-4031-483f-af63-a93b5ae5e3fb