本教程涵盖以下步骤:从支付源检索数据,并使用将这些数据导入Platform Flow Service API.
要创建数据流,您必须已具有带支付源的有效基本连接ID。 如果您没有此ID,请参阅 源概述 用于创建基本连接的支付来源列表。
本教程要求您实际了解Adobe Experience Platform的以下组件:
{TENANT_ID}
、“容器”的概念以及发出请求所需的标头(请特别注意“接受”标头及其可能的值)。有关如何成功调用Platform API的信息,请参阅 Platform API快速入门.
您可以通过向以下对象发出POST请求来创建源连接: Flow Service API。 源连接由连接ID、源数据文件的路径以及连接规范ID组成。
要创建源连接,还必须为数据格式属性定义一个枚举值。
为基于文件的连接器使用以下枚举值:
数据格式 | 枚举值 |
---|---|
已分隔 | delimited |
JSON | json |
Parquet | parquet |
对于所有基于表的连接器,将该值设置为 tabular
.
API格式
POST /sourceConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "PayPal source connection",
"baseConnectionId": "24151d58-ffa7-4960-951d-58ffa7396097",
"description": "PayPal source connection",
"data": {
"format": "tabular",
}
},
"params": {
"tableName": "PayPal.Catalog_Products",
"columns": [
{
"name": "Product_Id",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Product_Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Description",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Create_Time",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "221c7626-58f6-4eec-8ee2-042b0226f03b",
"version": "1.0"
}
}'
属性 | 描述 |
---|---|
baseConnectionId |
您正在访问的第三方支付应用程序的唯一连接ID。 |
params.path |
源文件的路径。 |
connectionSpec.id |
您的支付应用程序的连接规范ID。 |
响应
成功的响应将返回唯一标识符(id
)。 在后续步骤中,创建目标连接时需要此ID。
{
"id": "2c48a152-3d49-43cb-88a1-523d49e3cbcb",
"etag": "\"8000c843-0000-0200-0000-5e8917ea0000\""
}
为了在Platform中使用源数据,必须创建目标架构,以根据您的需求构建源数据。 然后,使用目标架构创建包含源数据的Platform数据集。
可以通过向以下对象执行POST请求来创建目标XDM架构 架构注册表API.
有关如何创建目标XDM架构的详细步骤,请参阅关于的教程 使用API创建架构.
可以通过向执行POST请求来创建目标数据集 目录服务API,在有效负载中提供目标架构的ID。
有关如何创建目标数据集的详细步骤,请参阅关于的教程 使用API创建数据集.
目标连接表示与所摄取数据所登陆的目标之间的连接。 要创建目标连接,您必须提供与数据湖关联的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
现在,您拥有目标架构、目标数据集以及到数据湖的连接规范ID的唯一标识符。 使用 Flow Service API之外,您还可以通过指定这些标识符以及将包含入站源数据的数据集来创建目标连接。
API格式
POST /targetConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for payments",
"description": "Target Connection for payments",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/14d89c5bb88e2ff488f23db896be469e7e30bb166bda8722",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5e8918669cbbee18ad9771f3"
},
"connectionSpec": {
"id": "221c7626-58f6-4eec-8ee2-042b0226f03b",
"version": "1.0"
}
}'
属性 | 描述 |
---|---|
data.schema.id |
此 $id 目标XDM架构的。 |
data.schema.version |
架构的版本。 必须设置此值 application/vnd.adobe.xed-full+json;version=1 ,返回架构的最新次版本。 |
params.dataSetId |
上一步中生成的目标数据集的ID。 注意:创建目标连接时,必须提供有效的数据集ID。 无效的数据集ID将导致错误。 |
connectionSpec.id |
用于连接到数据湖的连接规范ID。 此ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
响应
成功的响应将返回新目标连接的唯一标识符(id
)。 在后续步骤中需要使用此值来创建数据流。
{
"id": "c8e12917-ac33-44d7-a129-17ac3364d7b7",
"etag": "\"0f015874-0000-0200-0000-5e8918e60000\""
}
要将源数据摄取到目标数据集中,必须首先将其映射到目标数据集所遵循的目标架构。
POST要创建映射集,请向 mappingSets
的端点 Data Prep API 提供目标XDM架构时 $id
以及要创建的映射集的详细信息。
API格式
POST /mappingSets
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/14d89c5bb88e2ff488f23db896be469e7e30bb166bda8722",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Product_Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "product.name",
"sourceAttribute": "Product_Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "product.description",
"sourceAttribute": "Description",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "product.type",
"sourceAttribute": "Type",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
属性 | 描述 |
---|---|
xdmSchema |
此 $id 目标XDM架构的。 |
响应
成功响应将返回新创建映射的详细信息,包括其唯一标识符(id
)。 此ID是稍后步骤创建数据流所必需的。
{
"id": "b54a8dc38e8d4e31a2dc096e413ae8e5",
"version": 0,
"createdDate": 1586043319604,
"modifiedDate": 1586043319604,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
数据流负责从源收集数据并将这些数据导入Platform。 GET要创建数据流,您必须首先通过对 Flow Service API。 数据流规范负责从外部数据库或NoSQL系统中收集数据。
API格式
GET /flowSpecs?property=name=="CRMToAEP"
请求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
响应
成功的响应将返回数据流规范的详细信息,该规范负责将数据从源引入Platform。 响应中包含唯一的流量规范 id
需要才能创建新数据流。
为简单起见,以下JSON响应有效负载已隐藏。 选择“payload”(有效负载)可查看响应有效负载。
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
收集数据的最后一步是创建数据流。 此时,您应该准备以下所需的值:
数据流负责从源中计划和收集数据。 您可以通过在有效负载中提供上述值时执行POST请求来创建数据流。
要计划摄取,您必须先将开始时间值设置为纪元时间(以秒为单位)。 然后,必须将频率值设置为五个选项之一: once
, minute
, hour
, day
,或 week
. 间隔值用于指定两次连续摄取之间的时间段,创建一次性摄取不需要设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于 15
.
API格式
POST /flows
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Dataflow between payments Platform",
"description": "Inbound data to Platform",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"2c48a152-3d49-43cb-88a1-523d49e3cbcb"
],
"targetConnectionIds": [
"c8e12917-ac33-44d7-a129-17ac3364d7b7"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "updatedAt",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "b54a8dc38e8d4e31a2dc096e413ae8e5",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1567411548",
"frequency": "minute",
"interval":"30"
}
}'
属性 | 描述 |
---|---|
flowSpec.id |
此 流量规范ID 在上一步中检索。 |
sourceConnectionIds |
此 源连接ID 在之前的步骤中检索。 |
targetConnectionIds |
此 目标连接Id 在之前的步骤中检索。 |
transformations.params.mappingId |
此 映射ID 在之前的步骤中检索。 |
transformations.params.deltaColum |
用于区分新数据和现有数据的指定列。 将根据选定列的时间戳摄取增量数据。 支持的日期格式 deltaColumn 是 yyyy-MM-dd HH:mm:ss . |
transformations.params.mappingId |
与数据库关联的映射ID。 |
scheduleParams.startTime |
以纪元时间表示的数据流开始时间。 |
scheduleParams.frequency |
数据流收集数据的频率。 可接受的值包括: once , minute , hour , day ,或 week . |
scheduleParams.interval |
间隔指定两次连续流运行之间的周期。 间隔的值应为非零整数。 当频率设置为时,不需要间隔 once 并且应大于或等于 15 其他频率值。 |
响应
成功的响应将返回ID id
新创建的数据流的。
{
"id": "e0bd8463-0913-4ca1-bd84-6309134ca1f6",
"etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}
创建数据流后,您可以监视通过它摄取的数据,以查看有关流运行、完成状态和错误的信息。 有关如何监视数据流的更多信息,请参阅关于的教程 在API中监控数据流
在本教程之后,您已创建一个源连接器,以按计划从支付应用程序中收集数据。 传入数据现在可供下游平台服务使用,例如 Real-Time Customer Profile 和 Data Science Workspace. 有关更多详细信息,请参阅以下文档: