Flow Service API を使用したデータベースソースのデータフローの作成
このチュートリアルでは、データベースソースからデータを取得し、Flow Service API を使用して Platform に取り込む手順を説明します。
- データフローを作成するには、データベースソースを持つ有効なベース接続 ID が必要となります。 この ID を持っていない場合は、を参照してください。 ソースの概要 を使用してベース接続を作成できるデータベースソースのリストについて説明します。
- Experience Platformでデータを取り込むには、すべてのテーブルベースのバッチソースのタイムゾーンを UTC に設定する必要があります。 でサポートされている唯一のタイムスタンプ Snowflake ソース は TIMESTAMP_NTZ (UTC 時間)。
はじめに
このチュートリアルは、Adobe Experience Platform の次のコンポーネントを実際に利用および理解しているユーザーを対象としています。
-
Experience Data Model (XDM) System:Experience Platform が顧客体験データを整理するための標準的なフレームワーク。
- スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について説明します。
- スキーマレジストリ開発者ガイドには、Schema Registry API の呼び出しを正常に実行するために知っておくべき重要な情報が含まれています。これには、
{TENANT_ID}
、「コンテナ」の概念、リクエストを行うのに必要なヘッダー(Accept ヘッダーと使用可能な値には特に注意を払う)が含まれます。
-
Catalog Service:カタログは、 Experience Platform 内のデータの位置と系統を記録するシステムです。
-
Batch ingestion:Batch Ingestion API を使用すると、データをバッチファイルとして Experience Platform に取り込むことができます。
-
サンドボックス:Experience Platform は、単一の Platform インスタンスを個別の仮想環境に分割する仮想サンドボックスを提供し、デジタル体験アプリケーションの開発および進化を支援します。
Platform API の使用
Platform API を正常に呼び出す方法については詳しくは、Platform API の概要のガイドを参照してください。
ソース接続の作成 source
Flow Service API に対して POST リクエストを実行することで、ソース接続を作成することができます。ソース接続は、接続 ID、ソースデータファイルへのパス、接続仕様 ID から構成されます。
ソース接続を作成するには、データ形式属性の列挙値も定義する必要があります。
ファイルベースのコネクタには、次の列挙値を使用します。
delimited
json
parquet
すべてのテーブルベースのコネクタで、値をtabular
に設定します。
API 形式
POST /sourceConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
応答
リクエストが成功した場合は、新しく作成したソース接続の一意の ID(id
)が返されます。この ID は、後の手順でターゲット接続を作成する際に必要になります。
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
ターゲット XDM スキーマの作成 target-schema
ソースデータを Platform で使用するには、必要に応じてターゲットスキーマを作成してソースデータを構造化する必要があります。 次に、ターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。
Schema Registry API に POST リクエストを実行することで、ターゲット XDM スキーマを作成できます。
ターゲット XDM スキーマの作成手順について詳しくは、 API を使用したスキーマの作成に関するチュートリアルを参照してください。
ターゲットデータセットの作成 target-dataset
Catalog Service API に POST リクエストを実行し、その際にペイロード内でターゲットスキーマの ID を指定することで、ターゲットデータセットを作成できます。
ターゲットデータセットの作成手順について詳しくは、 API を使用したデータセットの作成に関するチュートリアルを参照してください。
ターゲット接続の作成 target-connection
ターゲット接続は、取り込まれたデータが取り込まれる宛先への接続を表します。 ターゲット接続を作成するには、データレイクに関連付けられた固定接続仕様 ID を指定する必要があります。 この接続仕様 ID は c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。
これで、ターゲットスキーマとターゲットデータセットの一意の識別子、およびデータレイクに対する接続仕様 ID が得られました。 Flow Service API を使用すると、受信ソースデータを格納するデータセットと共にこれらの識別子を指定することで、ターゲット接続を作成できます。
API 形式
POST /targetConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
に設定する必要があります。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。応答
応答が成功すると、新しいターゲット接続の一意の ID(id
)が返されます。この値は、後の手順でデータフローを作成する際に必要になります。
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
マッピングの作成 mapping
ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。
マッピングセットを作成するには、Data Prep API の mappingSets
エンドポイントに POST リクエストを実行し、その際にターゲット XDM スキーマ $id
および作成するマッピングセットの詳細を指定します。
API 形式
POST /mappingSets
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
。応答
応答が成功すると、一意の ID(id
)など、新しく作成されたマッピングの詳細が返されます。この ID は、後の手順でデータフローを作成する際に必要になります。
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
データフロー仕様の取得 specs
データフローは、ソースからデータを収集し、Platform に取り込む役割を担っています。データフローを作成するにはまず、Flow Service API に対して GET リクエストを実行し、データフローの仕様を取得する必要があります。データフローの仕様は、外部データベースまたは NoSQL システムからデータを収集する役割を果たします。
API 形式
GET /flowSpecs?property=name=="CRMToAEP"
リクエスト
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
応答が成功すると、ソースから Platform にデータを取り込む必要があるデータフローの仕様の詳細が返されます。応答には、新しいデータフローを作成するために必要な、一意のフロー仕様 id
が含まれます。
code language-json |
---|
|
データフローの作成
データ収集のための最後の手順は、データフローの作成です。 この時点で、次の必要な値が用意されているはずです。
データフローは、ソースからデータをスケジュールおよび収集する役割を果たします。POST リクエストを実行し、その際にリクエストペイロード内で前述の値を指定することで、データフローを作成できます。
取り込みをスケジュールするには、まず開始時刻の値をエポック時間(秒)に設定する必要があります。次に、頻度の値を次の 5 つのオプションのいずれかに設定する必要があります。once
、minute
、hour
、day
または week
。インターバルの値には、連続した 2 回の取り込みの間の期間を指定します。1 回限りの取り込みを作成する場合は、インターバルを設定する必要はありません。それ以外の頻度では、間隔の値を 15
以上に設定する必要があります。
API 形式
POST /flows
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
+++
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
でサポートされている日付形式は yyyy-MM-dd HH:mm:ss
です。Azure Table Storage を使用している場合、deltaColumn
でサポートされている形式は yyyy-MM-ddTHH:mm:ssZ
です。transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
、week
です。scheduleParams.interval
once
に設定されている場合、間隔は必須ではありません。また、頻度は他の頻度の値に対して、15
よりも大きいか、等しい必要があります。応答
リクエストが成功した場合は、新しく作成したデータフローの ID(id
)が返されます。
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
データフローの監視
データフローが作成されると、それを通して取り込まれるデータを監視し、フローの実行状況、完了状況、エラーなどの情報を確認することができます。データフローのモニタリング方法について詳しくは、API でのデータフローの監視のチュートリアルを参照してください。
次の手順
このチュートリアルでは、スケジュールに従ってデータベースからデータを収集するソースコネクタを作成しました。これで、Real-Time Customer Profile および Data Science Workspace などのダウンストリームの Platform サービスで受信データを使用できるようになりました。詳しくは、次のドキュメントを参照してください。
付録
次の節では、様々なクラウドストレージソースコネクタとその接続仕様を示します。
接続仕様
3416976c-a9ca-4bba-901a-1f08f66978ff
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
0479cc14-7651-4354-b233-7480606c2ac3
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
ecde33f2-c56f-46cc-bdea-ad151c16cd69
1fe283f6-9bec-11ea-bb37-0242ac130002
3c9b37f8-13a6-43d8-bad3-b863b941fedd
37b6bf40-d318-4655-90be-5cd6f65d334b
09182899-b429-40c9-a15a-bf3ddbc8ced7
000eb99-cd47-43f3-827c-43caf170f015
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
26d738e0-8963-47ea-aadf-c60de735468a
d6b52d86-f0f8-475f-89d4-ce54c8527328
102706fb-a5cd-42ee-afe0-bc42f017ff43
74a1c565-4e59-48d7-9d67-7c03b8a13137