本教程介紹了從資料庫源檢索資料並使用 Flow Service API。
要建立資料流,必須已具有與資料庫源的有效基連接ID。 如果您沒有此ID,請查看 源概述 的子目錄。
本教程要求您對以下Adobe Experience Platform元件有一定的瞭解:
{TENANT_ID}
、「容器」的概念和發出請求所需的標頭(特別要注意「接受」標頭及其可能值)。有關如何成功調用平台API的資訊,請參見上的指南 平台API入門。
您可以通過向POST請求建立源連接 Flow Service API。 源連接由連接ID、源資料檔案的路徑和連接規範ID組成。
要建立源連接,還必須為資料格式屬性定義枚舉值。
對基於檔案的連接器使用以下枚舉值:
資料格式 | 枚舉值 |
---|---|
分隔 | delimited |
JSON | json |
鑲木 | parquet |
對於所有基於表的連接器,將值設定為 tabular
。
API格式
POST /sourceConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
屬性 | 說明 |
---|---|
baseConnectionId |
資料庫源的連接ID。 |
params.path |
源檔案的路徑。 |
connectionSpec.id |
資料庫源的連接規範ID。 查看 附錄 的子目錄。 |
回應
成功的響應返回唯一標識符(id
)。 在後續步驟中建立目標連接時需要此ID。
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
為了在平台中使用源資料,必須建立目標架構以根據您的需要來構造源資料。 然後使用目標模式建立包含源資料的平台資料集。
通過執行對目標XDM的POST請求,可以建立目標XDM模式 架構註冊表API。
有關如何建立目標XDM架構的詳細步驟,請參見上的教程 使用API建立架構。
通過對目標資料集執行POST請求,可以建立目標資料集 目錄服務API,提供負載內目標架構的ID。
有關如何建立目標資料集的詳細步驟,請參見上的教程 使用API建立資料集。
目標連接表示到所接收資料所在目的地的連接。 要建立目標連接,必須提供與資料湖關聯的固定連接規範ID。 此連接規範ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c
。
您現在將唯一標識符作為目標模式、目標資料集和到資料湖的連接規範ID。 使用 Flow Service API,您可以通過指定這些標識符以及包含入站源資料的資料集來建立目標連接。
API格式
POST /targetConnections
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
屬性 | 說明 |
---|---|
data.schema.id |
的 $id 目標XDM架構。 |
data.schema.version |
架構的版本。 必須設定此值 application/vnd.adobe.xed-full+json;version=1 ,返回架構的最新次版本。 |
params.dataSetId |
在上一步中收集的目標資料集的ID。 |
connectionSpec.id |
用於連接到資料湖的連接規範ID。 此ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c 。 |
回應
成功的響應返回新目標連接的唯一標識符(id
)。 在後續步驟中建立資料流時需要此值。
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
為了將源資料攝取到目標資料集中,必須首先將其映射到目標資料集所遵循的目標模式。
要建立映射集,請向 mappingSets
端點 Data Prep API 提供目標XDM架構時 $id
以及要建立的映射集的詳細資訊。
API格式
POST /mappingSets
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
屬性 | 說明 |
---|---|
xdmSchema |
的 $id 目標XDM架構。 |
回應
成功的響應返回新建立的映射的詳細資訊,包括其唯一標識符(id
)。 在後續步驟中建立資料流時需要此ID。
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
資料流負責從源收集資料並將其引入平台。 要建立資料流,必須首先通過向 Flow Service API。 資料流規範負責從外部資料庫或NoSQL系統收集資料。
API格式
GET /flowSpecs?property=name=="CRMToAEP"
要求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
回應
成功的響應將返回負責將資料從源引入平台的資料流規範的詳細資訊。 響應包括唯一流規範 id
建立新資料流所需。
{
"items": [
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"transformationSpecs": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"attributes": {
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
}
}
]
}
收集資料的最後一步是建立資料流。 此時,您應準備以下必需值:
資料流負責從源調度和收集資料。 通過在請求負載中提供先前提到的值的同時執行POST請求,可以建立資料流。
要計畫攝取,必須首先將開始時間值設定為劃時代(秒)。 然後,必須將頻率值設定為以下五個選項之一: once
。 minute
。 hour
。 day
或 week
。 該間隔值指定兩個連續接收之間的期間,並且建立一次性接收不需要設定間隔。 對於所有其它頻率,間隔值必須設定為等於或大於 15
。
API格式
POST /flows
要求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
屬性 | 說明 |
---|---|
flowSpec.id |
的 流規範ID 在上一步中檢索。 |
sourceConnectionIds |
的 源連接ID 在較早的步驟中檢索。 |
targetConnectionIds |
的 目標連接ID 在較早的步驟中檢索。 |
transformations.params.mappingId |
的 映射ID 在較早的步驟中檢索。 |
transformations.params.deltaColum |
用於區分新資料和現有資料的指定列。 增量資料將根據選定列的時間戳進行接收。 支援的日期格式 deltaColumn 是 yyyy-MM-dd HH:mm:ss 。 如果您使用的是Azure表儲存,則支援的格式 deltaColumn 是 yyyy-MM-ddTHH:mm:ssZ 。 |
transformations.params.mappingId |
與資料庫關聯的映射ID。 |
scheduleParams.startTime |
資料流在紀元時間中的開始時間。 |
scheduleParams.frequency |
資料流收集資料的頻率。 可接受值包括: once 。 minute 。 hour 。 day 或 week 。 |
scheduleParams.interval |
該間隔指定兩個連續流運行之間的期間。 間隔的值應為非零整數。 頻率設定為時不需要間隔 once 應大於或等於 15 其他頻率值。 |
回應
成功的響應返回ID(id
)。
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
建立資料流後,您可以監視正在通過其接收的資料,以查看有關流運行、完成狀態和錯誤的資訊。 有關如何監視資料流的詳細資訊,請參見上的教程 監視API中的資料流
按照本教程,您建立了源連接器,以按計畫從資料庫收集資料。 現在,下游平台服務(如 Real-time Customer Profile 和 Data Science Workspace。 有關詳細資訊,請參閱以下文檔:
以下部分列出了不同的雲儲存源連接器及其連接規範。
連接器名稱 | 連接規範ID |
---|---|
Amazon Redshift | 3416976c-a9ca-4bba-901a-1f08f66978ff |
Apache Hive 上 Azure HDInsights | aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f |
Apache Spark 上 Azure HDInsights | 6a8d82bc-1caf-45d1-908d-cadabc9d63a6 |
Azure Data Explorer | 0479cc14-7651-4354-b233-7480606c2ac3 |
Azure Synapse Analytics | a49bcc7d-8038-43af-b1e4-5a7a089a7d79 |
Azure Table Storage | ecde33f2-c56f-46cc-bdea-ad151c16cd69 |
Couchbase | 1fe283f6-9bec-11ea-bb37-0242ac130002 |
Google BigQuery | 3c9b37f8-13a6-43d8-bad3-b863b941fedd |
Greenplum | 37b6bf40-d318-4655-90be-5cd6f65d334b |
IBM DB2 | 09182899-b429-40c9-a15a-bf3ddbc8ced7 |
MariaDB | 000eb99-cd47-43f3-827c-43caf170f015 |
Microsoft SQL Server | 1f372ff9-38a4-4492-96f5-b9a4e4bd00ec |
MySQL | 26d738e0-8963-47ea-aadf-c60de735468a |
Oracle | d6b52d86-f0f8-475f-89d4-ce54c8527328 |
Phoenix | 102706fb-a5cd-42ee-afe0-bc42f017ff43 |
PostgreSQL | 74a1c565-4e59-48d7-9d67-7c03b8a13137 |