本教學課程涵蓋從雲端儲存空間來源擷取資料,以及使用 Flow Service API.
要建立資料流,您必須具有與雲儲存源的有效基本連接ID。 如果您沒有此ID,請參閱 來源概觀 以取得可建立基本連線的雲儲存來源清單。
本教學課程需要您妥善了解下列Adobe Experience Platform元件:
{TENANT_ID}
、「容器」的概念,以及提出要求所需的標題(請特別注意「接受」標題及其可能的值)。如需如何成功呼叫Platform API的詳細資訊,請參閱 Platform API快速入門.
您可以透過向 sourceConnections
端點 Flow Service 提供基本連接ID時的API、要獲取的源檔案的路徑,以及源的相應連接規範ID。
建立來源連線時,您也必須為資料格式屬性定義列舉值。
為檔案型來源使用下列列舉值:
資料格式 | 列舉值 |
---|---|
分隔 | delimited |
JSON | json |
鑲木 | parquet |
對於所有基於表的源,將值設定為 tabular
.
API格式
POST /sourceConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
屬性 | 說明 |
---|---|
baseConnectionId |
雲儲存源的基本連接ID。 |
data.format |
您要帶入Platform的資料格式。 支援的值包括: delimited , JSON ,和 parquet . |
data.properties |
(選用)建立來源連線時可套用至資料的一組屬性。 |
data.properties.columnDelimiter |
(可選)收集一般檔案時可指定的單字元欄分隔字元。 任何單一字元值都是允許的欄分隔字元。 若未提供,請使用逗號(, )作為預設值。 附註:此 columnDelimiter 只有擷取分隔檔案時,才能使用屬性。 |
data.properties.encoding |
(選用)定義將資料擷取至Platform時所要使用的編碼類型的屬性。 支援的編碼類型為: UTF-8 和 ISO-8859-1 . 附註:此 encoding 只有擷取分隔的CSV檔案時,才可使用參數。 其他檔案類型將會以預設編碼擷取, UTF-8 . |
data.properties.compressionType |
(選用)定義擷取之壓縮檔案類型的屬性。 支援的壓縮檔案類型為: bzip2 , gzip , deflate , zipDeflate , tarGzip ,和 tar . 附註:此 compressionType 擷取分隔或JSON檔案時,只能使用屬性。 |
params.path |
要訪問的源檔案的路徑。 此參數指向單個檔案或整個資料夾。 附註:您可以使用星號來取代檔案名稱,以指定擷取整個資料夾。 例如: /acme/summerCampaign/*.csv 會將整個 /acme/summerCampaign/ 檔案夾。 |
params.type |
您要擷取的來源資料檔案的檔案類型。 使用類型 file 擷取個別檔案並使用類型 folder 內嵌整個資料夾。 |
connectionSpec.id |
與特定雲儲存源關聯的連接規範ID。 請參閱 附錄 以獲取連接規範ID的清單。 |
回應
成功的回應會傳回唯一識別碼(id
)。 在後續步驟中需要此ID才能建立資料流。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
建立來源連線時,您可以使用規則運算式,將特定一組檔案從來源內嵌至Platform。
API格式
POST /sourceConnections
要求
在下列範例中,檔案路徑中會使用規則運算式,以指定擷取所有具有 premium
以他們的名字。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
建立來源連線時,您可以使用 recursive
參數,從深度巢狀資料夾內嵌資料。
API格式
POST /sourceConnections
要求
在以下範例中, recursive: true
參數通知 Flow Service 擷取程式期間遞回讀取所有子資料夾。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
為了在Platform中使用來源資料,必須建立目標架構,以根據您的需求來建構來源資料。 然後,目標架構會用來建立包含來源資料的Platform資料集。
您可以透過執行POST要求來建立目標XDM結構 結構註冊表API.
如需建立Target XDM結構的詳細步驟,請參閱 使用API建立結構.
目標資料集的建立方式,是透過對 目錄服務API,提供裝載中目標架構的ID。
如需如何建立目標資料集的詳細步驟,請參閱 使用API建立資料集.
目標連線代表所擷取資料所登陸之目的地的連線。 要建立目標連接,必須提供與Data Lake關聯的固定連接規範ID。 此連接規範ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
您現在擁有目標結構的唯一識別碼、目標資料集,以及Data Lake的連線規格ID。 使用這些識別碼,您可以使用 Flow Service API,指定包含傳入來源資料的資料集。
API格式
POST /targetConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
屬性 | 說明 |
---|---|
data.schema.id |
此 $id 目標XDM架構的區段。 |
data.schema.version |
結構的版本。 必須設定此值 application/vnd.adobe.xed-full+json;version=1 ,會傳回結構的最新次要版本。 |
params.dataSetId |
目標資料集的ID。 |
connectionSpec.id |
資料湖的固定連接規範ID。 此ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
回應
成功的回應會傳回新目標連線的唯一識別碼(id
)。 後續步驟需要此ID。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
若要將來源資料內嵌至目標資料集,必須先將其對應至目標資料集所遵守的目標架構。
若要建立對應集,請向 mappingSets
端點 Data Prep API 提供目標XDM架構時 $id
以及您要建立之對應集的詳細資訊。
您可以使用雲端儲存空間來源連接器,對應JSON檔案中的陣列等複雜資料類型。
API格式
POST /conversion/mappingSets
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
屬性 | 說明 |
---|---|
xdmSchema |
目標XDM結構的ID。 |
回應
成功的回應會傳回新建立之對應的詳細資訊,包括其唯一識別碼(id
)。 在以後的步驟中需要此值才能建立資料流。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
資料流負責收集來源的資料,並將其導入Platform。 要建立資料流,必須首先獲得負責收集雲儲存資料的資料流規範。
API格式
GET /flowSpecs?property=name=="CloudStorageToAEP"
要求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
以下的JSON回應裝載會隱藏,以便簡潔。 選取「裝載」以查看回應裝載。
回應
成功的響應返回負責將源資料帶入平台的資料流規範的詳細資訊。 回應包含唯一流量規格 id
建立新資料流所需。
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
收集雲儲存資料的最後一步是建立資料流。 您現在已準備下列必要值:
資料流負責從源中調度和收集資料。 您可以在裝載中提供先前提及的值時,執行POST要求來建立資料流。
對於批次內嵌,每個隨後的資料流都會根據檔案的來源選擇要內嵌的檔案 上次修改 時間戳記。 這意味著批處理資料流從源中選擇自上次資料流運行以來新建或修改的檔案。
若要排程擷取,您必須先將開始時間值設為紀元時間(以秒為單位)。 然後,您必須將頻率值設定為以下五個選項之一: once
, minute
, hour
, day
,或 week
. 間隔值指定兩個連續擷取之間的期間,並建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於 15
.
強烈建議在使用 FTP連接器.
API格式
POST /flows
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
屬性 | 說明 |
---|---|
flowSpec.id |
此 流規格ID 在上一步驟中擷取。 |
sourceConnectionIds |
此 源連接ID 在先前步驟中擷取。 |
targetConnectionIds |
此 目標連線ID 在先前步驟中擷取。 |
transformations.params.mappingId |
此 對應ID 在先前步驟中擷取。 |
scheduleParams.startTime |
資料流的開始時間(以Epoch時間表示)。 |
scheduleParams.frequency |
資料流收集資料的頻率。 可接受的值包括: once , minute , hour , day ,或 week . |
scheduleParams.interval |
該間隔指定兩個連續流運行之間的週期。 間隔的值應為非零整數。 頻率設為時不需要間隔 once 且應大於或等於 15 的其他頻率值。 |
回應
成功的回應會傳回ID(id
)。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
建立資料流後,您可以監視正在通過資料流進行內嵌的資料,以查看有關流運行、完成狀態和錯誤的資訊。 有關如何監視資料流的詳細資訊,請參閱 監控API中的資料流
依照本教學課程,您已建立來源連接器,可依排程從雲端儲存空間收集資料。 下游Platform服務(例如 Real-Time Customer Profile 和 Data Science Workspace. 如需詳細資訊,請參閱下列檔案:
下節列出了不同的雲儲存源連接器及其連接規範。
連接器名稱 | 連接規格 |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS第2代) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (事件中心) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |