使用Flow Service API从第三方云存储系统中摄取Parquet数据

Flow Service 用于收集和集中Adobe Experience Platform内不同来源的客户数据。该服务提供了用户界面和RESTful API,所有受支持的源都可从中连接。

本教程使用Flow Service API指导您完成从第三方云存储系统中摄取Parquet数据的步骤。

快速入门

本指南要求您对Adobe Experience Platform的以下组件有一定的了解:

  • 来源: Experience Platform 允许从各种源摄取数据,同时让您能够使用服务来构建、标记和增强传入数 Platform 据。
  • 沙盒: Experience Platform 提供将单个实例分区为单独虚 Platform 拟环境的虚拟沙盒,以帮助开发和改进数字体验应用程序。

以下部分提供了您需要了解的其他信息,以便使用Flow Service API从第三方云存储中成功摄取Parquet数据。

读取示例API调用

本教程提供了用于演示如何设置请求格式的示例API调用。 这包括路径、所需标头以及格式正确的请求负载。 还提供了API响应中返回的示例JSON。 有关示例API调用文档中使用的惯例的信息,请参阅Experience Platform疑难解答指南中如何阅读示例API调用一节。

收集所需标题的值

要调用Platform API,您必须先完成身份验证教程。 完成身份验证教程可为所有Experience Platform API调用中每个所需标头的值,如下所示:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

Experience Platform中的所有资源(包括属于Flow Service的资源)都与特定虚拟沙箱隔离。 对Platform API的所有请求都需要一个标头来指定操作将在其中进行的沙盒的名称:

  • x-sandbox-name: {SANDBOX_NAME}

所有包含有效负载(POST、PUT、PATCH)的请求都需要额外的媒体类型标头:

  • Content-Type: application/json

创建连接

要使用Platform API摄取Parquet数据,您必须对您正在访问的第三方云存储源拥有有效连接。 如果您尚未连接要处理的存储,则可以通过以下教程创建一个:

获取并存储连接的唯一标识符($id),然后继续执行本教程的下一步。

创建目标架构

要在Platform中使用源数据,还必须创建目标架构以根据您的需求构建源数据。 然后,目标架构用于创建包含源数据的Platform数据集。

如果您希望使用Experience Platform中的用户界面,架构编辑器教程将提供在架构编辑器中执行类似操作的分步说明。

API格式

POST /schemaregistry/tenant/schemas

请求

以下示例请求创建一个XDM架构,用于扩展XDM Individual Profile类。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
    "type": "object",
    "title": "Sample Demo Profile XDM {{$guid}}",
    "description": "",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-work-details"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-subscriptions"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/identitymap"
        }
    ],
    "meta:containerId": "tenant",
    "meta:resourceType": "schemas",
    "meta:xdmType": "object",
    "meta:class": "https://ns.adobe.com/xdm/context/profile"
}'

响应

成功的响应会返回新创建架构的详细信息,包括其唯一标识符($id)。 在下一步中需要此ID才能创建源连接。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e15530faf88aeb52d9ca5c5671a059f44f1a42ea7f5fdb80",
    "meta:altId": "_{TENANT_ID}.schemas.e15530faf88aeb52d9ca5c5671a059f44f1a42ea7f5fdb80",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample Demo Profile XDM 8d96a964-aad8-43c5-a73a-c8b9b1ccbfb1",
    "type": "object",
    "description": "",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-work-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-subscriptions",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/identitymap",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile",
        "https://ns.adobe.com/xdm/context/profile-subscriptions",
        "https://ns.adobe.com/xdm/context/identitymap",
        "https://ns.adobe.com/xdm/context/profile-work-details"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile",
        "https://ns.adobe.com/xdm/context/profile-subscriptions",
        "https://ns.adobe.com/xdm/context/identitymap",
        "https://ns.adobe.com/xdm/context/profile-work-details"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1584673864341,
        "repo:lastModifiedDate": 1584673864341,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "fa704f80da907c8f0f66f453ffcac3e52958687edbf55d71231dc5e1522193c4"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

创建源连接

现在,创建目标XDM架构后,可以使用对Flow Service API的POST请求创建源连接。 源连接由API连接、源数据格式以及对上一步中检索到的目标XDM架构的引用组成。

API格式

POST /sourceConnections

请求

curl -X POST \
    'http://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Source Connection S3 {{$guid}}",
        "baseConnectionId": "5831c52c-c261-4945-b1c5-2cc261d945b2",
        "connectionSpec": {
            "id": "ecadc60c-7455-4d87-84dc-2a0e293d997b",
            "version": 1
        },
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e15530faf88aeb52d9ca5c5671a059f44f1a42ea7f5fdb80",
                "id": "",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "path": "partners-demo/samples",
            "recursive": "true"
        }
    }'
属性 描述
baseConnectionId 代表云存储的API的连接。
data.schema.id ($id)(如果在上一步中检索到目标xdm架构)。
params.path 源文件的路径。

响应

成功的响应会返回新创建源连接的唯一标识符(id)。 按照以后创建目标连接的步骤中的要求存储此值。

{
    "id": "73bc8911-505a-4e46-bc89-11505a6e466f",
    "etag": "\"c4004435-0000-0200-0000-5e7437d90000\""
}

创建数据集基础连接

要将外部数据摄取到Platform中,必须首先获取Experience Platform数据集基础连接。

要创建数据集基础连接,请按照数据集基础连接教程中所述的步骤操作。

在创建数据集基础连接之前,请继续执行开发人员指南中列出的步骤。 获取并存储唯一标识符($id),然后在下一步中继续将其用作基本连接ID以创建目标连接。

创建目标数据集

通过向Catalog Service API执行POST请求,并提供有效负载中目标架构的ID,可以创建目标数据集。

API格式

POST /catalog/dataSets

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Leads Dataset {{$guid}}",
        "schemaRef": {
            "id": ""https://ns.adobe.com/{TENANT_ID}/schemas/e15530faf88aeb52d9ca5c5671a059f44f1a42ea7f5fdb80"",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        }
    }'
属性 描述
schemaRef.id 目标XDM架构的ID。

响应

成功的响应会返回一个数组,其中包含格式为"@/datasets/{DATASET_ID}"的新创建数据集的ID。 数据集ID是由系统生成的只读字符串,用于在API调用中引用数据集。 按照后续步骤创建目标连接和数据流所需的方式存储目标数据集ID。

[
    "@/dataSets/5e7439b1ad55a618ad4c5102"
]

创建目标连接

现在,您拥有数据集基础连接、目标架构和目标数据集的唯一标识符。 使用这些标识符,您可以使用Flow Service API创建目标连接,以指定将包含集客源数据的数据集。

API格式

POST /targetConnections

请求

curl -X POST \
    'http://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "baseConnectionId": "291257e3-c560-4e07-9257-e3c5606e07d1",
        "connectionSpec": {
            "id":"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "name": "Target Connection {{$guid}}",
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": ""https://ns.adobe.com/{TENANT_ID}/schemas/e15530faf88aeb52d9ca5c5671a059f44f1a42ea7f5fdb80"",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5e7439b1ad55a618ad4c5102"
        }
    }'
属性 描述
baseConnectionId 数据集基础连接的ID。
data.schema.id 目标XDM架构的$id
params.dataSetId 目标数据集的ID。
connectionSpec.id 云存储的连接规范ID。

响应

成功的响应会返回新目标连接的唯一标识符(id)。 按照后续步骤中的要求存储此值。

{
    "id": "9b3abc95-f2e9-47c1-babc-95f2e927c1ec",
    "etag": "\"7501936b-0000-0200-0000-5e743bcc0000\""
}

创建数据流

从第三方云存储中摄取Parquet数据的最后一步是创建数据流。 现在,您已准备以下必需值:

数据流负责从源中调度和收集数据。 通过在有效负载中提供先前提到的值时执行POST请求,可以创建数据流。

API格式

POST /flows

请求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Demo Parquet Ingestion Flow {{$guid}}",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "73bc8911-505a-4e46-bc89-11505a6e466f"
        ],
        "targetConnectionIds": [
            "9b3abc95-f2e9-47c1-babc-95f2e927c1ec"
        ],
        "scheduleParams": {
            "startTime": {{$timestamp}},
            "frequency": "minute",
            "interval": 1000,
            "backfill": true
        }
    }'
属性 描述
sourceConnectionIds 在前面的步骤中检索到的源连接ID。
targetConnectionIds 在前面的步骤中检索到的目标连接ID。

响应

成功的响应会返回新创建数据流的ID(id)。

{
    "id": "89ff50ef-b082-426e-bf50-efb082d26e78",
    "etag": "\"890070b8-0000-0200-0000-5e743c040000\""
}

后续步骤

在本教程中,您创建了一个源连接器,用于按计划从第三方云存储系统中收集Parquet数据。 现在,下游Platform服务(如Real-time Customer Profile和Data Science Workspace)可使用传入数据。 有关更多详细信息,请参阅以下文档:

在此页面上