Engineer features for machine learning
This document demonstrates how to transform data in Adobe Experience Platform into features, or variables, that can be consumed by machine learning models. This process is known as feature engineering. With Data Distiller, you can compute ML features at scale and share them to your machine learning environment. The process involves:
- Creating a query template to define the target labels and features to compute for the model
- Executing the query and storing the results in a training dataset
Define training data define-training-data
The example below walks through the queries used to derive a model's training data from an Experience Events dataset, with the goal of predicting users' propensity to subscribe to a newsletter. A subscription event is represented by the event type web.formFilledOut, and the other behavioral events in the dataset are used to derive profile-level features for predicting subscriptions.
Query positive and negative labels query-positive-and-negative-labels
A complete dataset for training a (supervised) machine learning model includes a target variable, or label, representing the outcome to predict, and a set of features, or explanatory variables, describing the example profiles the model is trained on.
In this case, the label is a variable named subscriptionOccurred that equals 1 if a user profile has an event of type web.formFilledOut, and 0 otherwise. The following query returns a set of 50,000 users from the events dataset: all users with a positive label (subscriptionOccurred = 1), plus a randomly selected set of users with a negative label to fill the sample out to 50,000 users. This ensures that the training data contains both positive and negative examples for the model to learn from.
```python
from aepp import queryservice

# Open an interactive Data Distiller connection for running exploratory queries
dd_conn = queryservice.QueryService().connection()
dd_cursor = queryservice.InteractiveQuery2(dd_conn)

# Label each user: subscriptionOccurred = 1 if any web.formFilledOut event
# exists for the user. Keep the positive users' subscription-event rows,
# plus one randomly chosen row per negative user.
query_labels = f"""
SELECT *
FROM (
    SELECT
        eventType,
        _{tenant_id}.user_id AS userId,
        SUM(CASE WHEN eventType = 'web.formFilledOut' THEN 1 ELSE 0 END)
            OVER (PARTITION BY _{tenant_id}.user_id)
            AS "subscriptionOccurred",
        row_number()
            OVER (PARTITION BY _{tenant_id}.user_id ORDER BY randn())
            AS random_row_number_for_user
    FROM {table_name}
)
WHERE (subscriptionOccurred = 1 AND eventType = 'web.formFilledOut')
   OR (subscriptionOccurred = 0 AND random_row_number_for_user = 1)
"""
df_labels = dd_cursor.query(query_labels, output="dataframe")
print(f"Number of classes: {len(df_labels)}")
df_labels.head()
```
Sample output
Number of classes: 50000
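For a quick sanity check that the sample contains both classes, you can count the labels in the `df_labels` dataframe returned above (a minimal snippet, not part of the walkthrough's queries):

```python
# Count positive (1) and negative (0) labels in the sampled training data
print(df_labels["subscriptionOccurred"].value_counts())
```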
Aggregate events to define features for ML define-features
With appropriate queries, you can collect the events in your dataset into meaningful numeric features for training the propensity model. Examples of such events include:
- The number of marketing emails sent to and received by the user.
- The fraction of those emails that were opened.
- The fraction of emails in which the user selected a link.
- The number of products viewed.
- The number of propositions interacted with.
- The number of propositions dismissed.
- The number of links selected.
- The number of minutes between two consecutive emails received.
- The number of minutes between two consecutive emails opened.
- The number of minutes between two consecutive emails in which the user actually selected a link.
- The number of minutes between two consecutive product views.
- The number of minutes between two consecutive proposition interactions.
- The number of minutes between two consecutive proposition dismissals.
- The number of minutes between two consecutive link selections.
These events can then be aggregated into per-user features with a single query.
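A minimal sketch of such an aggregation query is shown below. The eventType values (`directMarketing.emailSent`, `commerce.productViews`, and so on) and the feature column names are illustrative assumptions about the dataset's schema; substitute the event types present in your own data. Fraction features, such as the share of received emails that were opened, can then be derived by dividing the corresponding counts.

```python
# Sketch only: the eventType values and feature names below are assumptions
query_features = f"""
SELECT
    userId,
    SUM(CASE WHEN eventType = 'directMarketing.emailSent' THEN 1 ELSE 0 END) AS emailsReceived,
    SUM(CASE WHEN eventType = 'directMarketing.emailOpened' THEN 1 ELSE 0 END) AS emailsOpened,
    SUM(CASE WHEN eventType = 'web.webinteraction.linkClicks' THEN 1 ELSE 0 END) AS linkSelections,
    SUM(CASE WHEN eventType = 'commerce.productViews' THEN 1 ELSE 0 END) AS productViews,
    SUM(CASE WHEN eventType = 'decisioning.propositionInteract' THEN 1 ELSE 0 END) AS propositionInteracts,
    SUM(CASE WHEN eventType = 'decisioning.propositionDismiss' THEN 1 ELSE 0 END) AS propositionDismisses,
    -- One inter-event feature: average minutes between consecutive received emails
    AVG(CASE WHEN eventType = 'directMarketing.emailSent' THEN minutes_since_previous END)
        AS minutesBetweenEmailsReceived
FROM (
    SELECT
        _{tenant_id}.user_id AS userId,
        eventType,
        (UNIX_TIMESTAMP(timestamp) - UNIX_TIMESTAMP(
            LAG(timestamp) OVER (
                PARTITION BY _{tenant_id}.user_id, eventType ORDER BY timestamp
            ))) / 60.0 AS minutes_since_previous
    FROM {table_name}
)
GROUP BY userId
"""
df_features = dd_cursor.query(query_features, output="dataframe")
df_features.head()
```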
Combine the label and feature queries combine-queries
Finally, the label query and the feature query can be combined into a single query that returns the training dataset of labels and features.
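A minimal sketch of the combination, reusing the `query_labels` and `query_features` strings from above as subqueries joined on `userId` (the variable names carry over from the sketches, not from the original tutorial):

```python
# Sketch only: nest the label and feature queries and join them on userId,
# yielding one row per sampled user with the features plus the target label.
# (For users with multiple subscription events, deduplicate rows as needed.)
query_training_set = f"""
SELECT features.*, labels.subscriptionOccurred
FROM ({query_features}) AS features
INNER JOIN ({query_labels}) AS labels
    ON features.userId = labels.userId
"""
df_training_set = dd_cursor.query(query_training_set, output="dataframe")
df_training_set.head()
```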
Create a query template to incrementally compute training data
It is typical to periodically retrain a model on updated training data to maintain its accuracy over time. As a best practice for efficiently updating your training dataset, you can create a template from your training set query to compute new training data incrementally. This allows you to compute labels and features only from data that was added to the original Experience Events dataset since the training data was last updated, and to insert the new labels and features into the existing training dataset.
Doing so requires a few modifications to the training set query:
- Add logic to create a new training dataset if it doesn't exist, and to insert the new labels and features into the existing training dataset otherwise. This requires two versions of the training set query:
  - First, using a `CREATE TABLE IF NOT EXISTS {table_name} AS` statement
  - Next, using an `INSERT INTO {table_name}` statement for the case where the training dataset already exists
- Add a `SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id` clause to limit the query to event data that was added within a specified interval. The `$` prefix on the snapshot IDs indicates that they are variables to be passed in when the query template is executed.
Applying those changes results in the query template for incremental training data.
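A minimal sketch of the two versions is shown below. Here `training_table_name` is a hypothetical variable holding the name of the training dataset, and the inner SELECT is abbreviated to the label computation; in practice it would be the full combined label-and-feature query:

```python
# Sketch only: "training_table_name" is a hypothetical variable, and the SELECT
# is abbreviated; the SNAPSHOT clause restricts the read to newly added events.
select_training_rows = f"""
SELECT
    _{tenant_id}.user_id AS userId,
    MAX(CASE WHEN eventType = 'web.formFilledOut' THEN 1 ELSE 0 END) AS subscriptionOccurred
FROM {table_name}
SNAPSHOT BETWEEN $from_snapshot_id AND $to_snapshot_id
GROUP BY _{tenant_id}.user_id
"""

# First version, for the initial run: create and fill the training dataset
query_create_training_set = f"CREATE TABLE IF NOT EXISTS {training_table_name} AS {select_training_rows}"

# Second version, for incremental runs: append new rows to the existing dataset
query_training_set_template = f"INSERT INTO {training_table_name} {select_training_rows}"
```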
Finally, the following code saves the query template in Data Distiller:
```python
# "dd" is an aepp QueryService API instance, for example
# dd = queryservice.QueryService()
template_res = dd.createQueryTemplate({
    "sql": query_training_set_template,
    "queryParameters": {},
    "name": "Template for propensity training data"
})
template_id = template_res["id"]
print(f"Template for propensity training data created as ID {template_id}")
```
Sample output
Template for propensity training data created as ID f3d1ec6b-40c2-4d13-93b6-734c1b3c7235
With the template saved, you can execute the query at any time by referencing the template ID and specifying the range of snapshot IDs to include in the query. The following query retrieves the snapshots of the original Experience Events dataset:
```python
# Retrieve the oldest (generation 0) and current snapshots of the events dataset
query_snapshots = f"""
SELECT snapshot_id
FROM (
    SELECT history_meta('{table_name}')
)
WHERE is_current = true OR snapshot_generation = 0
ORDER BY snapshot_generation ASC
"""
df_snapshots = dd_cursor.query(query_snapshots, output="dataframe")
```
The following code demonstrates execution of the query template, using the first and last snapshots to query the entire dataset:
```python
# The snapshot query above returns two rows: the first and last snapshots
snapshot_start_id = str(df_snapshots["snapshot_id"].iloc[0])
snapshot_end_id = str(df_snapshots["snapshot_id"].iloc[1])

# Execute the saved template, passing the snapshot range as query parameters
query_final_res = qs.postQueries(
    name=f"[CMLE][Week2] Query to generate training data created by {username}",
    templateId=template_id,
    queryParameters={
        "from_snapshot_id": snapshot_start_id,
        "to_snapshot_id": snapshot_end_id,
    },
    dbname=f"{cat_conn.sandbox}:all"
)
query_final_id = query_final_res["id"]
print(f"Query started successfully and got assigned ID {query_final_id} - it will take some time to execute")
```
Sample output
Query started successfully and got assigned ID c6ea5009-1315-4839-b072-089ae01e74fd - it will take some time to execute
You can define the following function to periodically check the status of the query:
```python
import sys
import time

def wait_for_query_completion(query_id):
    """Poll the query status every 60 seconds until it succeeds or fails."""
    while True:
        query_info = qs.getQuery(query_id)
        query_state = query_info["state"]
        if query_state in ["SUCCESS", "FAILED"]:
            break
        print("Query is still in progress, sleeping…")
        time.sleep(60)

    duration_secs = query_info["elapsedTime"] / 1000
    if query_state == "SUCCESS":
        print(f"Query completed successfully in {duration_secs} seconds")
    else:
        print("Query failed with the following errors:", file=sys.stderr)
        for error in query_info["errors"]:
            print(f"Error code {error['code']}: {error['message']}", file=sys.stderr)

wait_for_query_completion(query_final_id)
```
Sample output
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query is still in progress, sleeping…
Query completed successfully in 473.8 seconds
Next steps
By reading this document you have learned how to transform data in Adobe Experience Platform into features, or variables, that can be consumed by a machine learning model. The next step in creating feature pipelines from Experience Platform to feed custom models in your machine learning environment is to export feature datasets.