查询服务(数据Distiller)和导出数据集

本文概述如何使用Experience Platform查询服务(Data Distiller)和数据集导出的组合实现以下数据导出用例

  • 数据验证
  • Data Lake,BI工具Data Warehouse
  • 为人工智能和机器学习做好准备。

Adobe Analytics可以使用其数据馈送功能实施这些用例。 数据馈送是从 Adobe Analytics 中获取原始数据的有效方法。本文介绍了如何从Experience Platform中获得相似类型的原始数据,以便您实施上述用例。 在适用的情况下,将本文中描述的功能与Adobe Analytics数据馈送进行比较,以阐明数据和流程中的差异。

简介

使用查询服务(数据Distiller)导出数据以及数据集导出包含以下内容:

  • 定义一个​ 计划查询,该查询使用​ 查询服务 ​将您的数据馈送的数据生成为输出数据集 输出数据集
  • 定义使用​ 数据集导出 ​将输出数据集导出到云存储目标的​ 计划数据集导出

数据馈送

先决条件

在使用本使用案例中所述的功能之前,请确保您满足以下所有要求:

  • 将数据收集到Experience Platform数据湖中的有效实施。
  • 访问Data Distiller加载项,以确保您有权执行批量查询。 有关详细信息,请参阅查询服务打包
  • 访问导出数据集功能,在您购买Real-Time CDP Prime或Ultimate包、Adobe Journey Optimizer或Customer Journey Analytics后可用。 有关详细信息,请参阅将数据集导出到云存储目标
  • 将一个或多个已配置目标(例如:Amazon S3、Google Cloud Storage)导出到的数据馈送原始数据。

查询服务

Experience Platform查询服务允许您查询和联接Experience Platform数据湖中的任何数据集,就像它是一个数据库表一样。 然后,您可以将结果捕获为新数据集,以供进一步在报表中使用或导出。

您可以使用查询服务用户界面、通过PostgresQL协议🔗连接的客户端或RESTful API创建和计划收集数据馈送数据的查询。

创建查询

您可以使用标准ANSI SQL for SELECT语句的所有功能以及其他有限命令来创建和执行查询,以便为数据馈送生成数据。 有关详细信息,请参阅SQL语法。 除了此SQL语法外,Adobe还支持:

数据馈送列

可在查询中使用的XDM字段取决于数据集所基于的架构定义。 确保您确实了解数据集背后的架构。 请参阅数据集UI指南以了解更多信息。

为了帮助您定义数据馈送列和XDM字段之间的映射,请参阅Analytics字段映射。 另请参阅架构UI概述,了解有关如何管理XDM资源(包括架构、类、字段组和数据类型)的更多信息。

例如,如果要使用​ 页面名称 ​作为数据馈送的一部分,请执行以下操作:

  • 在Adobe Analytics数据馈送的UI中,您可以选择​ pagename ​作为要添加到数据馈送定义的列。
  • 在查询服务中,您在查询中包含来自sample_event_dataset_for_website_global_v1_1数据集的web.webPageDetails.name(基于网站的​ 示例事件架构(全局v1.1) ​体验事件架构)。 有关详细信息,请参阅Web详细信息架构字段组

标识

在Experience Platform中,可以使用各种标识。 创建查询时,请确保正确查询了标识。

通常,您会在单独的字段组中找到身份。 在实现中,ECID (ecid)可以定义为具有core对象的字段组的一部分,该对象本身是identification对象的一部分(例如: _sampleorg.identification.core.ecid)。 ECID在架构中的组织方式可能有所不同。

或者,您可以使用identityMap查询身份。 identityMap的类型为Map,并使用嵌套数据结构

有关如何在Experience Platform中定义标识字段的更多信息,请参阅在UI中定义标识字段

请参阅Analytics数据中的主要标识符,了解在使用Analytics源连接器时,Adobe Analytics标识如何映射到Experience Platform标识。 此映射可用作设置标识的指导,即使未使用Analytics Source Connector也是如此。

点击级别数据和识别

根据实施情况,传统上在Adobe Analytics中收集的点击级别数据现在作为Experience Platform中的时间戳事件数据存储。 下表是从Analytics字段映射中提取的,并显示了如何将特定于点击级别的Adobe Analytics数据馈送列与查询中的相应XDM字段进行映射的示例。 该表还显示了如何使用XDM字段识别点击、访问和访客的示例。

数据馈送列
XDM字段
类型
描述
hitid_high + hitid_low
_id
字符串
用于标识点击的唯一标识符。
hitid_low
_id
字符串
hitid_high一起使用以唯一标识点击。
hitid_high
_id
字符串
hitid_high一起使用以唯一标识点击。
hit_time_gmt
receivedTimestamp
字符串
点击的时间戳,基于UNIX®时间。
cust_hit_time_gmt
timestamp
字符串
此时间戳仅在启用了时间戳的数据集中使用。 此时间戳基于UNIX®时间,随点击一起发送。
visid_high + visid_low
identityMap
对象
访问的唯一标识符。
visid_high + visid_low
endUserIDs._experience.aaid.id
字符串
访问的唯一标识符。
visid_high
endUserIDs._experience.aaid.primary
布尔值
visid_low一起使用以唯一标识访问。
visid_high
endUserIDs._experience.aaid.namespace.code
字符串
visid_low一起使用以唯一标识访问。
visid_low
identityMap
对象
visid_high一起使用以唯一标识访问。
cust_visid
identityMap
对象
客户访客ID。
cust_visid
endUserIDs._experience.aacustomid.id
对象
客户访客ID。
cust_visid
endUserIDs._experience.aacustomid.primary
布尔值
客户访客ID命名空间代码。
cust_visid
endUserIDs._experience.aacustomid.namespace.code
字符串
visid_low一起使用以唯一地标识客户访客ID。
geo\_*
placeContext.geo.*
字符串,数字
地理位置数据,如国家/地区、地区、城市等
event_list
commerce.purchasescommerce.productViewscommerce.productListOpenscommerce.checkoutscommerce.productListAddscommerce.productListRemovalscommerce.productListViews_experience.analytics.event101to200.*、…、_experience.analytics.event901_1000.*
字符串
点击时触发的标准商务和自定义事件。
page_event
web.webInteraction.type
字符串
在图像请求中发送的点击类型(标准点击、下载链接、退出链接或单击的自定义链接)。
page_event
web.webInteraction.linkClicks.value
数字
在图像请求中发送的点击类型(标准点击、下载链接、退出链接或单击的自定义链接)。
page_event_var_1
web.webInteraction.URL
字符串
仅在链接跟踪图像请求中使用的变量。 此变量包含下载链接、退出链接或单击的自定义链接的URL。
page_event_var_2
web.webInteraction.name
字符串
仅在链接跟踪图像请求中使用的变量。 这会列出链接的自定义名称(如果已指定)。
paid_search
search.isPaid
布尔值
如果点击与付费搜索检测相匹配,则设置此标记。
ref_type
web.webReferrertype
字符串
表示点击的反向链接类型的数字 ID。

发布列

Adobe Analytics数据馈送使用带有post_前缀的列的概念,这些列是包含处理之后的数据的列。 有关更多信息,请参阅数据馈送常见问题解答

通过Experience PlatformEdge Network(Web SDK、Mobile SDK、服务器API)在数据集中所收集的数据不包含post_字段的概念。 因此,post_个带有前缀和​ -post_个带有前缀的数据馈送列映射到相同的XDM字段。 例如,page_urlpost_page_url数据馈送列都映射到相同的web.webPageDetails.URL XDM字段。

请参阅跨Adobe Analytics和Customer Journey Analytics比较数据处理,以了解数据处理差异的概述。

在Experience Platform数据湖中收集的post_前缀列类型的数据确实需要高级转换,然后才能成功地用于数据馈送用例。 在查询中执行这些高级转换涉及使用Adobe定义的函数进行会话化、归因和重复数据删除。 请参阅有关如何使用这些函数的示例

查找

若要从其他数据集查找数据,请使用标准SQL功能(WHERE子句、INNER JOINOUTER JOIN等)。

计算

要对字段(列)执行计算,请使用标准SQL函数(例如COUNT(*)),或Spark SQL的数学和统计运算符及函数部分。 此外,窗口函数支持更新聚合并为有序子集中的每一行返回单个项。 请参阅有关如何使用这些函数的示例

嵌套数据结构

数据集所基于的架构通常包含复杂的数据类型,包括嵌套的数据结构。 前面提到的identityMap是嵌套数据结构的示例。 有关identityMap数据的示例,请参见下文。

{
   "identityMap":{
      "FPID":[
         {
            "id":"55613368189701342632255821452918751312",
            "authenticatedState":"ambiguous"
         }
      ],
      "CRM":[
         {
            "id":"2394509340-30453470347",
            "authenticatedState":"authenticated"
         }
      ]
   }
}

您可以使用Spark SQL中的explode()或其他Arrays函数来获取嵌套数据结构中的数据,例如:

select explode(identityMap) from demosys_cja_ee_v1_website_global_v1_1 limit 15;

或者,您可以使用点表示法引用单个元素。 例如:

select identityMap.ecid from demosys_cja_ee_v1_website_global_v1_1 limit 15;

有关更多信息,请参阅在 Query Service 中使用嵌套数据结构

示例

对于查询:

  • 那些使用来自Experience Platform数据湖中数据集的数据,
  • 正在点击Adobe定义的函数和/或Spark SQL的附加功能,并且
  • 会将类似的结果交付给等效的Adobe Analytics数据馈送,

请参阅:

下面是一个跨会话正确应用归因的示例,其中说明如何

  • 以过去90天作为回顾,
  • 应用窗口函数,如会话流程和/或归因,以及
  • 基于ingest_time限制输出。

+++
详细信息

要做到这一点,您必须……

  • 使用处理状态表checkpoint_log跟踪当前引入时间与上次引入时间。 有关详细信息,请参阅本指南
  • 禁用删除系统列,以便您可以使用_acp_system_metadata.ingestTime
  • 使用最内层SELECT获取要使用的字段,并将事件限制在回看时段内以进行会话和/或归因计算。 例如,90天。
  • 使用下一级别SELECT来应用会话化和/或归因窗口函数以及其他计算。
  • 在输出表中使用INSERT INTO将回顾限制为仅回顾自上次处理时间以来到达的事件。 为此,请筛选_acp_system_metadata.ingestTime 而不是上次存储在处理状态表中的时间。

会话流程窗口函数示例

$$ BEGIN
   -- Disable dropping system columns
   set drop_system_columns=false;

   -- Initialize variables
   SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;

   -- Get the last processed batch ingestion time
   SET @from_batch_ingestion_time = SELECT coalesce(last_batch_ingestion_time, 'HEAD')
      FROM checkpoint_log a
      JOIN (
            SELECT MAX(process_timestamp) AS process_timestamp
            FROM checkpoint_log
            WHERE process_name = 'data_feed'
            AND process_status = 'SUCCESSFUL'
      ) b
      ON a.process_timestamp = b.process_timestamp;

   -- Get the last batch ingestion time
   SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime)
      FROM events_dataset;

   -- Sessionize the data and insert into data_feed.
   INSERT INTO data_feed
   SELECT *
   FROM (
      SELECT
            userIdentity,
            timestamp,
            SESS_TIMEOUT(timestamp, 60 * 30) OVER (
               PARTITION BY userIdentity
               ORDER BY timestamp
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS session_data,
            page_name,
            ingest_time
      FROM (
            SELECT
               userIdentity,
               timestamp,
               web.webPageDetails.name AS page_name,
               _acp_system_metadata.ingestTime AS ingest_time
            FROM events_dataset
            WHERE timestamp >= current_date - 90
      ) AS a
      ORDER BY userIdentity, timestamp ASC
   ) AS b
   WHERE b.ingest_time >= @from_batch_ingestion_time;

   -- Update the checkpoint_log table
   INSERT INTO checkpoint_log
   SELECT
      'data_feed' process_name,
      'SUCCESSFUL' process_status,
      cast(@to_batch_ingestion_time AS string) last_batch_ingestion_time,
      cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp
END
$$;

归因窗口函数示例

$$ BEGIN
 SET drop_system_columns=false;

-- Initialize variables
 SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;

-- Get the last processed batch ingestion time 1718755872325
 SET @from_batch_ingestion_time =
     SELECT coalesce(last_snapshot_id, 'HEAD')
     FROM checkpoint_log a
     JOIN (
         SELECT MAX(process_timestamp) AS process_timestamp
         FROM checkpoint_log
         WHERE process_name = 'data_feed'
         AND process_status = 'SUCCESSFUL'
     ) b
     ON a.process_timestamp = b.process_timestamp;

 -- Get the last batch ingestion time 1718758687865
 SET @to_batch_ingestion_time =
     SELECT MAX(_acp_system_metadata.ingestTime)
     FROM demo_data_trey_mcintyre_midvalues;

 -- Sessionize the data and insert into new_sessionized_data
 INSERT INTO new_sessionized_data
 SELECT *
 FROM (
     SELECT
         _id,
         timestamp,
         struct(User_Identity,
         cast(SESS_TIMEOUT(timestamp, 60 * 30) OVER (
             PARTITION BY User_Identity
             ORDER BY timestamp
             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) as string) AS SessionData,
         to_timestamp(from_unixtime(ingest_time/1000, 'yyyy-MM-dd HH:mm:ss')) AS IngestTime,
         PageName,
         first_url,
         first_channel_type
           ) as _demosystem5
     FROM (
         SELECT
             _id,
             ENDUSERIDS._EXPERIENCE.MCID.ID as User_Identity,
             timestamp,
             web.webPageDetails.name AS PageName,
            attribution_first_touch(timestamp, '', web.webReferrer.url) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_url,
            attribution_first_touch(timestamp, '',channel.typeAtSource) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_channel_type,
             _acp_system_metadata.ingestTime AS ingest_time
         FROM demo_data_trey_mcintyre_midvalues
         WHERE timestamp >= current_date - 90
     )
     ORDER BY User_Identity, timestamp ASC
 )
 WHERE _demosystem5.IngestTime >= to_timestamp(from_unixtime(@from_batch_ingestion_time/1000, 'yyyy-MM-dd HH:mm:ss'));

-- Update the checkpoint_log table
INSERT INTO checkpoint_log
SELECT
   'data_feed' as process_name,
   'SUCCESSFUL' as process_status,
   cast(@to_batch_ingestion_time AS string) as last_snapshot_id,
   cast(@last_updated_timestamp AS timestamp) as process_timestamp;

END
$$;

+++

计划查询

您可以计划查询,以确保按首选间隔执行查询并生成结果。

使用查询编辑器

您可以使用查询编辑器计划查询。 在计划查询时,您可以定义输出数据集。 有关详细信息,请参阅查询计划

使用查询服务API

或者,您可以使用RESTful API为查询定义查询和计划。 有关详细信息,请参阅查询服务API指南
在创建查询(创建查询)或为查询创建计划时(创建计划查询),请确保将输出数据集定义为可选ctasParameters属性的一部分。

导出数据集

创建并计划查询并验证结果后,您可以将原始数据集导出到云存储目标。 此导出位于称为“Experience Platform导出目标”的数据集目标术语中。 有关概述,请参阅将数据集导出到云存储目标

支持以下云存储目标:

EXPERIENCE PLATFORMUI

您可以通过Experience PlatformUI导出和计划导出输出数据集。 本节介绍所涉及的步骤。

选择目标

确定要将输出数据集导出到的云存储目标后,选择目标。 如果尚未为首选云存储配置目标,则必须创建新的目标连接

在配置目标时,您可以

  • 定义文件类型(JSON或Parquet),
  • 是否应该压缩结果文件,以及
  • 是否应该包含清单文件。

选择数据集

选择目标后,在下一个​ 选择数据集 ​步骤中,您必须从数据集列表中选择输出数据集。 如果您创建了多个计划查询,并且希望将输出数据集发送到同一云存储目标,则可以选择相应的输出数据集。 有关详细信息,请参阅选择您的数据集

计划数据集导出

最后,要计划数据集导出作为​ 计划 ​步骤的一部分。 在该步骤中,您可以定义计划以及输出数据集导出是否应增量导出。 有关详细信息,请参阅计划数据集导出

最后步骤

查看您的选择,如果正确,开始将输出数据集导出到云存储目标。

您必须验证数据导出是否成功。 导出数据集时,Experience Platform会在目标中定义的存储位置创建一个或多个.json.parquet文件。 根据您设置的导出计划,希望将新文件存储在您的存储位置。 Experience Platform会在您指定为选定目标的一部分的存储位置中创建文件夹结构,存放导出的文件。 每次导出时都会创建一个新文件夹,其模式为: folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM。 默认文件名是随机生成的,并确保导出的文件名是唯一的。

流服务API

或者,您可以使用API导出和计划导出输出数据集。 使用流服务API🔗在导出数据集中记录了所涉及的步骤。

快速入门

要导出数据集,请确保您具有所需的权限。 此外,还要验证要将输出数据集发送到的目标是否支持导出数据集。 然后,您必须收集在API调用中使用的必需和可选标头的值。 您还需要识别要将数据集导出到的目标的连接规范和流规范ID。

检索符合条件的数据集

您可以检索符合条件的数据集列表以供导出,并使用GET /connectionSpecs/{id}/configs API验证您的输出数据集是否属于该列表。

创建源连接

接下来,您必须使用要导出到云存储目标的输出数据集的唯一ID 创建源连接。 您使用POST /sourceConnections API。

向目标进行身份验证(创建基本连接)

您现在必须创建基本连接以使用POST /targetConection API进行身份验证并将凭据安全地存储到您的云存储目标。

提供导出参数

接下来,您必须🔗再使用POST /targetConection API创建一个目标连接,用于存储输出数据集的导出参数。 这些导出参数包括位置、文件格式、压缩等。

设置数据流

最后,您设置数据流,以确保使用POST /flows API将输出数据集导出到云存储目标。 在此步骤中,您可以使用scheduleParams参数定义导出的计划。

验证数据流

检查数据流是否成功执行,请使用GET /runs API,将数据流ID指定为查询参数。 此数据流ID是您在设置数据流时返回的标识符。

验证数据导出是否成功。 导出数据集时,Experience Platform会在目标中定义的存储位置创建一个或多个.json.parquet文件。 根据您设置的导出计划,希望将新文件存储在您的存储位置。 Experience Platform会在您指定为选定目标的一部分的存储位置中创建文件夹结构,存放导出的文件。 每次导出时都会创建一个新文件夹,其模式为: folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM。 默认文件名是随机生成的,并确保导出的文件名是唯一的。

结论

简而言之,模拟Adobe Analytics数据馈送功能意味着使用查询服务设置计划查询,并在计划的数据集导出中使用这些查询的结果。

IMPORTANT
此用例涉及两个调度程序。 要保证模拟数据馈送功能正常工作,请确保在查询服务和数据导出中配置的计划不会干预。
recommendation-more-help
080e5213-7aa2-40d6-9dba-18945e892f79