通过源连接器和API收集流数据

Flow Service 用于收集和集中Adobe Experience Platform内不同来源的客户数据。 该服务提供用户界面和RESTful API,所有支持的源都可从中连接。

本教程介绍了从流源连接器检索数据以及使用API将 Experience Platform 其引入的 Flow Service 步骤

入门指南

本教程要求您具有流式连接器的有效连接ID。 如果您没有此信息,请在尝试本教程之前参阅以下有关创建流源连接的教程:

本教程还要求您对Adobe Experience Platform的以下组件有充分的了解:

  • Experience Data Model (XDM) System:Experience Platform组织客户体验数据的标准化框架。
    • 模式合成基础:了解XDM模式的基本构件,包括模式构成的主要原则和最佳做法。
    • 模式注册开发人员指南:包括成功执行对模式注册表API的调用时需要了解的重要信息。 这包括您 {TENANT_ID}的、“容器”的概念以及发出请求所需的标题(特别要注意“接受”标题及其可能的值)。
  • Catalog Service:目录是数据位置和谱系的记录系统 Experience Platform。
  • Streaming ingestion:流式摄取 Platform 为用户提供了一种将数据从客户端和服务器端设备实时 Experience Platform 发送到的方法。
  • 沙箱: Experience Platform 提供将单个实例分为单独的虚 Platform 拟环境的虚拟沙箱,以帮助开发和发展数字体验应用程序。

以下各节提供您需要了解的其他信息,以便使用API成功收集流数 Flow Service 据。

读取示例API调用

本教程提供示例API调用,以演示如何设置请求的格式。 这包括路径、必需的标头和格式正确的请求负载。 还提供API响应中返回的示例JSON。 有关示例API调用文档中使用的惯例的信息,请参阅疑难解答 指南中有关如何阅读示例API调 用 Experience Platform 一节。

收集所需标题的值

要调用API,您必 Platform 须先完成身份验证 教程。 完成身份验证教程可为所有API调用中的每个所需 Experience Platform 标头提供值,如下所示:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

中的所有资 Experience Platform源(包括属于这些资 Flow Service源)都与特定虚拟沙箱隔离。 对API的 Platform 所有请求都需要一个标头,它指定操作将在中进行的沙箱的名称:

  • x-sandbox-name: {SANDBOX_NAME}

所有包含有效负荷(POST、PUT、PATCH)的请求都需要额外的媒体类型标头:

  • Content-Type: application/json

创建源连接

您可以通过向API发出POST请求来创建源 Flow Service 连接。 源连接由连接ID、源数据文件的路径和连接规范ID组成。

要创建源连接,还必须为数据格式属性定义枚举值。

为基于文件的连接器使用以下枚举值:

数据格式 枚举值
分隔 delimited
JSON json
镶木 parquet

对于所有基于表的连接器,将值设置为 tabular

API格式

POST /sourceConnections

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test source connector for streaming data",
        "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
        "connectionId": "f6aa6c58-3c3d-4c59-aa6c-583c3d6c599c",
        "description": "Test source connector for streaming data",
        "data": {
            "format": "delimited"
        },
            "connectionSpec": {
            "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
            "version": "1.0"
        }
    }'
属性 描述
providerId 流连接器的提供者ID。
connectionId 流连接器的唯一连接ID。
connectionSpec.id 与您的特定流连接器关联的连接规范ID。

响应

成功的响应会返回新创建的源id连接的唯一标识符()。 此ID是后续步骤中创建数据流的必需ID。

{
    "id": "2abd97c4-91bb-4c93-bd97-c491bbfc933d",
    "etag": "\"66013508-0000-0200-0000-5f6e2ae70000\""
}

创建目标XDM模式

要在中使用源数据,必须创 Platform建目标模式,以根据您的需求构建源数据。 然后,目标模式用于创建包含 Platform 源数据的数据集。 此目标XDM模式还扩展了XDM Individual Profile 类。

通过对目标注册表API执行POST请求,可以创 建模式XDM模式

API格式

POST /tenant/schemas

请求

以下示例请求创建一个扩展XDM类的XDM模式 Individual Profile 符。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

响应

成功的响应会返回新创建模式的详细信息,包括其唯一标识符($id)。 后续步骤中需要此ID才能创建目标数据集、映射和数据流。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

创建目标数据集

通过向Catalog Service API执行目标请求 ,提供有效负荷中POST模式的ID,可以创建目标数据集。

API格式

POST /catalog/dataSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.1"
        },
        "fileDescription": {
            "format": "parquet"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        },
        "name": "Test streaming dataset"
    }'
属性 描述
schemaRef.id 目标XDM模式的ID。

响应

成功的响应会返回一个数组,其中包含格式为新创建数据集的ID "@/datasets/{DATASET_ID}"。 数据集ID是由系统生成的只读字符串,用于在API调用中引用数据集。 目标数据集ID是后续步骤中创建目标连接和数据流所必需的。

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

创建目标连接

目标连接表示到所摄取数据所进入的目的地的连接。 要创建目标连接,必须提供与数据库关联的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

您现在将唯一标识符作为目标模式目标集和连接规范ID到数据湖。 使用这些标识符,您可以使用API创建目标 Flow Service 连接,以指定将包含入站源数据的数据集。

API格式

POST /targetConnections

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm"
        },
        "params": {
        "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
属性 描述
params.dataSetId 目标数据集的ID。
connectionSpec.id 用于连接到数据湖的连接规范ID。 此ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

响应

成功的响应会返回新目标连接的唯一标识符(id)。 此ID是后续步骤中必需的。

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

创建映射

为了将源数据引入目标数据集,必须首先将其映射到目标数据集所附加的目标模式。 这是通过对转换服务执行POST请求来实现的,该请求具有在请求有效负荷中定义的数据映射。

API格式

POST /conversion/mappingSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
属性 描述
xdmSchema 目标 $id XDM模式。

响应

成功的响应会返回新创建的映射的详细信息,包括其唯一标识符(id)。 此ID是后续步骤中创建数据流的必需ID。

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

查找数据流规范

数据流负责从源收集数据并将其引入 Platform。 要创建数据流,必须首先通过对API执行GET请求来获取数据流规 Flow Service 范。 数据流规范负责从流连接器收集数据。
API格式

GET /flowSpecs?property=name=="Steam data with transformation"

请求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="Steam data with transformation"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

响应

成功的响应会返回数据流规范的详细信息,该规范负责将流连接器中的数据引入 Platform。 下一步中需要此ID才能创建新数据流。

{
    "items": [
        {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "name": "Steam data with transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "d27d4907-7351-47dd-bbc2-05a04365703d",
                "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from Raw to XDM",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        },
                        "required": [
                            "mappingId"
                        ]
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "deleteSupported": false,
                        "updateSupported": false,
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

创建数据流

收集流数据的最后一步是创建数据流。 现在,您准备了以下必需值:

数据流负责从源调度和收集数据。 通过在有效负荷中提供先前提到的值时执行POST请求,可以创建数据流。

API格式

POST /flows

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "2abd97c4-91bb-4c93-bd97-c491bbfc933d"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
属性 描述
flowSpec.id 在上 一步中检索 的流程规范ID。
sourceConnectionIds 先前步骤中检索 的源连接ID。
targetConnectionIds 先前步骤中检索 的目标连接ID。
transformations.params.mappingId 先前步骤 中检索的映射ID。

响应

成功的响应会返回新创id建的数据流的ID()。

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

后续步骤

通过遵循本教程,您创建了一个数据流,用于从流连接器收集流数据。 现在,下游服务(如和)可 Platform 以使用传入 Real-time Customer Profile 数据 Data Science Workspace。 有关更多详细信息,请参阅以下文档:

在此页面上