使用源连接器和API收集云存储数据

本教程介绍了从第三方云存储检索数据,并通过源连接器和Flow Service API将数据引入平台的步骤。

快速入门

本教程要求您通过有效连接以及有关要引入平台的文件(包括文件的路径和结构)的信息,来访问第三方云存储。 如果您没有此信息,请参阅教程中的使用 Flow Service API浏览第三方云存储,然后再尝试使用本教程。

此外,本教程还要求您对Adobe Experience Platform的以下组件有一定的了解:

  • Experience Data Model (XDM) System:Experience Platform组织客户体验数据的标准化框架。
    • 架构组合的基础知识:了解XDM模式的基本构建块,包括模式组合中的关键原则和最佳实践。
    • 架构注册开发人员指南:包括成功调用架构注册表API所需了解的重要信息。这包括您的{TENANT_ID}、“容器”的概念以及发出请求所需的标头(请特别注意“接受”标头及其可能值)。
  • Catalog Service:目录是Experience Platform中数据位置和谱系的记录系统。
  • Batch ingestion:批量摄取API允许您将数据作为批处理文件导入到Experience Platform中。
  • 沙盒:Experience Platform提供将单个Platform实例分区为单独虚拟环境的虚拟沙盒,以帮助开发和改进数字体验应用程序。以下部分提供了使用Flow Service API成功连接到云存储所需了解的其他信息。

读取示例API调用

本教程提供了用于演示如何设置请求格式的示例API调用。 这包括路径、所需标头以及格式正确的请求负载。 还提供了API响应中返回的示例JSON。 有关示例API调用文档中使用的约定的信息,请参阅Experience Platform疑难解答指南中如何阅读示例API调用一节。

收集所需标题的值

要调用Platform API,您必须先完成身份验证教程。 完成身份验证教程可为所有Experience PlatformAPI调用中每个所需标头的值,如下所示:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

Experience Platform中的所有资源(包括属于Flow Service的资源)都与特定虚拟沙箱隔离。 对Platform API的所有请求都需要一个标头来指定操作将在其中进行的沙盒的名称:

  • x-sandbox-name: {SANDBOX_NAME}

所有包含有效负载(POST、PUT、PATCH)的请求都需要额外的媒体类型标头:

  • Content-Type: application/json

创建源连接

您可以通过向Flow Service API发出POST请求来创建源连接。 源连接由连接ID、源数据文件的路径和连接规范ID组成。

要创建源连接,还必须为数据格式属性定义枚举值。

为基于文件的源使用以下枚举值:

数据格式 枚举值
分隔 delimited
JSON json
镶木 parquet

对于所有基于表的源,将值设置为tabular

API格式

POST /sourceConnections

使用自定义分隔文件创建源连接

请求

您可以通过指定columnDelimiter作为属性来摄取具有自定义分隔符的分隔文件。 任何单个字符值都是允许的列分隔符。 如果未提供,则使用逗号(,)作为默认值。

以下示例请求使用制表符分隔值为分隔文件类型创建源连接。

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud storage source connection for delimited files",
        "description": "Cloud storage source connector",
        "baseConnectionId": "9e2541a0-b143-4d23-a541-a0b143dd2301",
        "data": {
            "format": "delimited",
            "columnDelimiter": "\t"
        },
        "params": {
            "path": "/ingestion-demos/leads/tsv_data/*.tsv",
            "recursive": "true"
        },
        "connectionSpec": {
            "id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
            "version": "1.0"
        }
    }'
属性 描述
baseConnectionId 您正在访问的第三方云存储系统的唯一连接ID。
data.format 定义数据格式属性的枚举值。
data.columnDelimiter 您可以使用任何单字符列分隔符来收集平面文件。 仅在摄取CSV或TSV文件时才需要此属性。
params.path 您访问的源文件的路径。
connectionSpec.id 与您的特定第三方云存储系统关联的连接规范ID。 有关连接规范ID的列表,请参阅附录

响应

成功的响应会返回新创建源连接的唯一标识符(id)。 在后续步骤中需要此ID才能创建数据流。

{
    "id": "26b53912-1005-49f0-b539-12100559f0e2",
    "etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}

使用压缩文件创建源连接

请求

您还可以通过将压缩的JSON或分隔文件指定为属性来摄取该文件的compressionType。 支持的压缩文件类型列表包括:

  • bzip2
  • gzip
  • deflate
  • zipDeflate
  • tarGzip
  • tar

以下示例请求使用gzip文件类型为压缩分隔文件创建源连接。

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud storage source connection for compressed files",
        "description": "Cloud storage source connection for compressed files",
        "baseConnectionId": "9e2541a0-b143-4d23-a541-a0b143dd2301",
        "data": {
            "format": "delimited",
            "properties": {
                "compressionType" : "gzip"
            }
        },
        "params": {
            "path": "/compressed/files.gzip"
        },
        "connectionSpec": {
            "id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
            "version": "1.0"
        }
     }'
属性 描述
data.properties.compressionType 确定要摄取的压缩文件类型。 仅在摄取压缩的JSON或分隔文件时,才需要此属性。

响应

成功的响应会返回新创建源连接的唯一标识符(id)。 在后续步骤中需要此ID才能创建数据流。

{
    "id": "26b53912-1005-49f0-b539-12100559f0e2",
    "etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}

创建目标XDM架构

要在Platform中使用源数据,必须创建目标架构以根据您的需求构建源数据。 然后,目标架构用于创建包含源数据的Platform数据集。

通过对架构注册表API执行POST请求,可以创建目标XDM架构。

API格式

POST /schemaregistry/tenant/schemas

请求

以下示例请求创建一个XDM架构,以扩展XDM Individual Profile类。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Target schema for a Cloud Storage connector",
        "description": "Target schema for a Cloud Storage connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

响应

成功的响应会返回新创建架构的详细信息,包括其唯一标识符($id)。 在后续步骤中需要此ID才能创建目标数据集、映射和数据流。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
    "meta:altId": "_{TENANT_ID}.schemas.995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Target schema cloud storage",
    "type": "object",
    "description": "Target schema for cloud storage",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1597783248870,
        "repo:lastModifiedDate": 1597783248870,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{LAST_MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{LAST_MODIFIED_USER_ID}",
        "eTag": "596661ec6c7a9c6ae530676e98290a4a58ca29540ed92489cf4478b2bf013a65",
        "meta:globalLibVersion": "1.13.3"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:tenantNamespace": "{TENANT_ID}"
}

创建目标数据集

通过向Catalog Service API执行POST请求,并提供有效负载中目标架构的ID,可以创建目标数据集。

API格式

POST /catalog/dataSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target dataset for cloud storage",
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        }
    }'
属性 描述
schemaRef.id 目标XDM架构的ID。
schemaRef.contentType 架构的版本。 此值必须设置为application/vnd.adobe.xed-full-notext+json;version=1,这将返回架构的最新次要版本。

响应

成功的响应会返回一个数组,其中包含格式为"@/datasets/{DATASET_ID}"的新创建数据集的ID。 数据集ID是由系统生成的只读字符串,用于在API调用中引用数据集。 在后续步骤中需要目标数据集ID才能创建目标连接和数据流。

[
    "@/dataSets/5f3c3cedb2805c194ff0b69a"
]

创建目标连接

目标连接表示所摄取数据所登陆目标的连接。 要创建目标连接,必须提供与数据湖关联的固定连接规范ID。 此连接规范ID为:c604ff05-7f1a-43c0-8e18-33bf874cb11c

现在,您将唯一标识符作为目标数据集的目标架构,并将连接规范ID用于数据湖。 使用这些标识符,您可以使用Flow Service API创建目标连接,以指定将包含集客源数据的数据集。

API格式

POST /targetConnections

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a Cloud Storage connector",
        "description": "Target Connection for a Cloud Storage connector",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5f3c3cedb2805c194ff0b69a"
        },
            "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
属性 描述
data.schema.id 目标XDM架构的$id
data.schema.version 架构的版本。 此值必须设置为application/vnd.adobe.xed-full+json;version=1,这将返回架构的最新次要版本。
params.dataSetId 目标数据集的ID。
connectionSpec.id 修复了与数据湖的连接规范ID。 此ID为:c604ff05-7f1a-43c0-8e18-33bf874cb11c

响应

成功的响应会返回新目标连接的唯一标识符(id)。 此ID是后续步骤所必需的。

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

创建映射

要将源数据摄取到目标数据集,必须首先将其映射到目标数据集所遵循的目标架构。 这是通过向转化服务执行POST请求来实现的,该请求请求有效载荷中定义了数据映射。

小贴士

您可以使用云存储源连接器映射JSON文件中的数组等复杂数据类型。

API格式

POST /conversion/mappingSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "Id",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "FirstName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "LastName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
属性 描述
xdmSchema 目标XDM架构的ID。

响应

成功的响应会返回新创建映射的详细信息,包括其唯一标识符(id)。 在后续步骤中需要此值才能创建数据流。

{
    "id": "bf5286a9c1ad4266baca76ba3adc9366",
    "version": 0,
    "createdDate": 1597784069368,
    "modifiedDate": 1597784069368,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

检索数据流规范

数据流负责从源中收集数据并将它们引入平台。 要创建数据流,必须首先获取负责收集云存储数据的数据流规范。

API格式

GET /flowSpecs?property=name=="CloudStorageToAEP"

请求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

响应

成功的响应会返回负责将数据从源引入平台的数据流规范的详细信息。 响应包含创建新数据流所需的唯一流程规范id

{
    "items": [
        {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "name": "CloudStorageToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
                "ecadc60c-7455-4d87-84dc-2a0e293d997b",
                "b7829c2f-2eb0-4f49-a6ee-55e33008b629",
                "4c10e202-c428-4796-9208-5f1f5732b1cf",
                "fb2e94c9-c031-467d-8103-6bd6e0a432f2",
                "32e8f412-cdf7-464c-9885-78184cb113fd",
                "b7bf2577-4520-42c9-bae9-cad01560f7bc",
                "998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
                "be5ec48c-5b78-49d5-b8fa-7c89ec4569b8"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "endTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency",
                        "interval"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "minute"
                            }
                        }
                    },
                    "then": {
                        "properties": {
                            "interval": {
                                "minimum": 15
                            }
                        }
                    },
                    "else": {
                        "properties": {
                            "interval": {
                                "minimum": 1
                            }
                        }
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

创建数据流

收集云存储数据的最后一步是创建数据流。 现在,您已准备以下必需值:

数据流负责从源中调度和收集数据。 通过在有效负载中提供先前提到的值时执行POST请求,可以创建数据流。

要计划摄取,您必须首先将开始时间值设置为以秒为单位的新纪元时间。 然后,您必须将频率值设置为以下五个选项之一:onceminutehourdayweek。 间隔值可指定两个连续摄取和创建一次性摄取之间的周期,而无需设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于15

重要

强烈建议在使用FTP连接器时,安排一次性引入数据流。

API格式

POST /flows

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage flow to Platform",
        "description": "Cloud Storage flow to Platform",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "26b53912-1005-49f0-b539-12100559f0e2"
        ],
        "targetConnectionIds": [
            "f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
                    "mappingVersion": "0"
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1597784298",
            "frequency":"minute",
            "interval":"30"
        }
    }'
属性 描述
flowSpec.id 在上一步骤中检索到的流量规范ID
sourceConnectionIds 在前面的步骤中检索到的源连接ID
targetConnectionIds 在前面的步骤中检索到的目标连接ID
transformations.params.mappingId 在前面的步骤中检索到的映射ID
scheduleParams.startTime 新纪元时间中数据流的开始时间。
scheduleParams.frequency 数据流收集数据的频率。 可接受的值包括:onceminutehourdayweek
scheduleParams.interval 该间隔指定两个连续流运行之间的周期。 间隔的值应为非零整数。 当频率设置为once时,不需要间隔,对于其他频率值,间隔应大于或等于15

响应

成功的响应会返回新创建数据流的ID(id)。

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

监控数据流

创建数据流后,您可以监视通过其摄取的数据,以查看有关流量运行、完成状态和错误的信息。 有关如何监视数据流的更多信息,请参阅关于在API中监视数据流的教程

后续步骤

在本教程中,您创建了一个源连接器,用于按计划从云存储中收集数据。 现在,下游Platform服务(如Real-time Customer Profile和Data Science Workspace)可以使用传入数据。 有关更多详细信息,请参阅以下文档:

附录

以下部分列出了不同的云存储源连接器及其连接规范。

连接规范

连接器名称 连接规范
Amazon S3 (S3) ecadc60c-7455-4d87-84dc-2a0e293d997b
Amazon Kinesis (Kinesis) 86043421-563b-46ec-8e6c-e23184711bf6
Azure Blob (Blob) 4c10e202-c428-4796-9208-5f1f5732b1cf
Azure Data Lake Storage Gen2 (ADLS第2代) 0ed90a81-07f4-4586-8190-b40eccef1c5a
Azure Event Hubs (事件中心) bf9f5905-92b7-48bf-bf20-455bc6b60a4e
Azure File Storage be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
Google Cloud Storage 32e8f412-cdf7-464c-9885-78184cb113fd
HDFS 54e221aa-d342-4707-bcff-7a4bceef0001
Oracle Object Storage c85f9425-fb21-426c-ad0b-405e9bd8a46c
SFTP bf367b0d-3d9b-4060-b67b-0d3d9bd06094

在此页面上