使用為雲儲存源建立資料流 Flow Service API

本教學課程涵蓋從雲端儲存空間來源擷取資料,以及使用 Flow Service API.

注意

要建立資料流,您必須具有與雲儲存源的有效基本連接ID。 如果您沒有此ID,請參閱 來源概觀 以取得可建立基本連線的雲儲存來源清單。

快速入門

本教學課程需要您妥善了解下列Adobe Experience Platform元件:

  • Experience Data Model (XDM) System:Experience Platform組織客戶體驗資料的標準化架構。
    • 結構構成基本概念:了解XDM結構描述的基本建置組塊,包括結構描述的主要原則和最佳實務。
    • Schema Registry開發人員指南:包括您必須知道的重要資訊,才能成功執行對結構註冊表API的呼叫。 這包括 {TENANT_ID}、「容器」的概念,以及提出要求所需的標題(請特別注意「接受」標題及其可能的值)。
  • Catalog Service:目錄是記錄Experience Platform內資料位置和世系的系統。
  • Batch ingestion:批次內嵌API可讓您將資料以批次檔案的形式內嵌至Experience Platform。
  • 沙箱:Experience Platform提供可將單一Platform執行個體分割成個別虛擬環境的虛擬沙箱,以協助開發及改進數位體驗應用程式。

使用平台API

如需如何成功呼叫Platform API的詳細資訊,請參閱 Platform API快速入門.

建立源連接

您可以透過向 sourceConnections 端點 Flow Service 提供基本連接ID時的API、要獲取的源檔案的路徑,以及源的相應連接規範ID。

建立來源連線時,您也必須為資料格式屬性定義列舉值。

為檔案型來源使用下列列舉值:

資料格式 列舉值
分隔 delimited
JSON json
鑲木 parquet

對於所有基於表的源,將值設定為 tabular.

API格式

POST /sourceConnections

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage source connection",
        "description: "Source connection for a cloud storage source",
        "baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
        "data": {
            "format": "delimited",
            "properties": {
                "columnDelimiter": "{COLUMN_DELIMITER}",
                "encoding": "{ENCODING}"
                "compressionType": "{COMPRESSION_TYPE}"
            }
        },
        "params": {
            "path": "/acme/summerCampaign/account.csv",
            "type": "file"
        },
        "connectionSpec": {
            "id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
            "version": "1.0"
        }
    }'
屬性 說明
baseConnectionId 雲儲存源的基本連接ID。
data.format 您要帶入Platform的資料格式。 支援的值包括: delimited, JSON,和 parquet.
data.properties (選用)建立來源連線時可套用至資料的一組屬性。
data.properties.columnDelimiter (可選)收集一般檔案時可指定的單字元欄分隔字元。 任何單一字元值都是允許的欄分隔字元。 若未提供,請使用逗號(,)作為預設值。 附註:此 columnDelimiter 只有擷取分隔檔案時,才能使用屬性。
data.properties.encoding (選用)定義將資料擷取至Platform時所要使用的編碼類型的屬性。 支援的編碼類型為: UTF-8ISO-8859-1. 附註:此 encoding 只有擷取分隔的CSV檔案時,才可使用參數。 其他檔案類型將會以預設編碼擷取, UTF-8.
data.properties.compressionType (選用)定義擷取之壓縮檔案類型的屬性。 支援的壓縮檔案類型為: bzip2, gzip, deflate, zipDeflate, tarGzip,和 tar. 附註:此 compressionType 擷取分隔或JSON檔案時,只能使用屬性。
params.path 要訪問的源檔案的路徑。 此參數指向單個檔案或整個資料夾。 附註:您可以使用星號來取代檔案名稱,以指定擷取整個資料夾。 例如: /acme/summerCampaign/*.csv 會將整個 /acme/summerCampaign/ 檔案夾。
params.type 您要擷取的來源資料檔案的檔案類型。 使用類型 file 擷取個別檔案並使用類型 folder 內嵌整個資料夾。
connectionSpec.id 與特定雲儲存源關聯的連接規範ID。 請參閱 附錄 以獲取連接規範ID的清單。

回應

成功的回應會傳回唯一識別碼(id)。 在後續步驟中需要此ID才能建立資料流。

{
    "id": "26b53912-1005-49f0-b539-12100559f0e2",
    "etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}

建立目標XDM結構

為了在Platform中使用來源資料,必須建立目標架構,以根據您的需求來建構來源資料。 然後,目標架構會用來建立包含來源資料的Platform資料集。

您可以透過執行POST要求來建立目標XDM結構 結構註冊表API.

如需建立Target XDM結構的詳細步驟,請參閱 使用API建立結構.

建立目標資料集

目標資料集的建立方式,是透過對 目錄服務API,提供裝載中目標架構的ID。

如需如何建立目標資料集的詳細步驟,請參閱 使用API建立資料集.

建立目標連線

目標連線代表所擷取資料所登陸之目的地的連線。 要建立目標連接,必須提供與Data Lake關聯的固定連接規範ID。 此連接規範ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

您現在擁有目標結構的唯一識別碼、目標資料集,以及Data Lake的連線規格ID。 使用這些識別碼,您可以使用 Flow Service API,指定包含傳入來源資料的資料集。

API格式

POST /targetConnections

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a Cloud Storage connector",
        "description": "Target Connection for a Cloud Storage connector",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5f3c3cedb2805c194ff0b69a"
        },
            "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
屬性 說明
data.schema.id $id 目標XDM架構的區段。
data.schema.version 結構的版本。 必須設定此值 application/vnd.adobe.xed-full+json;version=1,會傳回結構的最新次要版本。
params.dataSetId 目標資料集的ID。
connectionSpec.id 資料湖的固定連接規範ID。 此ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

回應

成功的回應會傳回新目標連線的唯一識別碼(id)。 後續步驟需要此ID。

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

建立對應

若要將來源資料內嵌至目標資料集,必須先將其對應至目標資料集所遵守的目標架構。

若要建立對應集,請向 mappingSets 端點 Data Prep API 提供目標XDM架構時 $id 以及您要建立之對應集的詳細資訊。

秘訣

您可以使用雲端儲存空間來源連接器,對應JSON檔案中的陣列等複雜資料類型。

API格式

POST /conversion/mappingSets

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "Id",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "FirstName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "LastName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
屬性 說明
xdmSchema 目標XDM結構的ID。

回應

成功的回應會傳回新建立之對應的詳細資訊,包括其唯一識別碼(id)。 在以後的步驟中需要此值才能建立資料流。

{
    "id": "bf5286a9c1ad4266baca76ba3adc9366",
    "version": 0,
    "createdDate": 1597784069368,
    "modifiedDate": 1597784069368,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

檢索資料流規範

資料流負責收集來源的資料,並將其導入Platform。 要建立資料流,必須首先獲得負責收集雲儲存資料的資料流規範。

API格式

GET /flowSpecs?property=name=="CloudStorageToAEP"

要求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'
注意

以下的JSON回應裝載會隱藏,以便簡潔。 選取「裝載」以查看回應裝載。

 檢視裝載

回應

成功的響應返回負責將源資料帶入平台的資料流規範的詳細資訊。 回應包含唯一流量規格 id 建立新資料流所需。

{
  "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
  "name": "CloudStorageToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
    "ecadc60c-7455-4d87-84dc-2a0e293d997b",
    "b7829c2f-2eb0-4f49-a6ee-55e33008b629",
    "4c10e202-c428-4796-9208-5f1f5732b1cf",
    "fb2e94c9-c031-467d-8103-6bd6e0a432f2",
    "32e8f412-cdf7-464c-9885-78184cb113fd",
    "b7bf2577-4520-42c9-bae9-cad01560f7bc",
    "998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
    "be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
    "54e221aa-d342-4707-bcff-7a4bceef0001",
    "c85f9425-fb21-426c-ad0b-405e9bd8a46c",
    "26f526f2-58f4-4712-961d-e41bf1ccc0e8"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled.  The value is represented in Unix epoch time."
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime"
        ]
      }
    }
}

建立資料流

收集雲儲存資料的最後一步是建立資料流。 您現在已準備下列必要值:

資料流負責從源中調度和收集資料。 您可以在裝載中提供先前提及的值時,執行POST要求來建立資料流。

注意

對於批次內嵌,每個隨後的資料流都會根據檔案的來源選擇要內嵌的檔案 上次修改 時間戳記。 這意味著批處理資料流從源中選擇自上次資料流運行以來新建或修改的檔案。

若要排程擷取,您必須先將開始時間值設為紀元時間(以秒為單位)。 然後,您必須將頻率值設定為以下五個選項之一: once, minute, hour, day,或 week. 間隔值指定兩個連續擷取之間的期間,並建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於 15.

重要

強烈建議在使用 FTP連接器.

API格式

POST /flows

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage flow to Platform",
        "description": "Cloud Storage flow to Platform",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "26b53912-1005-49f0-b539-12100559f0e2"
        ],
        "targetConnectionIds": [
            "f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1597784298",
            "frequency":"minute",
            "interval":"30"
        }
    }'
屬性 說明
flowSpec.id 流規格ID 在上一步驟中擷取。
sourceConnectionIds 源連接ID 在先前步驟中擷取。
targetConnectionIds 目標連線ID 在先前步驟中擷取。
transformations.params.mappingId 對應ID 在先前步驟中擷取。
scheduleParams.startTime 資料流的開始時間(以Epoch時間表示)。
scheduleParams.frequency 資料流收集資料的頻率。 可接受的值包括: once, minute, hour, day,或 week.
scheduleParams.interval 該間隔指定兩個連續流運行之間的週期。 間隔的值應為非零整數。 頻率設為時不需要間隔 once 且應大於或等於 15 的其他頻率值。

回應

成功的回應會傳回ID(id)。

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

監視資料流

建立資料流後,您可以監視正在通過資料流進行內嵌的資料,以查看有關流運行、完成狀態和錯誤的資訊。 有關如何監視資料流的詳細資訊,請參閱 監控API中的資料流

後續步驟

依照本教學課程,您已建立來源連接器,可依排程從雲端儲存空間收集資料。 下游Platform服務(例如 Real-time Customer Profile 和 Data Science Workspace. 如需詳細資訊,請參閱下列檔案:

附錄

下節列出了不同的雲儲存源連接器及其連接規範。

連接規範

連接器名稱 連接規格
Amazon S3 (S3) ecadc60c-7455-4d87-84dc-2a0e293d997b
Amazon Kinesis (Kinesis) 86043421-563b-46ec-8e6c-e23184711bf6
Azure Blob (Blob) 4c10e202-c428-4796-9208-5f1f5732b1cf
Azure Data Lake Storage Gen2 (ADLS第2代) b3ba5556-48be-44b7-8b85-ff2b69b46dc4
Azure Event Hubs (事件中心) bf9f5905-92b7-48bf-bf20-455bc6b60a4e
Azure File Storage be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
Google Cloud Storage 32e8f412-cdf7-464c-9885-78184cb113fd
HDFS 54e221aa-d342-4707-bcff-7a4bceef0001
Oracle Object Storage c85f9425-fb21-426c-ad0b-405e9bd8a46c
SFTP bf367b0d-3d9b-4060-b67b-0d3d9bd06094

本頁內容