机器学习的工程师功能
本文档演示如何将Adobe Experience Platform中的数据转换为 功能 或可由机器学习模型使用的变量。 此过程称为 功能工程。 使用Data Distiller可大规模计算ML功能,并将这些功能共享到您的机器学习环境。 其中涉及以下各项:
- 创建查询模板以定义要为模型计算的目标标签和特征
- 执行查询并将结果存储在训练数据集中
定义培训数据 define-training-data
下方的示例说明了从Experience Events数据集为模型导出训练数据的查询,该数据用于预测用户订阅新闻稿的倾向。 订阅事件由事件类型web.formFilledOut
表示,数据集中的其他行为事件用于派生配置文件级功能以预测订阅。
查询正标签和负标签 query-positive-and-negative-labels
用于训练(监督)机器学习模型的完整数据集包括表示要预测的结果的目标变量或标签,以及一组用于描述用于训练模型的示例用户档案的特征或解释变量。
在这种情况下,标签是一个名为subscriptionOccurred
的变量,如果用户配置文件具有类型为web.formFilledOut
的事件,则其等于1,否则为0。 以下查询从事件数据集返回一组50,000个用户,其中包括具有正标签(subscriptionOccurred = 1
)的所有用户,加上随机选择的具有负标签的用户集,以完成50,000个用户样本的大小。 这可确保训练数据包含可供模型学习的正面和负面示例。
from aepp import queryservice
dd_conn = queryservice.QueryService().connection()
dd_cursor = queryservice.InteractiveQuery2(dd_conn)
query_labels = f"""
SELECT *
FROM (
SELECT
eventType,
_{tenant_id}.user_id as userId,
SUM(CASE WHEN eventType='web.formFilledOut' THEN 1 ELSE 0 END)
OVER (PARTITION BY _{tenant_id}.user_id)
AS "subscriptionOccurred",
row_number() OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn()) AS random_row_number_for_user
FROM {table_name}
)
WHERE (subscriptionOccurred = 1 AND eventType = 'web.formFilledOut') OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1)
"""
df_labels = dd_cursor.query(query_labels, output="dataframe")
print(f"Number of classes: {len(df_labels)}")
df_labels.head()
示例输出
类数: 50000
聚合事件以定义ML的功能 define-features
通过适当的查询,您可以将数据集中的事件收集到有意义的数字特征中,这些特征可用于训练倾向模型。 下面显示了示例事件:
- 发送用于营销目的并由用户接收的电子邮件数量。
- 这些电子邮件中 打开的部分。
- 用户 选择 链接的电子邮件部分。
- 查看的产品数。
- 与 交互的 建议数。
- 已驳回的 建议数。
- 已选择的 链接数。
- 收到两封连续电子邮件之间的分钟数。
- 打开的两封连续电子邮件之间的分钟数。
- 用户实际选择链接的两封连续电子邮件之间的分钟数。
- 两次连续产品查看之间的分钟数。
- 与两个建议交互的分钟数。
- 两个被驳回的建议之间的分钟数。
- 已选择两个链接之间的分钟数。
以下查询汇总这些事件:
code language-python |
---|
|
示例输出
组合标签和功能查询 combine-queries
最后,标签查询和特征查询可以合并为一个查询,返回标签和特征的训练数据集:
code language-python |
---|
|
示例输出
创建查询模板以增量计算培训数据
通常使用更新的训练数据定期重新训练模型,以保持随时间变化的模型精度。 作为有效更新训练数据集的最佳实践,您可以从训练集查询创建模板以增量计算新训练数据。 这样,您就只能根据自上次更新训练数据以来添加到原始Experience Events数据集的数据计算标签和功能,并将新标签和功能插入现有训练数据集。
这样做需要对训练集查询进行一些修改:
-
添加逻辑以创建新训练数据集(如果不存在),否则,将新标签和功能插入到现有训练数据集中。 这需要训练集查询的一系列两个版本:
- 首先,使用
CREATE TABLE IF NOT EXISTS {table_name} AS
语句 - 接下来,对训练数据集已存在的情况下使用
INSERT INTO {table_name}
语句
- 首先,使用
-
添加
SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id
语句以将查询限制为在指定间隔内添加的事件数据。 快照ID上的$
前缀表示它们是在执行查询模板时要传入的变量。
应用这些更改会导致以下查询:
code language-python |
---|
|
最后,以下代码将查询模板保存在Data Distiller中:
template_res = dd.createQueryTemplate({
"sql": query_training_set_template,
"queryParameters": {},
"name": "Template for propensity training data"
})
template_id = template_res["id"]
print(f"Template for propensity training data created as ID {template_id}")
示例输出
Template for propensity training data created as ID f3d1ec6b-40c2-4d13-93b6-734c1b3c7235
保存模板后,您可以随时通过引用模板ID并执行查询,并指定应包含在查询中的快照ID的范围。 以下查询可检索原始Experience Events数据集的快照:
query_snapshots = f"""
SELECT snapshot_id
FROM (
SELECT history_meta('{table_name}')
)
WHERE is_current = true OR snapshot_generation = 0
ORDER BY snapshot_generation ASC
"""
df_snapshots = dd_cursor.query(query_snapshots, output="dataframe")
以下代码演示了如何使用第一个和最后一个快照来查询整个数据集,从而执行查询模板:
snapshot_start_id = str(df_snapshots["snapshot_id"].iloc[0])
snapshot_end_id = str(df_snapshots["snapshot_id"].iloc[1])
query_final_res = qs.postQueries(
name=f"[CMLE][Week2] Query to generate training data created by {username}",
templateId=template_id,
queryParameters={
"from_snapshot_id": snapshot_start_id,
"to_snapshot_id": snapshot_end_id,
},
dbname=f"{cat_conn.sandbox}:all"
)
query_final_id = query_final_res["id"]
print(f"Query started successfully and got assigned ID {query_final_id} - it will take some time to execute")
示例输出
Query started successfully and got assigned ID c6ea5009-1315-4839-b072-089ae01e74fd - it will take some time to execute
您可以定义以下函数以定期检查查询的状态:
def wait_for_query_completion(query_id):
while True:
query_info = qs.getQuery(query_id)
query_state = query_info["state"]
if query_state in ["SUCCESS", "FAILED"]:
break
print("Query is still in progress, sleeping…")
time.sleep(60)
duration_secs = query_info["elapsedTime"] / 1000
if query_state == "SUCCESS":
print(f"Query completed successfully in {duration_secs} seconds")
else:
print(f"Query failed with the following errors:", file=sys.stderr)
for error in query_info["errors"]:
print(f"Error code {error['code']}: {error['message']}", file=sys.stderr)
wait_for_query_completion(query_final_id)
示例输出
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query completed successfully in 473.8 seconds
后续步骤:
通过阅读本文档,您已了解如何将Adobe Experience Platform中的数据转换为机器学习模型可以使用的功能或变量。 在机器学习环境中创建从Experience Platform到馈送自定义模型的功能管道的下一步是导出功能数据集。