使用为CRM源创建数据流 Flow Service API

本教程介绍了从CRM源检索数据以及使用 Flow Service API.

注意

要创建数据流,您必须已具有与CRM源的有效基连接ID。 如果您没有此ID,请参阅 源概述 以获取可创建基本连接的CRM源列表。

快速入门

此外,本教程还要求您对Adobe Experience Platform的以下组件有一定的了解:

  • Experience Data Model (XDM) System:Experience Platform组织客户体验数据的标准化框架。
    • 架构组合的基础知识:了解XDM模式的基本构建块,包括模式组合中的关键原则和最佳实践。
    • 架构注册开发人员指南:包括成功调用架构注册表API所需了解的重要信息。 这包括您的 {TENANT_ID}、“容器”的概念以及发出请求所需的标头(请特别注意接受标头及其可能值)。
  • Catalog Service:目录是Experience Platform中数据位置和谱系的记录系统。
  • Batch ingestion:批量摄取API允许您将数据作为批处理文件导入到Experience Platform中。
  • 沙箱:Experience Platform提供将单个Platform实例分区为单独虚拟环境的虚拟沙盒,以帮助开发和改进数字体验应用程序。

使用Platform API

有关如何成功调用Platform API的信息,请参阅 Platform API快速入门.

创建源连接

您可以通过向 Flow Service API。 源连接由连接ID、源数据文件的路径和连接规范ID组成。

要创建源连接,还必须为数据格式属性定义枚举值。

为基于文件的连接器使用以下枚举值:

数据格式 枚举值
分隔 delimited
JSON json
镶木 parquet

对于所有基于表的连接器,将值设置为 tabular.

API格式

POST /sourceConnections

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Salesforce source connection",
        "baseConnectionId": "4cb0c374-d3bb-4557-b139-5712880adc55",
        "description": "Salesforce source connection",
        "data": {
            "format": "tabular",
        },
        "params": {
            "tableName": "Accounts",
            "columns": [
                {
                    "name": "first_name",
                    "type": "string",
                    "xdm": {
                        "type": "String"
                    }
                },
                {
                    "name": "last_name",
                    "type": "string",
                    "xdm": {
                        "type": "String"
                    }
                },
                {
                    "name": "email",
                    "type": "string",
                    "xdm": {
                        "type": "String"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "ccfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
            "version": "1.0"
        }
    }'
属性 描述
baseConnectionId 您访问的第三方CRM系统的唯一连接ID。
params.path 源文件的路径。
connectionSpec.id 与您的特定第三方CRM系统关联的连接规范ID。 请参阅 附录 ,以获取连接规范ID列表。

响应

成功的响应会返回唯一标识符(id)。 在后续步骤中需要此ID才能创建数据流。

{
    "id": "9a603322-19d2-4de9-89c6-c98bd54eb184"
    "etag": "\"4a00038b-0000-0200-0000-5ebc47fd0000\""
}

创建目标XDM架构

要在Platform中使用源数据,必须创建目标架构以根据您的需求构建源数据。 然后,目标架构用于创建包含源数据的Platform数据集。

通过对 架构注册表API.

有关如何创建目标XDM架构的详细步骤,请参阅 使用API创建模式.

创建目标数据集

通过对 目录服务API,在有效负载中提供目标架构的ID。

有关如何创建目标数据集的详细步骤,请参阅 使用API创建数据集.

创建目标连接

目标连接表示所摄取数据所登陆目标的连接。 要创建目标连接,必须提供与数据湖关联的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

现在,您的唯一标识符是目标架构(目标数据集)以及与数据湖的连接规范ID。 使用 Flow Service API中,您可以通过指定这些标识符以及将包含入站源数据的数据集来创建目标连接。

API格式

POST /targetConnections

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Salesforce target connection",
        "description": "Salesforce target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/417a33eg81a221bd10495920574gfa2d",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5c8c3c555033b814b69f947f"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
属性 描述
data.schema.id $id 目标XDM架构的URL。
data.schema.version 架构的版本。 必须设置此值 application/vnd.adobe.xed-full+json;version=1,可返回架构的最新次要版本。
params.dataSetId 目标数据集的ID。
connectionSpec.id 用于连接到数据湖的连接规范ID。 此ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c.
{
    "id": "4ee890c7-519c-4291-bd20-d64186b62da8",
    "etag": "\"2a007aa8-0000-0200-0000-5e597aaf0000\""
}

创建映射

要将源数据摄取到目标数据集,必须先将其映射到目标数据集所附加的目标架构。

要创建映射集,请向 mappingSets 的端点 Data Prep API 提供目标XDM模式时 $id 以及要创建的映射集的详细信息。

API格式

POST /conversion/mappingSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/417a33eg81a221bd10495920574gfa2d",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "first_name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "last_name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "personalEmail.address",
                "sourceAttribute": "email",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
属性 描述
xdmSchema 目标XDM架构的ID。

响应

成功的响应会返回新创建映射的详细信息,包括其唯一标识符(id)。 在后续步骤中需要此值才能创建数据流。

{
    "id": "500a9b747fcf4908a21917d49bd61780",
    "version": 0,
    "createdDate": 1591043336298,
    "modifiedDate": 1591043336298,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

检索数据流规范

数据流负责从源中收集数据并将它们引入平台。 要创建数据流,必须首先获取负责收集CRM数据的数据流规范。

API格式

GET /flowSpecs?property=name=="CRMToAEP"

请求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CRMToAEP%22' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

响应

成功的响应会返回负责将数据从源引入平台的数据流规范的详细信息。 响应包括唯一流量规范 id 创建新数据流时需要。

注意

为简便起见,下面隐藏了JSON响应有效负载。 选择“有效负荷”以查看响应有效负荷。

 查看有效负载
{
  "id": "14518937-270c-4525-bdec-c2ba7cce3860",
  "name": "CRMToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "3416976c-a9ca-4bba-901a-1f08f66978ff",
    "38ad80fe-8b06-4938-94f4-d4ee80266b07",
    "d771e9c1-4f26-40dc-8617-ce58c4b53702",
    "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
    "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
    "3000eb99-cd47-43f3-827c-43caf170f015",
    "26d738e0-8963-47ea-aadf-c60de735468a",
    "74a1c565-4e59-48d7-9d67-7c03b8a13137",
    "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
    "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
    "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "eb13cb25-47ab-407f-ba89-c0125281c563",
    "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
    "37b6bf40-d318-4655-90be-5cd6f65d334b",
    "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
    "221c7626-58f6-4eec-8ee2-042b0226f03b",
    "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
    "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
    "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
    "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
    "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
    "102706fb-a5cd-42ee-afe0-bc42f017ff43",
    "09182899-b429-40c9-a15a-bf3ddbc8ced7",
    "0479cc14-7651-4354-b233-7480606c2ac3",
    "d6b52d86-f0f8-475f-89d4-ce54c8527328",
    "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
    "1fe283f6-9bec-11ea-bb37-0242ac130002",
    "fcad62f3-09b0-41d3-be11-449d5a621b69",
    "ea1c2a08-b722-11eb-8529-0242ac130003",
    "35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
    "ff4274f2-c9a9-11eb-b8bc-0242ac130003",
    "ba5126ec-c9ac-11eb-b8bc-0242ac130003",
    "b2e08744-4f1a-40ce-af30-7abac3e23cf3",
    "929e4450-0237-4ed2-9404-b7e1e0a00309",
    "2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
    "2fa8af9c-2d1a-43ea-a253-f00a00c74412"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Copy",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "deltaColumn": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "deltaColumn"
        ]
      }
    },
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "deltaColumn": {
            "type": "object",
            "description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime",
          "deltaColumn"
        ]
      }
    }
}

创建数据流

收集CRM数据的最后一步是创建数据流。 现在,您已准备以下必需值:

数据流负责从源中调度和收集数据。 通过在有效负载中提供先前提到的值时执行POST请求,可以创建数据流。

要计划摄取,您必须首先将开始时间值设置为以秒为单位的新纪元时间。 然后,您必须将频率值设置为以下五个选项之一: once, minute, hour, dayweek. 间隔值可指定两个连续摄取和创建一次性摄取之间的周期,而无需设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于 15.

API格式

POST /flows

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Salesforce dataflow",
        "description": "Salesforce dataflow",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "9a603322-19d2-4de9-89c6-c98bd54eb184"
        ],
        "targetConnectionIds": [
            "4ee890c7-519c-4291-bd20-d64186b62da8"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "updatedAt",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "500a9b747fcf4908a21917d49bd61780",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1567411548",
            "frequency":"minute",
            "interval":"30"
        }
    }'
属性 描述
flowSpec.id 流规范ID 已在上一步中检索。
sourceConnectionIds 源连接ID 在之前的步骤中检索。
targetConnectionIds 目标连接ID 在之前的步骤中检索。
transformations.params.mappingId 映射ID 在之前的步骤中检索。
transformations.params.deltaColum 用于区分新数据和现有数据的指定列。 将根据选定列的时间戳摄取增量数据。 支持的格式 deltaColumn is yyyy-MM-dd HH:mm:ss. 如果您使用的是Microsoft Dynamics,则支持 deltaColumn is yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId 与数据库关联的映射ID。
scheduleParams.startTime 新纪元时间中数据流的开始时间。
scheduleParams.frequency 数据流收集数据的频率。 可接受的值包括: once, minute, hour, dayweek.
scheduleParams.interval 该间隔指定两个连续流运行之间的周期。 间隔的值应为非零整数。 将频率设置为时,不需要间隔 once 且应大于或等于 15 的其他频率值。

响应

成功的响应会返回ID(id)。

{
    "id": "8256cfb4-17e6-432c-a469-6aedafb16cd5"
    "etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""

}

监控数据流

创建数据流后,您可以监视通过其摄取的数据,以查看有关流量运行、完成状态和错误的信息。 有关如何监视数据流的更多信息,请参阅 监控API中的数据流

后续步骤

在本教程之后,您创建了一个源连接器,以按计划从CRM系统收集数据。 传入数据现在可由下游Platform服务使用,例如 Real-time Customer Profile 和 Data Science Workspace. 有关更多详细信息,请参阅以下文档:

附录

以下部分列出了不同的CRM源连接器及其连接规范。

连接规范

连接器名称 连接规范
Microsoft Dynamics 38ad80fe-8b06-4938-94f4-d4ee80266b07
Salesforce cfc0fee1-7dc0-40ef-b73e-d8b134c436f5

在此页面上