使用Flow Service API为云存储源创建数据流
本教程介绍从云存储源检索数据以及使用Flow Service API将数据引入平台的步骤。
快速入门
本教程要求您实际了解Adobe Experience Platform的以下组件:
-
Experience Data Model (XDM) System:Experience Platform用于组织客户体验数据的标准化框架。
- 架构组合的基础知识:了解XDM架构的基本构建块,包括架构组合中的关键原则和最佳实践。
- 架构注册表开发人员指南:包含成功执行对架构注册表API的调用所需了解的重要信息。 这包括您的
{TENANT_ID}
、“容器”的概念以及发出请求所需的标头(请特别注意“接受”标头及其可能的值)。
-
Catalog Service:目录是Experience Platform中数据位置和历程的记录系统。
-
Batch ingestion:批量摄取API允许您将数据作为批处理文件摄取到Experience Platform中。
-
沙盒:Experience Platform提供了将单个Platform实例划分为多个单独的虚拟环境的虚拟沙盒,以帮助开发和改进数字体验应用程序。
使用平台API
有关如何成功调用平台API的信息,请参阅平台API快速入门指南。
创建源连接 source
您可以在提供基本连接ID、要摄取的源文件的路径以及源对应的连接规范ID的同时,通过向Flow Service API的sourceConnections
端点发出POST请求来创建源连接。
创建源连接时,还必须为数据格式属性定义一个枚举值。
为基于文件的源使用以下枚举值:
delimited
json
parquet
对于所有基于表的源,将该值设置为tabular
。
API格式
POST /sourceConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
、JSON
和parquet
。data.properties
data.properties.columnDelimiter
,
)作为默认值。 注意: columnDelimiter
属性只能在引入分隔文件时使用。data.properties.encoding
UTF-8
和ISO-8859-1
。 注意: encoding
参数仅在摄取分隔的CSV文件时可用。 将使用默认编码UTF-8
摄取其他文件类型。data.properties.compressionType
bzip2
、gzip
、deflate
、zipDeflate
、tarGzip
和tar
。 注意: compressionType
属性只能在引入分隔文件或JSON文件时使用。params.path
/acme/summerCampaign/*.csv
将摄取整个/acme/summerCampaign/
文件夹。params.type
file
摄取单个文件,使用类型folder
摄取整个文件夹。connectionSpec.id
响应
成功的响应返回新创建的源连接的唯一标识符(id
)。 此ID是稍后步骤创建数据流所必需的。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
使用正则表达式选择要摄取的特定文件集 regex
在创建源连接时,可以使用正则表达式将源中的一组特定文件摄取到Platform。
API格式
POST /sourceConnections
请求
在以下示例中,文件路径中使用正则表达式以指定摄取名称中包含premium
的所有CSV文件。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
配置源连接以递归方式摄取数据
创建源连接时,可以使用recursive
参数从深度嵌套的文件夹中摄取数据。
API格式
POST /sourceConnections
请求
在以下示例中,recursive: true
参数通知Flow Service在摄取过程中以递归方式读取所有子文件夹。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
创建目标XDM架构 target-schema
为了在Platform中使用源数据,必须创建目标架构,以根据您的需求构建源数据。 然后,使用目标架构创建包含源数据的Platform数据集。
通过向架构注册表API执行POST请求,可以创建目标XDM架构。
有关如何创建目标XDM架构的详细步骤,请参阅有关使用API 创建架构的教程。
创建目标数据集 target-dataset
可以通过向目录服务API执行POST请求,在有效负载中提供目标架构的ID来创建目标数据集。
有关如何创建目标数据集的详细步骤,请参阅有关使用API创建数据集的教程。
创建目标连接 target-connection
目标连接表示与所摄取数据所登陆的目标之间的连接。 要创建目标连接,您必须提供与数据湖关联的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c
。
现在,您拥有目标架构、目标数据集以及到数据湖的连接规范ID。 使用这些标识符,您可以使用Flow Service API创建目标连接,以指定将包含入站源数据的数据集。
API格式
POST /targetConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
,这将返回架构的最新次版本。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
。响应
成功的响应返回新目标连接的唯一标识符(id
)。 此ID在后续步骤中是必需的。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
创建映射 mapping
要将源数据摄取到目标数据集中,必须首先将其映射到目标数据集所遵循的目标架构。
要创建映射集,请在提供目标XDM架构$id
和要创建的映射集的详细信息时,向Data Prep API的mappingSets
端点发出POST请求。
API格式
POST /conversion/mappingSets
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
响应
成功的响应返回新创建的映射的详细信息,包括其唯一标识符(id
)。 在后续步骤中需要使用此值来创建数据流。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
检索数据流规范 specs
数据流负责从源收集数据并将这些数据导入Platform。 要创建数据流,您必须首先获取负责收集云存储数据的数据流规范。
API格式
GET /flowSpecs?property=name=="CloudStorageToAEP"
请求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
响应
成功的响应将返回数据流规范的详细信息,该规范负责将数据从源引入Platform。 响应包括创建新数据流所需的唯一流规范id
。
code language-json |
---|
|
创建数据流
收集云存储数据的最后一步是创建数据流。 现在,您已准备以下必需值:
数据流负责从源中计划和收集数据。 您可以通过在有效负载中提供上述值时执行POST请求来创建数据流。
要计划摄取,您必须先将开始时间值设置为纪元时间(以秒为单位)。 然后,必须将频率值设置为五个选项之一: once
、minute
、hour
、day
或week
。 间隔值用于指定两次连续摄取之间的时间段,创建一次性摄取不需要设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于15
。
API格式
POST /flows
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
或week
。scheduleParams.interval
间隔指定两次连续流运行之间的周期。 间隔的值应为非零整数。 每个频率的最小接受间隔值如下:
- 一次:不适用
- 分钟: 15
- 小时: 1
- 天: 1
- 周: 1
响应
成功的响应返回新创建的数据流的ID (id
)。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
监测数据流
创建数据流后,您可以监视通过它摄取的数据,以查看有关流运行、完成状态和错误的信息。 有关如何监视数据流的详细信息,请参阅有关API中监视数据流的教程
后续步骤
在本教程之后,您已创建一个源连接器,以按计划从云存储中收集数据。 传入数据现在可供下游平台服务(如Real-Time Customer Profile和Data Science Workspace)使用。 有关更多详细信息,请参阅以下文档:
附录 appendix
以下部分列出了不同的云存储源连接器及其连接规范。
连接规范
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094