Engineer features for machine learning

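This document demonstrates how to transform data in Adobe Experience Platform into features, or variables, that a machine learning model can consume. This process is referred to as feature engineering. Use Data Distiller to compute ML features at scale and share those features with your machine learning environment. This involves the following: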

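  1. Create a query template to define the target labels and features you want to compute for your model
  2. Execute the query and store the results in a training dataset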

Define your training data

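The following example illustrates a query that derives training data from an Experience Events dataset for a model that predicts a user's propensity to subscribe to a newsletter. Subscription events are represented by the event type web.formFilledOut, and other behavioral events in the dataset are used to derive profile-level features for predicting subscriptions.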

Query positive and negative labels

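A complete dataset for training a (supervised) machine learning model includes a target variable, or label, that represents the outcome to be predicted, and a set of features, or explanatory variables, that describe the example profiles used to train the model.

In this case, the label is a variable called subscriptionOccurred that equals 1 if the user profile has an event of type web.formFilledOut and 0 otherwise. The following query returns a set of 50,000 users from the events dataset, including all users with positive labels (subscriptionOccurred = 1) plus a set of randomly selected users with negative labels to complete the 50,000-user sample size. This ensures that the training data includes both positive and negative examples for the model to learn from.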

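from aepp import queryservice

# Assumes aepp is already configured with your credentials, and that tenant_id
# and table_name (the Experience Events dataset table) are defined earlier.
dd_conn = queryservice.QueryService().connection()
dd_cursor = queryservice.InteractiveQuery2(dd_conn)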

query_labels = f"""
SELECT *
FROM (
    SELECT
        eventType,
        _{tenant_id}.user_id as userId,
        SUM(CASE WHEN eventType='web.formFilledOut' THEN 1 ELSE 0 END)
            OVER (PARTITION BY _{tenant_id}.user_id)
            AS "subscriptionOccurred",
        row_number() OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn()) AS random_row_number_for_user
    FROM {table_name}
)
WHERE (subscriptionOccurred = 1 AND eventType = 'web.formFilledOut') OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1)
"""

df_labels = dd_cursor.query(query_labels, output="dataframe")
print(f"Number of classes: {len(df_labels)}")
df_labels.head()

Sample output

Number of classes: 50000

| | eventType | userId | subscriptionOccurred | random_row_number_for_user |
|---|---|---|---|---|
| 0 | directMarketing.emailClicked | 01027994177972439148069092698714414382 | 0 | 1 |
| 1 | directMarketing.emailOpened | 01054714817856066632264746967668888198 | 0 | 1 |
| 2 | web.formFilledOut | 01117296890525140996735553609305695042 | 1 | 15 |
| 3 | directMarketing.emailClicked | 01149554820363915324573708359099551093 | 0 | 1 |
| 4 | directMarketing.emailClicked | 01172121447143590196349410086995740317 | 0 | 1 |
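
To make the selection logic concrete, the following is a minimal pandas sketch of the same idea on a toy table. The data, the label_events helper, and the seed parameter are assumptions made for illustration; they are not part of Data Distiller or aepp. Note that because the query uses SUM, a user with more than one web.formFilledOut event gets a subscriptionOccurred value greater than 1 and matches neither branch of the WHERE clause; the sketch reproduces that behavior rather than changing it.

```python
import pandas as pd

# Toy events; user IDs and rows are invented for illustration only.
events = pd.DataFrame({
    "userId": ["u1", "u1", "u1", "u2", "u2", "u3"],
    "eventType": [
        "directMarketing.emailSent",
        "web.formFilledOut",
        "directMarketing.emailOpened",
        "directMarketing.emailSent",
        "directMarketing.emailClicked",
        "directMarketing.emailOpened",
    ],
})


def label_events(events: pd.DataFrame, seed: int = 0) -> pd.DataFrame:
    """Hypothetical helper mirroring the windowed SUM and random row_number()."""
    out = events.copy()
    is_subscription = out["eventType"].eq("web.formFilledOut")

    # Per-user count of subscription events (the PARTITION BY user window).
    out["subscriptionOccurred"] = (
        is_subscription.astype(int).groupby(out["userId"]).transform("sum")
    )

    # Shuffle the rows, then number them within each user starting at 1.
    shuffled = out.sample(frac=1.0, random_state=seed)
    out["random_row_number_for_user"] = shuffled.groupby("userId").cumcount() + 1

    # Positives keep their subscription event; negatives keep one random event.
    positive = (out["subscriptionOccurred"] == 1) & is_subscription
    negative = (out["subscriptionOccurred"] == 0) & (
        out["random_row_number_for_user"] == 1
    )
    return out[positive | negative].reset_index(drop=True)


labels = label_events(events)
print(labels)
```

Each user contributes exactly one row: the subscription event for a positive example, or one randomly chosen event for a negative example.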

Aggregate events to define features for ML

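With an appropriate query, you can gather the events in the dataset into meaningful numerical features that can be used to train a propensity model. Example features are listed below: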

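  • Number of emails that were sent for marketing purposes and received by the user.
  • Number of these emails that were opened.
  • Number of these emails in which the user selected the link.
  • Number of products that were viewed.
  • Number of propositions that were interacted with.
  • Number of propositions that were dismissed.
  • Number of links that were selected.
  • Number of minutes between two consecutive emails received.
  • Number of minutes between two consecutive emails opened.
  • Number of minutes between two consecutive emails in which the user actually selected the link.
  • Number of minutes between two consecutive product views.
  • Number of minutes between two propositions that were interacted with.
  • Number of minutes between two propositions that were dismissed.
  • Number of minutes between two links that were selected.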

The following query aggregates these events:

query_features = f"""
SELECT
    _{tenant_id}.user_id as userId,
    SUM(CASE WHEN eventType='directMarketing.emailSent' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "emailsReceived",
    SUM(CASE WHEN eventType='directMarketing.emailOpened' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "emailsOpened",
    SUM(CASE WHEN eventType='directMarketing.emailClicked' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "emailsClicked",
    SUM(CASE WHEN eventType='commerce.productViews' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "productsViewed",
    SUM(CASE WHEN eventType='decisioning.propositionInteract' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "propositionInteracts",
    SUM(CASE WHEN eventType='decisioning.propositionDismiss' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "propositionDismissed",
    SUM(CASE WHEN eventType='web.webinteraction.linkClicks' THEN 1 ELSE 0 END)
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "webLinkClicks" ,
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailSent', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_emailSent",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailOpened', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_emailOpened",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailClicked', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_emailClick",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'commerce.productViews', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_productView",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionInteract', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_propositionInteract",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionDismiss', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_propositionDismiss",
    TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'web.webinteraction.linkClicks', 'minutes')
       OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       AS "minutes_since_linkClick"
FROM {table_name}
"""

df_features = dd_cursor.query(query_features, output="dataframe")
df_features.head()

Sample output

| | userId | emailsReceived | emailsOpened | emailsClicked | productsViewed | propositionInteracts | propositionDismissed | webLinkClicks | minutes_since_emailSent | minutes_since_emailOpened | minutes_since_emailClick | minutes_since_productView | minutes_since_propositionInteract | minutes_since_propositionDismiss | minutes_since_linkClick |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 01102546977582484968046916668339306826 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN |
| 1 | 01102546977582484968046916668339306826 | 2 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN |
| 2 | 01102546977582484968046916668339306826 | 3 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN |
| 3 | 01102546977582484968046916668339306826 | 3 | 1 | 0 | 0 | 0 | 0 | 0 | 540.0 | 0.0 | NaN | NaN | NaN | None | NaN |
| 4 | 01102546977582484968046916668339306826 | 3 | 2 | 0 | 0 | 0 | 0 | 0 | 588.0 | 0.0 | NaN | NaN | NaN | None | NaN |
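
The two kinds of feature in the query, running counts and elapsed minutes, can be sketched in pandas on a toy table. This is only an illustration under stated assumptions: the data and the running_features helper are hypothetical, and the sketch assumes that TIME_BETWEEN_PREVIOUS_MATCH measures the minutes elapsed since the most recent matching event at or before the current row, which is what the sample output suggests. The exact sign and boundary conventions in Data Distiller may differ.

```python
import pandas as pd

# Toy events for illustration only.
events = pd.DataFrame({
    "userId": ["u1", "u1", "u1", "u1", "u2"],
    "eventType": [
        "directMarketing.emailSent",
        "directMarketing.emailSent",
        "directMarketing.emailOpened",
        "directMarketing.emailOpened",
        "directMarketing.emailSent",
    ],
    "timestamp": pd.to_datetime([
        "2023-06-01 08:00",
        "2023-06-01 09:00",
        "2023-06-01 18:00",
        "2023-06-01 18:48",
        "2023-06-02 10:00",
    ]),
})


def running_features(events, event_type, count_name, minutes_name):
    """Hypothetical helper: per-user running count and minutes since last match."""
    out = events.sort_values(["userId", "timestamp"]).reset_index(drop=True)
    matches = out["eventType"].eq(event_type)

    # Running count per user, up to and including the current row
    # (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
    out[count_name] = matches.astype(int).groupby(out["userId"]).cumsum()

    # Timestamp of the most recent matching event at or before this row;
    # stays missing until the user has a first matching event.
    last_match = out["timestamp"].where(matches).groupby(out["userId"]).ffill()
    out[minutes_name] = (out["timestamp"] - last_match).dt.total_seconds() / 60
    return out


features = running_features(
    events, "directMarketing.emailSent", "emailsReceived", "minutes_since_emailSent"
)
print(features)
```

Because the window is partitioned by user and ordered by timestamp, every event row carries the feature values as they stood at that moment, which is why the same userId appears on several rows of the output.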

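Combine labels and features queries

Finally, the labels query and the features query can be combined into a single query that returns a training dataset of labels and features: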

query_training_set = f"""
SELECT *
FROM (
    SELECT _{tenant_id}.user_id as userId,
       eventType,
       timestamp,
       SUM(CASE WHEN eventType='web.formFilledOut' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id)
           AS "subscriptionOccurred",
       SUM(CASE WHEN eventType='directMarketing.emailSent' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsReceived",
       SUM(CASE WHEN eventType='directMarketing.emailOpened' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsOpened",
       SUM(CASE WHEN eventType='directMarketing.emailClicked' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsClicked",
       SUM(CASE WHEN eventType='commerce.productViews' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "productsViewed",
       SUM(CASE WHEN eventType='decisioning.propositionInteract' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionInteracts",
       SUM(CASE WHEN eventType='decisioning.propositionDismiss' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionDismissed",
       SUM(CASE WHEN eventType='web.webinteraction.linkClicks' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "webLinkClicks" ,
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailSent', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailSent",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailOpened', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailOpened",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailClicked', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailClick",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'commerce.productViews', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_productView",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionInteract', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionInteract",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionDismiss', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionDismiss",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'web.webinteraction.linkClicks', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_linkClick",
        row_number() OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn()) AS random_row_number_for_user
    FROM {table_name} LIMIT 1000
)
WHERE (subscriptionOccurred = 1 AND eventType = 'web.formFilledOut') OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1)
ORDER BY timestamp
"""

df_training_set = dd_cursor.query(query_training_set, output="dataframe")
df_training_set.head()

Sample output

| | userId | eventType | timestamp | subscriptionOccurred | emailsReceived | emailsOpened | emailsClicked | productsViewed | propositionInteracts | propositionDismissed | webLinkClicks | minutes_since_emailSent | minutes_since_emailOpened | minutes_since_emailClick | minutes_since_productView | minutes_since_propositionInteract | minutes_since_propositionDismiss | minutes_since_linkClick | random_row_number_for_user |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 02554909162592418347780983091131567290 | directMarketing.emailSent | 2023-06-17 13:44:59.086 | 0 | 2 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN | 1 |
| 1 | 01130334080340815140184601481559659945 | directMarketing.emailOpened | 2023-06-19 06:01:55.366 | 0 | 1 | 3 | 0 | 1 | 0 | 0 | 0 | 1921.0 | 0.0 | NaN | 1703.0 | NaN | None | NaN | 1 |
| 2 | 01708961660028351393477273586554010192 | web.formFilledOut | 2023-06-19 18:36:49.083 | 1 | 1 | 2 | 2 | 0 | 0 | 0 | 0 | 2365.0 | 26.0 | 1.0 | NaN | NaN | None | NaN | 7 |
| 3 | 01809182902320674899156240602124740853 | directMarketing.emailSent | 2023-06-21 19:17:12.535 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN | 1 |
| 4 | 03441761949943678951106193028739001197 | directMarketing.emailSent | 2023-06-21 21:58:29.482 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | NaN | NaN | NaN | NaN | None | NaN | 1 |

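Create a query template to incrementally compute training data

It is typical to periodically retrain a model with updated training data to maintain its accuracy over time. As a best practice for efficiently updating your training dataset, you can create a template from your training set query to compute new training data incrementally. This allows you to compute labels and features only from data that was added to the original Experience Events dataset since the training data was last updated, and to insert the new labels and features into the existing training dataset.

Doing so requires a few modifications to the training set query:

  • Add logic to create a new training dataset if it does not exist, and otherwise insert the new labels and features into the existing training dataset. This requires two versions of the training set query in sequence:

    • First, using the CREATE TABLE IF NOT EXISTS {table_name} AS statement
    • Next, using the INSERT INTO {table_name} statement for the case where the training dataset already exists. In the template, a table_exists check taken before the CREATE statement guards this insert so that the first run does not write the same rows twice.
  • Add a SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id statement to limit the query to event data that was added within a specified interval. The $ prefix on the snapshot IDs indicates that they are variables that will be passed in when the query template is executed.

Applying these changes results in the following query: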

ctas_table_name = "propensity_training_set"

query_training_set_template = f"""
$$ BEGIN

SET @my_table_exists = SELECT table_exists('{ctas_table_name}');

CREATE TABLE IF NOT EXISTS {ctas_table_name} AS
SELECT *
FROM (
    SELECT _{tenant_id}.user_id as userId,
       eventType,
       timestamp,
       SUM(CASE WHEN eventType='web.formFilledOut' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id)
           AS "subscriptionOccurred",
       SUM(CASE WHEN eventType='directMarketing.emailSent' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsReceived",
       SUM(CASE WHEN eventType='directMarketing.emailOpened' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsOpened",
       SUM(CASE WHEN eventType='directMarketing.emailClicked' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsClicked",
       SUM(CASE WHEN eventType='commerce.productViews' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "productsViewed",
       SUM(CASE WHEN eventType='decisioning.propositionInteract' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionInteracts",
       SUM(CASE WHEN eventType='decisioning.propositionDismiss' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionDismissed",
       SUM(CASE WHEN eventType='web.webinteraction.linkClicks' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "webLinkClicks" ,
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailSent', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailSent",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailOpened', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailOpened",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailClicked', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailClick",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'commerce.productViews', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_productView",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionInteract', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionInteract",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionDismiss', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionDismiss",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'web.webinteraction.linkClicks', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_linkClick",
        row_number() OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn()) AS random_row_number_for_user
    FROM {table_name}
    SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id
)
WHERE (subscriptionOccurred = 1 AND eventType = 'web.formFilledOut') OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1)
ORDER BY timestamp;

INSERT INTO {ctas_table_name}
SELECT *
FROM (
    SELECT _{tenant_id}.user_id as userId,
       eventType,
       timestamp,
       SUM(CASE WHEN eventType='web.formFilledOut' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id)
           AS "subscriptionOccurred",
       SUM(CASE WHEN eventType='directMarketing.emailSent' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsReceived",
       SUM(CASE WHEN eventType='directMarketing.emailOpened' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsOpened",
       SUM(CASE WHEN eventType='directMarketing.emailClicked' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "emailsClicked",
       SUM(CASE WHEN eventType='commerce.productViews' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "productsViewed",
       SUM(CASE WHEN eventType='decisioning.propositionInteract' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionInteracts",
       SUM(CASE WHEN eventType='decisioning.propositionDismiss' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "propositionDismissed",
       SUM(CASE WHEN eventType='web.webinteraction.linkClicks' THEN 1 ELSE 0 END)
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "webLinkClicks" ,
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailSent', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailSent",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailOpened', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailOpened",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'directMarketing.emailClicked', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_emailClick",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'commerce.productViews', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_productView",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionInteract', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionInteract",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'decisioning.propositionDismiss', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_propositionDismiss",
       TIME_BETWEEN_PREVIOUS_MATCH(timestamp, eventType = 'web.webinteraction.linkClicks', 'minutes')
           OVER (PARTITION BY _{tenant_id}.user_id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
           AS "minutes_since_linkClick",
        row_number() OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn()) AS random_row_number_for_user
    FROM {table_name}
    SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id
)
WHERE
    @my_table_exists = 't' AND
    ((subscriptionOccurred = 1 AND eventType = 'web.formFilledOut') OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1))
ORDER BY timestamp;

EXCEPTION
  WHEN OTHER THEN
    SELECT 'ERROR';

END $$;
"""
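
The create-or-append pattern in the template can be tried locally with a small sketch that uses Python's built-in sqlite3 module in place of Data Distiller. Everything here is an assumption made for illustration: the events and training_set tables, the refresh_training_set helper, and the integer batch column that stands in for snapshot IDs, including the choice to treat the start of the range as exclusive. The point it shows is the role of the existence check: it is taken before the CREATE statement, so the INSERT runs only when the table was already there.

```python
import sqlite3

TABLE = "training_set"  # hypothetical table name for this sketch


def table_exists(conn, name):
    """Return True if a table with this name exists in the SQLite database."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    return row is not None


def refresh_training_set(conn, from_batch, to_batch):
    """Create the training table on the first run, append to it on later runs."""
    # int() keeps the range bounds safe to embed directly in the statement.
    select_sql = (
        "SELECT user_id, label, batch FROM events "
        f"WHERE batch > {int(from_batch)} AND batch <= {int(to_batch)}"
    )
    # Check existence BEFORE creating, like @my_table_exists in the template.
    existed_before = table_exists(conn, TABLE)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {TABLE} AS {select_sql}")
    if existed_before:
        # Only append when the table was already there; otherwise the CREATE
        # statement has just written these rows.
        conn.execute(f"INSERT INTO {TABLE} {select_sql}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id TEXT, label INTEGER, batch INTEGER)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [("u1", 0, 1), ("u2", 1, 2), ("u3", 0, 3)],
)

refresh_training_set(conn, 0, 2)  # first run: creates the table from batches 1-2
refresh_training_set(conn, 2, 3)  # later run: appends batch 3 only
row_count = conn.execute(f"SELECT COUNT(*) FROM {TABLE}").fetchone()[0]
print(row_count)
```

Without the existed_before guard, the first run would create the table and then insert the same rows a second time.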

Finally, the following code saves the query template in Data Distiller:

# Assumes dd is a queryservice.QueryService() instance created earlier.
template_res = dd.createQueryTemplate({
  "sql": query_training_set_template,
  "queryParameters": {},
  "name": "Template for propensity training data"
})
template_id = template_res["id"]

print(f"Template for propensity training data created as ID {template_id}")

Sample output

Template for propensity training data created as ID f3d1ec6b-40c2-4d13-93b6-734c1b3c7235

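With the template saved, you can execute the query at any time by referencing the template ID and specifying the range of snapshot IDs that the query should include. The following query retrieves the snapshots of the original Experience Events dataset: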

query_snapshots = f"""
SELECT snapshot_id
FROM (
    SELECT history_meta('{table_name}')
)
WHERE is_current = true OR snapshot_generation = 0
ORDER BY snapshot_generation ASC
"""

df_snapshots = dd_cursor.query(query_snapshots, output="dataframe")

The following code demonstrates execution of the query template, using the first and last snapshots to query the entire dataset:

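# The snapshot query is ordered by generation, so the first row is the initial
# snapshot and the last row is the current one.
snapshot_start_id = str(df_snapshots["snapshot_id"].iloc[0])
snapshot_end_id = str(df_snapshots["snapshot_id"].iloc[-1])

# Assumes qs is a queryservice.QueryService() instance, and that username and
# cat_conn are defined earlier.
query_final_res = qs.postQueries(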
    name=f"[CMLE][Week2] Query to generate training data created by {username}",
    templateId=template_id,
    queryParameters={
        "from_snapshot_id": snapshot_start_id,
        "to_snapshot_id": snapshot_end_id,
    },
    dbname=f"{cat_conn.sandbox}:all"
)
query_final_id = query_final_res["id"]
print(f"Query started successfully and got assigned ID {query_final_id} - it will take some time to execute")

Sample output

Query started successfully and got assigned ID c6ea5009-1315-4839-b072-089ae01e74fd - it will take some time to execute
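
The run above covers the whole dataset. For a later incremental run, one possible approach is to remember the last snapshot ID that was processed and use it as the start of the next range. The sketch below is an assumption, not part of aepp or Data Distiller: next_snapshot_range is a hypothetical helper, the snapshot IDs are made up, and it assumes that the start of a SNAPSHOT BETWEEN range can be set to the last snapshot already processed.

```python
import pandas as pd


def next_snapshot_range(snapshots, last_processed_id=None):
    """Hypothetical helper: pick (from_snapshot_id, to_snapshot_id) for a run.

    snapshots needs the snapshot_generation and snapshot_id columns.
    Returns None when the last processed snapshot is already the latest one.
    """
    ordered = snapshots.sort_values("snapshot_generation")
    latest_id = str(ordered["snapshot_id"].iloc[-1])
    if last_processed_id is None:
        # First run: start from the earliest snapshot available.
        return str(ordered["snapshot_id"].iloc[0]), latest_id
    if str(last_processed_id) == latest_id:
        return None  # nothing new since the previous run
    return str(last_processed_id), latest_id


# Made-up snapshot history for illustration only.
toy_snapshots = pd.DataFrame({
    "snapshot_generation": [0, 1, 2],
    "snapshot_id": [111, 222, 333],
})

print(next_snapshot_range(toy_snapshots))
print(next_snapshot_range(toy_snapshots, last_processed_id="222"))
```

The returned pair can be passed as from_snapshot_id and to_snapshot_id in queryParameters, and the second value stored for the next run.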

You can define the following function to periodically check the status of the query:

import sys
import time

def wait_for_query_completion(query_id):
    while True:
        query_info = qs.getQuery(query_id)
        query_state = query_info["state"]
        if query_state in ["SUCCESS", "FAILED"]:
            break
        print("Query is still in progress, sleeping…")
        time.sleep(60)

    duration_secs = query_info["elapsedTime"] / 1000
    if query_state == "SUCCESS":
        print(f"Query completed successfully in {duration_secs} seconds")
    else:
        print("Query failed with the following errors:", file=sys.stderr)
        for error in query_info["errors"]:
            print(f"Error code {error['code']}: {error['message']}", file=sys.stderr)

wait_for_query_completion(query_final_id)

Sample output

Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query completed successfully in 473.8 seconds
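
The function above loops until the query reaches a terminal state, so it never returns if that state is never reported. As a hedged variation, the sketch below adds a timeout. The poll_until_done helper, its parameters, and the non-terminal state names in the demo are assumptions for illustration; only the SUCCESS and FAILED states come from the code above.

```python
import time

TERMINAL_STATES = {"SUCCESS", "FAILED"}


def poll_until_done(get_state, timeout_secs=3600, interval_secs=60,
                    sleep=time.sleep, clock=time.monotonic):
    """Hypothetical helper: call get_state() until it returns a terminal state.

    Raises TimeoutError if no terminal state is seen within timeout_secs.
    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_secs
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(
                f"Query still in state {state!r} after {timeout_secs} seconds"
            )
        sleep(interval_secs)


# Demo with a fake state source; the non-terminal names are placeholders.
fake_states = iter(["SUBMITTED", "IN_PROGRESS", "SUCCESS"])
final_state = poll_until_done(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)
```

With the objects used earlier in this document, the state source could be written as lambda: qs.getQuery(query_final_id)["state"].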

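Next steps

By reading this document, you have learned how to transform data in Adobe Experience Platform into features, or variables, that a machine learning model can consume. The next step in creating feature pipelines from Experience Platform to feed custom models in your machine learning environment is to export feature datasets.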
