本教程介绍了以下步骤:从流源连接器检索原始数据,并使用来Experience Platform它们 Flow Service API.
本教程要求您实际了解Adobe Experience Platform的以下组件:
{TENANT_ID}
、“容器”的概念以及发出请求所需的标头(请特别注意“接受”标头及其可能的值)。有关如何成功调用Platform API的信息,请参阅 Platform API快速入门.
本教程还要求您具有适用于流连接器的有效源连接ID。 如果您没有此信息,请先参阅以下有关创建流源连接的教程,然后再尝试本教程:
为了在Platform中使用源数据,必须创建目标架构,以根据您的需求构建源数据。 然后,使用目标架构创建包含源数据的Platform数据集。 此目标XDM架构还扩展了XDM Individual Profile 类。
POST要创建目标XDM架构,请向 /schemas
的端点 Schema Registry API.
API格式
POST /tenant/schemas
请求
以下示例请求创建一个扩展XDM的XDM架构 Individual Profile 类。
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
响应
成功的响应会返回新创建架构的详细信息,包括其唯一标识符($id
)。 在后续步骤中,创建目标数据集、映射和数据流时需要此ID。
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
创建了目标XDM架构,该架构具有 $id
您现在可以创建目标数据集以包含源数据。 POST要创建目标数据集,请向 dataSets
的端点 目录服务API,同时在有效负载中提供目标架构的ID。
API格式
POST /catalog/dataSets
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
属性 | 描述 |
---|---|
name |
要创建的数据集的名称。 |
schemaRef.id |
URI $id 对于XDM架构,数据集将基于该架构。 |
schemaRef.contentType |
架构的版本。 此值必须设置为 application/vnd.adobe.xed-full-notext+json;version=1 ,返回架构的最新次版本。 请参阅以下部分 架构版本控制 有关更多信息,请参阅XDM API指南。 |
响应
成功的响应会返回一个数组,该数组以格式包含新创建的数据集的ID "@/datasets/{DATASET_ID}"
. 数据集ID是系统生成的只读字符串,用于在API调用中引用数据集。 在后续步骤中,需要目标数据集ID才能创建目标连接和数据流。
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
Target连接可创建并管理到Platform的目标连接或传输的数据将要到达的任何位置。 目标连接包含有关创建数据流所需的数据目标、数据格式和目标连接ID的信息。 Target连接实例特定于租户和组织。
POST要创建Target连接,请向 /targetConnections
的端点 Flow Service API。 作为请求的一部分,您必须提供数据格式 dataSetId
在上一步中检索,并且与关联的固定连接规范ID Data Lake. 此ID为 c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
API格式
POST /targetConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
属性 | 描述 |
---|---|
data.format |
您带入数据湖的数据的指定格式。 |
params.dataSetId |
上一步中生成的目标数据集的ID。 注意:创建目标连接时,必须提供有效的数据集ID。 无效的数据集ID将导致错误。 |
connectionSpec.id |
用于连接到数据湖的连接规范ID。 此ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
响应
成功的响应将返回新目标连接的唯一标识符(id
)。 此ID在后续步骤中是必需的。
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
要将源数据摄取到目标数据集中,必须首先将其映射到目标数据集所遵循的目标架构。
POST要创建映射集,请向 mappingSets
的端点 Data Prep API 提供目标XDM架构时 $id
以及要创建的映射集的详细信息。
API格式
POST /mappingSets
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
属性 | 描述 |
---|---|
xdmSchema |
此 $id 目标XDM架构的。 |
响应
成功响应将返回新创建映射的详细信息,包括其唯一标识符(id
)。 此ID是稍后步骤创建数据流所必需的。
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
数据流负责从源收集数据并将这些数据导入Platform。 GET要创建数据流,您必须首先通过对 Flow Service API。
API格式
GET /flowSpecs
请求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
响应
成功的响应将返回数据流规范列表。 使用任意创建数据流时需要检索的数据流规范ID Amazon Kinesis, Azure Event Hubs,或 Google PubSub,是 d69717ba-71b4-4313-b654-49f9cf126d7a
.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
},
]
}
收集流数据的最后一步是创建数据流。 现在,您已准备以下必需值:
数据流负责从源中计划和收集数据。 您可以通过在有效负载中提供上述值时执行POST请求来创建数据流。
API格式
POST /flows
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
属性 | 描述 |
---|---|
flowSpec.id |
此 流量规范ID 在上一步中检索。 |
sourceConnectionIds |
此 源连接ID 在之前的步骤中检索。 |
targetConnectionIds |
此 目标连接Id 在之前的步骤中检索。 |
transformations.params.mappingId |
此 映射ID 在之前的步骤中检索。 |
响应
成功的响应会返回ID (id
)。
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
通过阅读本教程,您已创建一个数据流以从流连接器收集流数据。 传入数据现在可供下游平台服务使用,例如 Real-Time Customer Profile 和 Data Science Workspace. 有关更多详细信息,请参阅以下文档: