使用Flow Service API為雲端儲存空間來源建立資料流
本教學課程涵蓋從雲端儲存空間來源擷取資料,以及使用Flow Service API將資料帶到Platform的步驟。
快速入門
本教學課程需要您實際瞭解下列Adobe Experience Platform元件:
-
Experience Data Model (XDM) System:Experience Platform用來組織客戶體驗資料的標準化架構。
- 結構描述組合的基本概念:瞭解XDM結構描述的基本建置區塊,包括結構描述組合中的關鍵原則和最佳實務。
- Schema Registry開發人員指南:包含您成功執行Schema Registry API呼叫所需瞭解的重要資訊。 這包括您的
{TENANT_ID}
、「容器」的概念,以及發出要求所需的標頭(特別注意Accept標頭及其可能的值)。
-
Catalog Service:目錄是Experience Platform中資料位置和歷程的記錄系統。
-
Batch ingestion:批次擷取API可讓您將資料以批次檔案的形式擷取到Experience Platform中。
-
沙箱:Experience Platform提供的虛擬沙箱可將單一Platform執行個體分割成個別的虛擬環境,以利開發及改進數位體驗應用程式。
使用平台API
如需如何成功呼叫Platform API的詳細資訊,請參閱Platform API快速入門的指南。
建立來源連線 source
您可以透過向Flow Service API的sourceConnections
端點發出POST要求來建立來源連線,同時提供您的基本連線ID、您要擷取的來源檔案的路徑,以及來源對應的連線規格ID。
建立來源連線時,您還必須定義資料格式屬性的列舉值。
對檔案型來源使用下列列舉值:
delimited
json
parquet
對於所有以資料表為基礎的來源,將值設定為tabular
。
API格式
POST /sourceConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
、JSON
和parquet
。data.properties
data.properties.columnDelimiter
,
)作為預設值。 注意: columnDelimiter
屬性只能在擷取分隔檔案時使用。data.properties.encoding
UTF-8
和ISO-8859-1
。 注意: encoding
引數僅在擷取分隔的CSV檔案時可用。 將使用預設編碼UTF-8
擷取其他檔案型別。data.properties.compressionType
bzip2
、gzip
、deflate
、zipDeflate
、tarGzip
和tar
。 注意: compressionType
屬性只能在擷取分隔檔案或JSON檔案時使用。params.path
/acme/summerCampaign/*.csv
將會內嵌整個/acme/summerCampaign/
資料夾。params.type
file
來擷取個別檔案,並使用型別folder
來擷取整個資料夾。connectionSpec.id
回應
成功的回應會傳回新建立的來源連線的唯一識別碼(id
)。 在後續步驟中需要此ID才能建立資料流。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
使用規則運算式來選取要擷取的特定檔案集 regex
建立來源連線時,您可以使用規則運算式將特定一組檔案從來源擷取到Platform。
API格式
POST /sourceConnections
要求
在以下範例中,檔案路徑中使用規則運算式,以指定擷取名稱中有premium
的所有CSV檔案。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
設定來源連線以遞回方式擷取資料
建立來源連線時,您可以使用recursive
引數從深層巢狀資料夾擷取資料。
API格式
POST /sourceConnections
要求
在下列範例中,recursive: true
引數會通知Flow Service在擷取程式期間以遞回方式讀取所有子資料夾。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
建立目標XDM結構描述 target-schema
為了在Platform中使用來源資料,必須建立目標結構描述,以根據您的需求來建構來源資料。 然後目標結構描述會用來建立包含來源資料的Platform資料集。
可透過對結構描述登入API執行POST要求來建立目標XDM結構描述。
如需有關如何建立目標XDM結構描述的詳細步驟,請參閱有關使用API 建立結構描述的教學課程。
建立目標資料集 target-dataset
可以透過對目錄服務API執行POST要求,在承載中提供目標結構描述的ID來建立目標資料集。
如需有關如何建立目標資料集的詳細步驟,請參閱有關使用API建立資料集的教學課程。
建立目標連線 target-connection
目標連線代表與擷取資料著陸目的地之間的連線。 若要建立目標連線,您必須提供與Data Lake相關聯的固定連線規格ID。 此連線規格識別碼為: c604ff05-7f1a-43c0-8e18-33bf874cb11c
。
您現在具有目標結構描述、目標資料集和到資料湖的連線規格ID的唯一識別碼。 使用這些識別碼,您可以使用Flow Service API建立目標連線,以指定將包含傳入來源資料的資料集。
API格式
POST /targetConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
,這會傳回結構描述的最新次要版本。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
。回應
成功的回應會傳回新目標連線的唯一識別碼(id
)。 此ID在後續步驟中是必要的。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
建立對應 mapping
為了將來源資料擷取到目標資料集中,必須首先將其對應到目標資料集所堅持的目標結構描述。
若要建立對應集,請在提供您的目標XDM結構描述$id
和您要建立的對應集詳細資料時,向Data Prep API的mappingSets
端點提出POST要求。
API格式
POST /conversion/mappingSets
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
回應
成功的回應會傳回新建立的對應詳細資料,包括其唯一識別碼(id
)。 在後續步驟中需要此值,才能建立資料流。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
擷取資料流規格 specs
資料流負責從來源收集資料,並將這些資料匯入Platform。 為了建立資料流,您必須先取得負責收集雲端儲存空間資料的資料流規格。
API格式
GET /flowSpecs?property=name=="CloudStorageToAEP"
要求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
回應
成功的回應會傳回負責將資料從來源帶入Platform的資料流規格的詳細資料。 回應包含建立新資料流所需的唯一流程規格id
。
code language-json |
---|
|
建立資料流
收集雲端儲存空間資料的最後一步是建立資料流。 到現在為止,您已準備下列必要值:
資料流負責從來源排程及收集資料。 您可以執行POST要求,同時在裝載中提供先前提到的值,藉此建立資料流。
若要排程內嵌,您必須先將開始時間值設為以秒為單位的epoch時間。 然後,您必須將頻率值設定為下列五個選項之一: once
、minute
、hour
、day
或week
。 間隔值會指定兩個連續擷取之間的期間,而建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於15
。
API格式
POST /flows
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
或week
。scheduleParams.interval
間隔會指定兩個連續資料流執行之間的期間。 間隔的值應為非零整數。 每個頻率的最小接受間隔值如下:
- 一次:不適用
- 分鐘: 15
- 小時: 1
- 天: 1
- 周: 1
回應
成功的回應會傳回新建立的資料流識別碼(id
)。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
監視資料流
建立資料流後,您可以監視透過該資料流擷取的資料,以檢視有關資料流執行、完成狀態和錯誤的資訊。 如需如何監視資料流程的詳細資訊,請參閱有關API中監視資料流程的教學課程
後續步驟
依照本教學課程所述,您已建立來源聯結器,以排程方式從雲端儲存空間收集資料。 下游Platform服務(例如Real-Time Customer Profile和Data Science Workspace)現在可以使用傳入的資料。 如需更多詳細資訊,請參閱下列檔案:
附錄 appendix
下節列出不同的雲端儲存空間來源聯結器及其連線規格。
連線規格
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094