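Create a dataflow for ecommerce sources using the Flow Service API
Created for:
- Developer
This tutorial covers the steps for retrieving data from an ecommerce source and bringing that data into Experience Platform using the Flow Service API.
- To create a dataflow, you must already have a valid base connection ID for an ecommerce source. If you do not have this ID, see the sources overview for a list of ecommerce sources that you can create a base connection with.
- For Experience Platform to ingest data, the timezone of all table-based batch sources must be configured to UTC.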
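Getting started
This tutorial requires a working understanding of the following components of Adobe Experience Platform:
- Experience Data Model (XDM) System: The standardized framework by which Experience Platform organizes customer experience data.
- Catalog Service: Catalog is the system of record for data location and lineage within Experience Platform.
- Batch ingestion: The Batch Ingestion API allows you to ingest data into Experience Platform as batch files.
- Sandboxes: Experience Platform provides virtual sandboxes that partition a single Experience Platform instance into separate virtual environments to help develop and evolve digital experience applications.
Using Experience Platform APIs
For information on how to successfully make calls to Experience Platform APIs, see the guide on getting started with Experience Platform APIs.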
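Create a source connection
You can create a source connection by making a POST request to the Flow Service API. A source connection consists of a connection ID, a path to the source data file, and a connection spec ID.
To create a source connection, you must also define an enum value for the data format attribute.
Use the following enum values for file-based connectors:
Data format | Enum value |
---|---|
Delimited | delimited |
JSON | json |
Parquet | parquet |
For all table-based connectors, set the value to tabular.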
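The format rule above can be captured in a small helper. This is a minimal sketch in Python: the function name and the "table"/"file" labels are hypothetical and exist only for illustration, while the enum values themselves come from the table above.

```python
from typing import Optional

# Enum values for file-based connectors, as listed in the table above.
FILE_FORMATS = ("delimited", "json", "parquet")


def data_format(connector_kind: str, file_format: Optional[str] = None) -> str:
    """Return the value to use for data.format in a source connection.

    connector_kind is a hypothetical label ("table" or "file") used only
    in this sketch; it is not a Flow Service field.
    """
    if connector_kind == "table":
        return "tabular"  # every table-based connector uses this value
    if connector_kind == "file":
        if file_format not in FILE_FORMATS:
            raise ValueError(f"unsupported file format: {file_format!r}")
        return file_format
    raise ValueError(f"unknown connector kind: {connector_kind!r}")


if __name__ == "__main__":
    print(data_format("table"))            # tabular
    print(data_format("file", "parquet"))  # parquet
```

Ecommerce sources such as the Shopify example in this tutorial are table-based, so the request below uses tabular.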
API format
POST /sourceConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Shopify source connection",
"baseConnectionId": "582f4f8d-71e9-4a5c-a164-9d2056318d6c",
"description": "Shopify source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "Shopify.Orders",
"columns": [
{
"name": "Email",
"type": "string"
},
{
"name": "Phone",
"type": "string"
}
]
},
"connectionSpec": {
"id": "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"version": "1.0"
}
}'
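Property | Description |
---|---|
baseConnectionId | The connection ID of your ecommerce source. |
params.path | The path of the source file. |
connectionSpec.id | The connection spec ID of your ecommerce source. |
Response
A successful response returns the unique identifier (id) of the newly created source connection. This ID is required in a later step to create the dataflow.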
{
"id": "c278ab14-acdf-440b-b67f-1265d15a7655",
"etag": "\"10007c3f-0000-0200-0000-5fa9be720000\""
}
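The same request can be assembled programmatically. The following is a minimal sketch using only the Python standard library; the function name and the credentials dictionary are hypothetical, and the request is built but not sent.

```python
import json
import urllib.request

BASE_URL = "https://platform.adobe.io/data/foundation/flowservice"


def build_source_connection_request(base_connection_id, connection_spec_id,
                                    table_name, columns, credentials):
    """Build (but do not send) the POST /sourceConnections request."""
    payload = {
        "name": "Shopify source connection",
        "baseConnectionId": base_connection_id,
        "description": "Shopify source connection",
        "data": {"format": "tabular"},
        "params": {"tableName": table_name, "columns": columns},
        "connectionSpec": {"id": connection_spec_id, "version": "1.0"},
    }
    headers = {
        "Authorization": f"Bearer {credentials['access_token']}",
        "x-api-key": credentials["api_key"],
        "x-gw-ims-org-id": credentials["org_id"],
        "x-sandbox-name": credentials["sandbox_name"],
        "Content-Type": "application/json",
    }
    return urllib.request.Request(
        f"{BASE_URL}/sourceConnections",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


if __name__ == "__main__":
    request = build_source_connection_request(
        base_connection_id="582f4f8d-71e9-4a5c-a164-9d2056318d6c",
        connection_spec_id="4f63aa36-bd48-4e33-bb83-49fbcd11c708",
        table_name="Shopify.Orders",
        columns=[{"name": "Email", "type": "string"},
                 {"name": "Phone", "type": "string"}],
        credentials={"access_token": "{ACCESS_TOKEN}", "api_key": "{API_KEY}",
                     "org_id": "{ORG_ID}", "sandbox_name": "{SANDBOX_NAME}"},
    )
    print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it; the id field of the parsed JSON response is the source connection ID used later when creating the dataflow.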
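Create a target XDM schema
For the source data to be used in Experience Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create an Experience Platform dataset in which the source data is contained.
A target XDM schema can be created by performing a POST request to the Schema Registry API.
For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.
Create a target dataset
A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.
For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.
Create a target connection
A target connection represents the connection to the destination where the ingested data lands. To create a target connection, you must provide the fixed connection spec ID associated with the data lake. This connection spec ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c.
You now have the unique identifiers for a target schema and a target dataset, as well as the connection spec ID for the data lake. Using the Flow Service API, you can create a target connection by specifying these identifiers along with the dataset that will contain the inbound source data.
API format
POST /targetConnections
Request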
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Shopify target connection",
"description": "Shopify target connection",
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/854ddc36ad2c7bd001f66a4392575ed4004f81883328772f",
"version": "application/vnd.adobe.xed-full-notext+json; version=1"
}
},
"params": {
"dataSetId": "5fa9c083de62e418dd170b42"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
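Property | Description |
---|---|
data.schema.id | The $id of the target XDM schema. |
data.schema.version | The version of the schema. This value must be set to application/vnd.adobe.xed-full+json;version=1, which returns the latest minor version of the schema. |
params.dataSetId | The ID of the target dataset. |
connectionSpec.id | The connection spec ID used to connect to the data lake. This ID is c604ff05-7f1a-43c0-8e18-33bf874cb11c. |
Response
A successful response returns the unique identifier (id) of the new target connection. This value is required in a later step to create the dataflow.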
{
"id": "6c0ba537-a96b-4d74-8c95-450eb88baee8",
"etag": "\"00005506-0000-0200-0000-5fa9c13c0000\""
}
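Because the data lake connection spec ID is fixed, only the schema $id and the dataset ID vary between target connections. The sketch below assembles the request body in Python under that assumption; the function name is hypothetical and the version string is copied from the request above.

```python
import json

# Fixed connection spec ID for the data lake, as given in this section.
DATA_LAKE_CONNECTION_SPEC_ID = "c604ff05-7f1a-43c0-8e18-33bf874cb11c"


def build_target_connection_payload(schema_id: str, dataset_id: str,
                                    name: str = "Shopify target connection") -> dict:
    """Assemble the JSON body for POST /targetConnections."""
    return {
        "name": name,
        "description": name,
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": schema_id,
                # Version string copied from the request shown above.
                "version": "application/vnd.adobe.xed-full-notext+json; version=1",
            },
        },
        "params": {"dataSetId": dataset_id},
        "connectionSpec": {"id": DATA_LAKE_CONNECTION_SPEC_ID, "version": "1.0"},
    }


if __name__ == "__main__":
    payload = build_target_connection_payload(
        schema_id="https://ns.adobe.com/{TENANT_ID}/schemas/"
                  "854ddc36ad2c7bd001f66a4392575ed4004f81883328772f",
        dataset_id="5fa9c083de62e418dd170b42",
    )
    print(json.dumps(payload, indent=2))
```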
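Create a mapping
For the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the mappingSets endpoint of the Data Prep API, providing the $id of your target XDM schema and the details of the mapping set you want to create.
API format
POST /mappingSets
Request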
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/854ddc36ad2c7bd001f66a4392575ed4004f81883328772f",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "personalEmail.address",
"sourceAttribute": "Email",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "mobilePhone.number",
"sourceAttribute": "Shipping_Address_Phone",
"identity": false,
"version": 0
}
]
}'
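Property | Description |
---|---|
xdmSchema | The $id of the target XDM schema. |
Response
A successful response returns details of the newly created mapping, including its unique identifier (id). This ID is required in a later step to create the dataflow.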
{
"id": "22922102bffd4369b6209c102a604062",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
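A mapping set is essentially a list of source-attribute to XDM-path pairs wrapped in a fixed envelope. The following Python sketch generates that body from a plain dictionary; the function name and the field_map argument are hypothetical, and the constant fields mirror the request above.

```python
import json


def build_mapping_set(schema_id: str, field_map: dict) -> dict:
    """Assemble the JSON body for POST /mappingSets.

    field_map maps each source attribute to its destination XDM path.
    """
    mappings = [
        {
            "destinationXdmPath": xdm_path,
            "sourceAttribute": source_attribute,
            "identity": False,
            "version": 0,
        }
        for source_attribute, xdm_path in field_map.items()
    ]
    return {
        "version": 0,
        "xdmSchema": schema_id,
        "xdmVersion": "1.0",
        "id": None,  # serialized as JSON null, as in the request above
        "mappings": mappings,
    }


if __name__ == "__main__":
    body = build_mapping_set(
        "https://ns.adobe.com/{TENANT_ID}/schemas/"
        "854ddc36ad2c7bd001f66a4392575ed4004f81883328772f",
        {
            "Email": "personalEmail.address",
            "Shipping_Address_Phone": "mobilePhone.number",
        },
    )
    print(json.dumps(body, indent=2))
```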
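Look up dataflow specifications
A dataflow is responsible for collecting data from sources and bringing it into Experience Platform. To create a dataflow, you must first obtain the dataflow specification that is responsible for collecting data from ecommerce sources by performing a GET request to the Flow Service API.
API format
GET /flowSpecs?property=name=="CRMToAEP"
Request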
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
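Response
A successful response returns the details of the dataflow specification responsible for bringing data from your source into Experience Platform. The response includes the unique flow spec id required to create a new dataflow.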
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
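Before reusing the flow spec id, it can be worth confirming that the specification actually lists the connection spec IDs of your source and target. The sketch below assumes the response has already been parsed into a dictionary shaped like the object above; the helper name is hypothetical and the example dictionary is abbreviated.

```python
def flow_spec_id_for(flow_spec: dict, source_spec_id: str, target_spec_id: str) -> str:
    """Return the flow spec id if it supports both connection specs."""
    if source_spec_id not in flow_spec.get("sourceConnectionSpecIds", []):
        raise ValueError(f"source connection spec not supported: {source_spec_id}")
    if target_spec_id not in flow_spec.get("targetConnectionSpecIds", []):
        raise ValueError(f"target connection spec not supported: {target_spec_id}")
    return flow_spec["id"]


if __name__ == "__main__":
    # Abbreviated copy of the response above; only the fields used here.
    flow_spec = {
        "id": "14518937-270c-4525-bdec-c2ba7cce3860",
        "name": "CRMToAEP",
        "sourceConnectionSpecIds": ["4f63aa36-bd48-4e33-bb83-49fbcd11c708"],
        "targetConnectionSpecIds": ["c604ff05-7f1a-43c0-8e18-33bf874cb11c"],
    }
    print(flow_spec_id_for(
        flow_spec,
        source_spec_id="4f63aa36-bd48-4e33-bb83-49fbcd11c708",
        target_spec_id="c604ff05-7f1a-43c0-8e18-33bf874cb11c",
    ))
```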
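Create a dataflow
The last step towards collecting data is to create a dataflow. By now, you should have the following required values prepared:
- Source connection ID
- Target connection ID
- Mapping ID
- Dataflow specification ID
A dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request while providing the previously mentioned values within the request payload.
To schedule an ingestion, you must first set the start time value to epoch time in seconds. Then, you must set the frequency value to one of the five options: once, minute, hour, day, or week. The interval value designates the period between two consecutive ingestions; a one-time ingestion does not require an interval. For all other frequencies, an interval is required: it must be at least 15 when the frequency is minute and at least 1 when the frequency is hour, day, or week.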
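The scheduling constraints can be checked on the client before the request is sent. The following Python sketch follows the scheduleSpec returned with the flow spec, where the minimum interval is 15 for minute and 1 for hour, day, and week, and where a once schedule carries no interval. The function name is hypothetical, and startTime is treated as an integer as that spec declares.

```python
from typing import Optional

# Minimum interval per frequency, taken from the scheduleSpec in the
# flow spec response.
MIN_INTERVAL = {"minute": 15, "hour": 1, "day": 1, "week": 1}


def build_schedule_params(start_time: int, frequency: str,
                          interval: Optional[int] = None) -> dict:
    """Validate the schedule and return a scheduleParams object."""
    if frequency == "once":
        if interval is not None:
            raise ValueError("a one-time ingestion must not set an interval")
        return {"startTime": start_time, "frequency": "once"}
    if frequency not in MIN_INTERVAL:
        raise ValueError(f"unsupported frequency: {frequency!r}")
    minimum = MIN_INTERVAL[frequency]
    if interval is None or interval < minimum:
        raise ValueError(f"interval for {frequency!r} must be at least {minimum}")
    return {"startTime": start_time, "frequency": frequency, "interval": interval}


if __name__ == "__main__":
    print(build_schedule_params(1604961070, "once"))
    print(build_schedule_params(1604961070, "minute", 15))
```

Rejecting an invalid schedule locally gives a clearer error than waiting for the API to refuse the dataflow.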
API format
POST /flows
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test Shopify dataflow",
"description": "Shopify With mapping ingestion",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"c278ab14-acdf-440b-b67f-1265d15a7655"
],
"targetConnectionIds": [
"6c0ba537-a96b-4d74-8c95-450eb88baee8"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "22922102bffd4369b6209c102a604062",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1604961070",
"frequency": "once"
}
}'
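Property | Description |
---|---|
flowSpec.id | The flow spec ID retrieved in the previous step. |
sourceConnectionIds | The source connection ID retrieved in an earlier step. |
targetConnectionIds | The target connection ID retrieved in an earlier step. |
transformations.params.mappingId | The mapping ID retrieved in an earlier step. |
scheduleParams.startTime | The start time for the dataflow in epoch time. |
scheduleParams.frequency | The frequency at which the dataflow collects data. Acceptable values include: once, minute, hour, day, or week. |
scheduleParams.interval | The interval designates the period between two consecutive flow runs. The interval's value should be a non-zero integer. The minimum accepted interval value for each frequency is as follows: once: n/a; minute: 15; hour: 1; day: 1; week: 1. |
Response
A successful response returns the ID (id) of the newly created dataflow.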
{
"id": "20c115bc-46e3-40f3-bfe9-fb25abe4ba76",
"etag": "\"030018cb-0000-0200-0000-5fa9c31a0000\""
}
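To tie the steps together, the sketch below assembles the dataflow body from the four identifiers collected earlier and reads the new dataflow ID out of a response like the one above. It is a minimal Python illustration; both function names are hypothetical and nothing is sent over the network.

```python
import json


def build_flow_payload(flow_spec_id: str, source_connection_id: str,
                       target_connection_id: str, mapping_id: str,
                       schedule_params: dict,
                       name: str = "Test Shopify dataflow") -> dict:
    """Assemble the JSON body for POST /flows."""
    return {
        "name": name,
        "description": "Shopify With mapping ingestion",
        "flowSpec": {"id": flow_spec_id, "version": "1.0"},
        "sourceConnectionIds": [source_connection_id],
        "targetConnectionIds": [target_connection_id],
        "transformations": [
            {
                "name": "Mapping",
                # mappingVersion 0 mirrors the request shown above.
                "params": {"mappingId": mapping_id, "mappingVersion": 0},
            }
        ],
        "scheduleParams": schedule_params,
    }


def flow_id_from_response(body: str) -> str:
    """Extract the dataflow ID from the JSON response text."""
    return json.loads(body)["id"]


if __name__ == "__main__":
    payload = build_flow_payload(
        flow_spec_id="14518937-270c-4525-bdec-c2ba7cce3860",
        source_connection_id="c278ab14-acdf-440b-b67f-1265d15a7655",
        target_connection_id="6c0ba537-a96b-4d74-8c95-450eb88baee8",
        mapping_id="22922102bffd4369b6209c102a604062",
        schedule_params={"startTime": "1604961070", "frequency": "once"},
    )
    print(json.dumps(payload, indent=2))
```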