使用Flow Service API建立資料庫來源的資料流
建立對象:
- 開發人員
本教學課程涵蓋從資料庫來源擷取資料,以及使用Flow Service API將這些資料帶到Experience Platform的步驟。
- 為了建立資料流,您必須擁有包含資料庫來源的有效基底連線ID。 如果您沒有此ID,請參閱來源概觀,以取得可建立基礎連線的資料庫來源清單。
- 為了讓Experience Platform擷取資料,所有表格型批次來源的時區都必須設定為UTC。 Snowflake 來源唯一支援的時間戳記是UTC時間的TIMESTAMP_NTZ。
快速入門
本教學課程需要您實際瞭解下列Adobe Experience Platform元件:
-
Experience Data Model (XDM) System: Experience Platform組織客戶體驗資料的標準化架構。
- 結構描述組合的基本概念:瞭解XDM結構描述的基本建置區塊,包括結構描述組合中的關鍵原則和最佳實務。
- Schema Registry開發人員指南:包含您成功執行Schema Registry API呼叫所需瞭解的重要資訊。 這包括您的
{TENANT_ID}
、「容器」的概念,以及發出要求所需的標頭(特別注意Accept標頭及其可能的值)。
-
Catalog Service:目錄是Experience Platform中資料位置和歷程的記錄系統。
-
Batch ingestion:批次擷取API可讓您將資料以批次檔案的形式擷取到Experience Platform。
-
沙箱: Experience Platform提供的虛擬沙箱可將單一Experience Platform執行個體分割成個別的虛擬環境,以利開發及改進數位體驗應用程式。
使用Experience Platform API
如需如何成功呼叫Experience Platform API的詳細資訊,請參閱Experience Platform API快速入門指南。
建立來源連線
您可以對Flow Service API發出POST要求,以建立來源連線。 來源連線由連線ID、來源資料檔案的路徑以及連線規格ID組成。
若要建立來源連線,您也必須定義資料格式屬性的列舉值。
對檔案型聯結器使用下列列舉值:
delimited
json
parquet
針對所有資料表式聯結器,將值設定為tabular
。
API格式
POST /sourceConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
回應
成功的回應會傳回新建立的來源連線的唯一識別碼(id
)。 在後續步驟中,建立目標連線時需要此ID。
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
建立目標XDM結構描述
為了在Experience Platform中使用來源資料,必須建立目標結構描述,以根據您的需求建構來源資料。 然後使用目標結構描述來建立包含來源資料的Experience Platform資料集。
可透過對結構描述登入API執行POST要求來建立目標XDM結構描述。
如需有關如何建立目標XDM結構描述的詳細步驟,請參閱有關使用API 建立結構描述的教學課程。
建立目標資料集
可透過對目錄服務API執行POST要求,在承載中提供目標結構描述的ID,來建立目標資料集。
如需有關如何建立目標資料集的詳細步驟,請參閱有關使用API建立資料集的教學課程。
建立目標連線
目標連線代表與擷取資料著陸目的地之間的連線。 若要建立目標連線,您必須提供與資料湖相關聯的固定連線規格ID。 此連線規格識別碼為: c604ff05-7f1a-43c0-8e18-33bf874cb11c
。
您現在具有目標結構描述、目標資料集和到資料湖的連線規格ID的唯一識別碼。 使用Flow Service API,您可以指定這些識別碼以及將包含傳入來源資料的資料集,以建立目標連線。
API格式
POST /targetConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
,這會傳回結構描述的最新次要版本。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
。回應
成功的回應會傳回新目標連線的唯一識別碼(id
)。 在後續步驟中需要此值,才能建立資料流。
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
建立對應
為了將來源資料擷取到目標資料集中,必須首先將其對應到目標資料集所堅持的目標結構描述。
若要建立對應集,請在提供您的目標XDM結構描述$id
和您要建立的對應集詳細資料時,對Data Prep API的mappingSets
端點提出POST要求。
API格式
POST /mappingSets
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
。回應
成功的回應會傳回新建立的對應詳細資料,包括其唯一識別碼(id
)。 在後續步驟中需要此ID才能建立資料流。
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
擷取資料流規格
資料流負責從來源收集資料,並將這些資料匯入Experience Platform。 若要建立資料流,您必須先對Flow Service API執行GET要求,以取得資料流規格。 資料流規格負責從外部資料庫或NoSQL系統收集資料。
API格式
GET /flowSpecs?property=name=="CRMToAEP"
要求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
回應
成功的回應會傳回負責將資料從來源帶入Experience Platform的資料流規格的詳細資料。 回應包含建立新資料流所需的唯一流程規格id
。
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
建立資料流
收集資料的最後一步是建立資料流。 此時,您應該已準備下列必要值:
資料流負責從來源排程及收集資料。 您可以執行POST要求來建立資料流,同時在要求裝載中提供先前提到的值。
若要排程內嵌,您必須先將開始時間值設為以秒為單位的epoch時間。 然後,您必須將頻率值設定為下列五個選項之一: once
、minute
、hour
、day
或week
。 間隔值會指定兩個連續擷取之間的期間,而建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於15
。
API格式
POST /flows
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
+++
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
支援的日期格式為yyyy-MM-dd HH:mm:ss
。 如果您使用Azure資料表儲存體,deltaColumn
支援的格式為yyyy-MM-ddTHH:mm:ssZ
。transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
或week
。scheduleParams.interval
間隔會指定兩個連續資料流執行之間的期間。 間隔的值應為非零整數。 每個頻率的最小接受間隔值如下:
- 一次:不適用
- 分鐘: 15
- 小時: 1
- 天: 1
- 周: 1
回應
成功的回應會傳回新建立的資料流識別碼(id
)。
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
監視資料流
建立資料流後,您可以監視透過該資料流擷取的資料,以檢視有關資料流執行、完成狀態和錯誤的資訊。 如需如何監視資料流程的詳細資訊,請參閱有關API中監視資料流程的教學課程
後續步驟
依照本教學課程中的指示,您已建立來源聯結器,以根據排程從資料庫收集資料。 下游Experience Platform服務(例如Real-Time Customer Profile和Data Science Workspace)現在可以使用內送資料。 如需更多詳細資訊,請參閱下列檔案:
附錄
下節列出不同的雲端儲存空間來源聯結器及其連線規格。
連線規格
3416976c-a9ca-4bba-901a-1f08f66978ff
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
0479cc14-7651-4354-b233-7480606c2ac3
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
ecde33f2-c56f-46cc-bdea-ad151c16cd69
1fe283f6-9bec-11ea-bb37-0242ac130002
3c9b37f8-13a6-43d8-bad3-b863b941fedd
37b6bf40-d318-4655-90be-5cd6f65d334b
09182899-b429-40c9-a15a-bf3ddbc8ced7
000eb99-cd47-43f3-827c-43caf170f015
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
26d738e0-8963-47ea-aadf-c60de735468a
d6b52d86-f0f8-475f-89d4-ce54c8527328
102706fb-a5cd-42ee-afe0-bc42f017ff43
74a1c565-4e59-48d7-9d67-7c03b8a13137