使用來源連接器和API收集串流資料

Flow Service 用於收集和集中Adobe Experience Platform內不同來源的客戶資料。該服務提供用戶介面和REST風格的API,所有支援的源都可從中連接。

本教學課程涵蓋從串流來源連接器擷取資料,並使用Flow Service API將其匯入Experience Platform的步驟。

快速入門

本教學課程要求您必須擁有串流連接器的有效連線ID。 如果您沒有此資訊,請先參閱下列有關建立串流來源連線的教學課程,然後再嘗試本教學課程:

本教學課程還要求您對Adobe Experience Platform的以下部分有切實的瞭解:

  • Experience Data Model (XDM) System:Experience Platform組織客戶體驗資料的標準化架構。
    • 架構構成基礎:瞭解XDM架構的基本建置區塊,包括架構組合的主要原則和最佳實務。
    • 架構註冊開發人員指南:包含您必須知道的重要資訊,以便成功執行對架構註冊表API的呼叫。這包括您的{TENANT_ID}、"containers"的概念,以及提出要求時所需的標題(請特別注意「接受」標題及其可能的值)。
  • Catalog Service:目錄是記錄資料位置和世系的系統 Experience Platform。
  • Streaming ingestion:串流擷取 Platform 為使用者提供從用戶端和伺服器端裝置即時傳送資料 Experience Platform 的方法。
  • 沙盒: Experience Platform 提供虛擬沙盒,可將單一執行個體分 Platform 割為不同的虛擬環境,以協助開發和發展數位體驗應用程式。

以下各節提供您需要瞭解的其他資訊,以便使用Flow Service API成功收集串流資料。

讀取範例API呼叫

本教學課程提供範例API呼叫,以示範如何設定請求的格式。 這些包括路徑、必要標題和正確格式化的請求負載。 也提供API回應中傳回的範例JSON。 如需範例API呼叫檔案中所用慣例的詳細資訊,請參閱Experience Platform疑難排解指南中如何讀取範例API呼叫一節。

收集必要標題的值

若要呼叫Platform API,您必須先完成驗證教學課程。 完成驗證教學課程後,所有Experience Platform API呼叫中每個所需標題的值都會顯示在下面:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

Experience Platform中的所有資源(包括屬於Flow Service的資源)都隔離到特定的虛擬沙盒。 對Platform API的所有請求都需要一個標題,該標題指定要在中執行操作的沙盒的名稱:

  • x-sandbox-name: {SANDBOX_NAME}

所有包含裝載(POST、PUT、PATCH)的請求都需要附加的媒體類型標題:

  • Content-Type: application/json

建立源連接

您可以通過向Flow Service API發出POST請求來建立源連接。 源連接由連接ID、源資料檔案的路徑和連接規範ID組成。

要建立源連接,還必須為資料格式屬性定義枚舉值。

對基於檔案的連接器使用以下枚舉值:

資料格式 列舉值
分隔字元 delimited
JSON json
鑲木 parquet

對於所有基於表的連接器,請將值設定為tabular

API格式

POST /sourceConnections

請求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test source connector for streaming data",
        "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
        "connectionId": "f6aa6c58-3c3d-4c59-aa6c-583c3d6c599c",
        "description": "Test source connector for streaming data",
        "data": {
            "format": "delimited"
        },
            "connectionSpec": {
            "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
            "version": "1.0"
        }
    }'
屬性 說明
providerId 串流連接器的提供者ID。
connectionId 串流連接器的唯一連線ID。
connectionSpec.id 與特定串流連接器關聯的連接規範ID。

回應

成功的響應返回新建源連接的唯一標識符(id)。 在後續步驟中需要此ID才能建立資料流。

{
    "id": "2abd97c4-91bb-4c93-bd97-c491bbfc933d",
    "etag": "\"66013508-0000-0200-0000-5f6e2ae70000\""
}

建立目標XDM模式

要使用Platform中的源資料,必須建立目標模式,以根據您的需要構建源資料。 然後,目標模式用於建立包含源資料的Platform資料集。 此目標XDM模式還擴展了XDM Individual Profile類。

通過對方案註冊表API執行POST請求,可以建立目標XDM方案。

API格式

POST /tenant/schemas

請求

以下示例請求建立一個XDM模式以擴展XDM Individual Profile類。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

回應

成功的響應返回新建立的架構的詳細資訊,包括其唯一標識符($id)。 在後續步驟中需要此ID,才能建立目標資料集、對應和資料流。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

建立目標資料集

通過對目錄服務API執行POST請求,提供裝載內目標方案的ID,可以建立目標資料集。

API格式

POST /catalog/dataSets

請求

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.1"
        },
        "fileDescription": {
            "format": "parquet"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        },
        "name": "Test streaming dataset"
    }'
屬性 說明
schemaRef.id 目標XDM架構的ID。

回應

成功的響應返回一個陣列,該陣列包含以"@/datasets/{DATASET_ID}"格式新建立的資料集的ID。 資料集ID是唯讀、系統產生的字串,用於在API呼叫中參考資料集。 在後續步驟中需要目標資料集ID才能建立目標連接和資料流。

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

建立目標連接

目標連接表示到所收錄資料所在目的地的連接。 要建立目標連接,必須提供與資料庫關聯的固定連接規範ID。 此連接規範ID為:c604ff05-7f1a-43c0-8e18-33bf874cb11c

您現在擁有目標資料集的唯一識別碼、目標模式,以及與資料湖的連線規範ID。 使用這些標識符,可以使用Flow Service API建立目標連接,以指定將包含入站源資料的資料集。

API格式

POST /targetConnections

請求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm"
        },
        "params": {
        "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
屬性 說明
params.dataSetId 目標資料集的ID。
connectionSpec.id 用於連接到資料湖的連接規範ID。 此ID為:c604ff05-7f1a-43c0-8e18-33bf874cb11c

回應

成功的響應返回新目標連接的唯一標識符(id)。 後續步驟需要此ID。

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

建立映射

為了將源資料引入目標資料集,必須首先將其映射到目標資料集所遵守的目標模式。 這是透過對轉換服務執行POST請求,並在請求裝載中定義資料映射來實現的。

API格式

POST /conversion/mappingSets

請求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
屬性 說明
xdmSchema 目標XDM架構的$id

回應

成功的響應返回新建立映射的詳細資訊,包括其唯一標識符(id)。 在後續步驟中需要此ID才能建立資料流。

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

查找資料流規範

資料流負責從源收集資料並將其導入Platform。 要建立資料流,必須首先通過對Flow Service API執行GET請求來獲取資料流規範。 資料流規範負責從流連接器收集資料。
API格式

GET /flowSpecs?property=name=="Steam data with transformation"

請求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="Steam data with transformation"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

回應

成功的響應返回負責將流連接器中的資料帶入Platform的資料流規範的詳細資訊。 在下一步中需要此ID才能建立新的資料流。

{
    "items": [
        {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "name": "Steam data with transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "d27d4907-7351-47dd-bbc2-05a04365703d",
                "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from Raw to XDM",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        },
                        "required": [
                            "mappingId"
                        ]
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "deleteSupported": false,
                        "updateSupported": false,
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

建立資料流

收集串流資料的最後一步是建立資料流。 目前,您已準備好下列必要值:

資料流負責調度和收集源中的資料。 您可以通過執行POST請求,同時在裝載中提供先前提到的值來建立資料流。

API格式

POST /flows

請求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "2abd97c4-91bb-4c93-bd97-c491bbfc933d"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
屬性 說明
flowSpec.id 在上一步驟中檢索的流規範ID
sourceConnectionIds 在先前步驟中檢索的源連接ID
targetConnectionIds 在先前步驟中擷取的目標連線ID
transformations.params.mappingId 在先前步驟中檢索的映射ID

回應

成功的響應返回新建立的資料流的ID(id)。

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

後續步驟

在本教學課程中,您已建立資料流,以從串流連接器收集串流資料。 現在,下游Platform服務(例如Real-time Customer Profile和Data Science Workspace)可以使用傳入的資料。 如需詳細資訊,請參閱下列檔案:

本頁內容

Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free
Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free