使用Flow Service API建立資料庫來源的資料流
本教學課程涵蓋從資料庫來源擷取資料,以及使用Flow Service API將資料帶到Platform的步驟。
- 為了建立資料流,您必須擁有包含資料庫來源的有效基底連線ID。 如果您沒有此ID,請參閱來源概觀,以取得可建立基礎連線的資料庫來源清單。
- 為了讓Experience Platform擷取資料,所有以表格為基礎的批次來源的時區都必須設定為UTC。 Snowflake 來源唯一支援的時間戳記是UTC時間的TIMESTAMP_NTZ。
快速入門
本教學課程需要您實際瞭解下列Adobe Experience Platform元件:
-
Experience Data Model (XDM) System:Experience Platform用來組織客戶體驗資料的標準化架構。
- 結構描述組合的基本概念:瞭解XDM結構描述的基本建置區塊,包括結構描述組合中的關鍵原則和最佳實務。
- Schema Registry開發人員指南:包含您成功執行Schema Registry API呼叫所需瞭解的重要資訊。 這包括您的
{TENANT_ID}
、「容器」的概念,以及發出要求所需的標頭(特別注意Accept標頭及其可能的值)。
-
Catalog Service:目錄是Experience Platform中資料位置和歷程的記錄系統。
-
Batch ingestion:批次擷取API可讓您將資料以批次檔案的形式擷取到Experience Platform中。
-
沙箱:Experience Platform提供的虛擬沙箱可將單一Platform執行個體分割成個別的虛擬環境,以利開發及改進數位體驗應用程式。
使用平台API
如需如何成功呼叫Platform API的詳細資訊,請參閱Platform API快速入門的指南。
建立來源連線 source
您可以向Flow Service API發出POST要求,以建立來源連線。 來源連線由連線ID、來源資料檔案的路徑以及連線規格ID組成。
若要建立來源連線,您也必須定義資料格式屬性的列舉值。
對檔案型聯結器使用下列列舉值:
delimited
json
parquet
針對所有資料表式聯結器,將值設定為tabular
。
API格式
POST /sourceConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
回應
成功的回應會傳回新建立的來源連線的唯一識別碼(id
)。 在後續步驟中,建立目標連線時需要此ID。
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
建立目標XDM結構描述 target-schema
為了在Platform中使用來源資料,必須建立目標結構描述,以根據您的需求來建構來源資料。 然後目標結構描述會用來建立包含來源資料的Platform資料集。
可透過對結構描述登入API執行POST要求來建立目標XDM結構描述。
如需有關如何建立目標XDM結構描述的詳細步驟,請參閱有關使用API 建立結構描述的教學課程。
建立目標資料集 target-dataset
可以透過對目錄服務API執行POST要求,在承載中提供目標結構描述的ID來建立目標資料集。
如需有關如何建立目標資料集的詳細步驟,請參閱有關使用API建立資料集的教學課程。
建立目標連線 target-connection
目標連線代表與擷取資料著陸目的地之間的連線。 若要建立目標連線,您必須提供與資料湖相關聯的固定連線規格ID。 此連線規格識別碼為: c604ff05-7f1a-43c0-8e18-33bf874cb11c
。
您現在具有目標結構描述、目標資料集和到資料湖的連線規格ID的唯一識別碼。 使用Flow Service API,您可以指定這些識別碼以及將包含傳入來源資料的資料集,以建立目標連線。
API格式
POST /targetConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
,這會傳回結構描述的最新次要版本。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
。回應
成功的回應會傳回新目標連線的唯一識別碼(id
)。 在後續步驟中需要此值,才能建立資料流。
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
建立對應 mapping
為了將來源資料擷取到目標資料集中,必須首先將其對應到目標資料集所堅持的目標結構描述。
若要建立對應集,請在提供您的目標XDM結構描述$id
和您要建立的對應集詳細資料時,向Data Prep API的mappingSets
端點提出POST要求。
API格式
POST /mappingSets
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
。回應
成功的回應會傳回新建立的對應詳細資料,包括其唯一識別碼(id
)。 在後續步驟中需要此ID才能建立資料流。
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
擷取資料流規格 specs
資料流負責從來源收集資料,並將這些資料匯入Platform。 若要建立資料流,您必須先對Flow Service API執行GET要求,以取得資料流規格。 資料流規格負責從外部資料庫或NoSQL系統收集資料。
API格式
GET /flowSpecs?property=name=="CRMToAEP"
要求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
回應
成功的回應會傳回負責將資料從來源帶入Platform的資料流規格的詳細資料。 回應包含建立新資料流所需的唯一流程規格id
。
code language-json |
---|
|
建立資料流
收集資料的最後一步是建立資料流。 此時,您應該已準備下列必要值:
資料流負責從來源排程及收集資料。 您可以執行POST要求,同時在要求裝載中提供先前提到的值,藉此建立資料流。
若要排程內嵌,您必須先將開始時間值設為以秒為單位的epoch時間。 然後,您必須將頻率值設定為下列五個選項之一: once
、minute
、hour
、day
或week
。 間隔值會指定兩個連續擷取之間的期間,而建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於15
。
API格式
POST /flows
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
+++
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
支援的日期格式為yyyy-MM-dd HH:mm:ss
。 如果您使用Azure資料表儲存體,deltaColumn
支援的格式為yyyy-MM-ddTHH:mm:ssZ
。transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
或week
。scheduleParams.interval
間隔會指定兩個連續資料流執行之間的期間。 間隔的值應為非零整數。 每個頻率的最小接受間隔值如下:
- 一次:不適用
- 分鐘: 15
- 小時: 1
- 天: 1
- 周: 1
回應
成功的回應會傳回新建立的資料流識別碼(id
)。
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
監視資料流
建立資料流後,您可以監視透過該資料流擷取的資料,以檢視有關資料流執行、完成狀態和錯誤的資訊。 如需如何監視資料流程的詳細資訊,請參閱有關API中監視資料流程的教學課程
後續步驟
依照本教學課程中的指示,您已建立來源聯結器,以根據排程從資料庫收集資料。 下游Platform服務(例如Real-Time Customer Profile和Data Science Workspace)現在可以使用傳入的資料。 如需更多詳細資訊,請參閱下列檔案:
附錄
下節列出不同的雲端儲存空間來源聯結器及其連線規格。
連線規格
3416976c-a9ca-4bba-901a-1f08f66978ff
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
0479cc14-7651-4354-b233-7480606c2ac3
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
ecde33f2-c56f-46cc-bdea-ad151c16cd69
1fe283f6-9bec-11ea-bb37-0242ac130002
3c9b37f8-13a6-43d8-bad3-b863b941fedd
37b6bf40-d318-4655-90be-5cd6f65d334b
09182899-b429-40c9-a15a-bf3ddbc8ced7
000eb99-cd47-43f3-827c-43caf170f015
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
26d738e0-8963-47ea-aadf-c60de735468a
d6b52d86-f0f8-475f-89d4-ce54c8527328
102706fb-a5cd-42ee-afe0-bc42f017ff43
74a1c565-4e59-48d7-9d67-7c03b8a13137