Flow Service API を使用したデータベースソースのデータフローの作成

このチュートリアルでは、データベースソースからデータを取得し、Flow Service API を使用して Platform に取り込む手順を説明します。

メモ

データフローを作成するには、データベースソースと共に有効なベース接続 ID が必要です。 この ID がない場合、 ソースの概要 を参照してください。

はじめに

このチュートリアルは、Adobe Experience Platform の次のコンポーネントを実際に利用および理解しているユーザーを対象としています。

  • Experience Data Model (XDM) System:Experience Platform が顧客体験データを整理するための標準的なフレームワーク。
    • スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について説明します。
    • スキーマレジストリ開発者ガイドには、Schema Registry API の呼び出しを正常に実行するために知っておくべき重要な情報が含まれています。これには、{TENANT_ID}、「コンテナ」の概念、リクエストを行うのに必要なヘッダー(Accept ヘッダーと使用可能な値には特に注意を払う)が含まれます。
  • Catalog Service:カタログは、 Experience Platform 内のデータの位置と系統を記録するシステムです。
  • Batch ingestion:Batch Ingestion API を使用すると、データをバッチファイルとして Experience Platform に取り込むことができます。
  • サンドボックス:Experience Platform は、単一の Platform インスタンスを個別の仮想環境に分割する仮想サンドボックスを提供し、デジタル体験アプリケーションの開発および進化を支援します。

Platform API の使用

Platform API を正常に呼び出す方法については詳しくは、Platform API の概要のガイドを参照してください。

ソース接続の作成

Flow Service API に対して POST リクエストを実行することで、ソース接続を作成することができます。ソース接続は、接続 ID、ソースデータファイルへのパス、接続仕様 ID から構成されます。

ソース接続を作成するには、データ形式属性の列挙値も定義する必要があります。

ファイルベースのコネクタには、次の列挙値を使用します。

データ形式 列挙値
区切り delimited
JSON json
PARQUET parquet

すべてのテーブルベースのコネクタで、値をtabularに設定します。

API 形式

POST /sourceConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database source connection",
        "baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
        "description": "Database source connection",
        "data": {
            "format": "tabular"
        },
        "params": {
            "tableName": "test1.Mytable",
            "columns": [
                {
                    "name": "TestID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Datefield",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
            "version": "1.0"
        }
    }'
プロパティ 説明
baseConnectionId データベースソースの接続 ID。
params.path ソースファイルのパス。
connectionSpec.id データベースソースの接続仕様 ID。 データベース仕様 ID の一覧については、付録を参照してください。

応答

リクエストが成功した場合は、新しく作成したソース接続の一意の ID(id)が返されます。この ID は、後の手順でターゲット接続を作成する際に必要になります。

{
    "id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
    "etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}

ターゲット XDM スキーマの作成

ソースデータを Platform で使用するには、必要に応じてターゲットスキーマを作成してソースデータを構造化する必要があります。 次に、ターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。

Schema Registry API に POST リクエストを実行することで、ターゲット XDM スキーマを作成できます。

ターゲット XDM スキーマの作成手順について詳しくは、 API を使用したスキーマの作成に関するチュートリアルを参照してください。

ターゲットデータセットの作成

Catalog Service API に POST リクエストを実行し、その際にペイロード内でターゲットスキーマの ID を指定することで、ターゲットデータセットを作成できます。

ターゲットデータセットの作成手順について詳しくは、 API を使用したデータセットの作成に関するチュートリアルを参照してください。

ターゲット接続の作成

ターゲット接続は、取り込まれたデータが取り込まれる宛先への接続を表します。 ターゲット接続を作成するには、データレイクに関連付けられた固定接続仕様 ID を指定する必要があります。 この接続仕様 ID は c604ff05-7f1a-43c0-8e18-33bf874cb11c です。

これで、ターゲットスキーマとターゲットデータセットの一意の識別子、およびデータレイクに対する接続仕様 ID が得られました。 Flow Service API を使用すると、受信ソースデータを格納するデータセットと共にこれらの識別子を指定することで、ターゲット接続を作成できます。

API 形式

POST /targetConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database target connection",
        "description": "Database target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "6019e0e7c5dcf718db5ebc71"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
プロパティ 説明
data.schema.id ターゲット XDM スキーマの $id
data.schema.version スキーマのバージョン番号。この値は application/vnd.adobe.xed-full+json;version=1 に設定する必要があります。この設定によりスキーマの最新のマイナーバージョンが返されます。
params.dataSetId 前の手順で収集したターゲットデータセットの ID。
connectionSpec.id Data Lake への接続に使用する接続仕様 ID。 この ID は c604ff05-7f1a-43c0-8e18-33bf874cb11c です。

応答

応答が成功すると、新しいターゲット接続の一意の ID(id)が返されます。この値は、後の手順でデータフローを作成する際に必要になります。

{
    "id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
    "etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}

マッピングの作成

ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。

マッピングセットを作成するには、Data Prep APImappingSets エンドポイントに POST リクエストを実行し、その際にターゲット XDM スキーマ $id および作成するマッピングセットの詳細を指定します。

API 形式

POST /mappingSets

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "TestID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.birthDate",
                "sourceAttribute": "Datefield",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
プロパティ 説明
xdmSchema ターゲット XDM スキーマの $id

応答

応答が成功すると、一意の ID(id)など、新しく作成されたマッピングの詳細が返されます。この ID は、後の手順でデータフローを作成する際に必要になります。

{
    "id": "0b090130b58b4819afc78b6dc98b484d",
    "version": 0,
    "createdDate": 1612309018666,
    "modifiedDate": 1612309018666,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

データフロー仕様の取得

データフローは、ソースからデータを収集し、Platform に取り込む役割を担っています。データフローを作成するにはまず、Flow Service API に対して GET リクエストを実行し、データフローの仕様を取得する必要があります。データフローの仕様は、外部データベースまたは NoSQL システムからデータを収集する役割を果たします。

API 形式

GET /flowSpecs?property=name=="CRMToAEP"

リクエスト

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

応答

応答が成功すると、ソースから Platform にデータを取り込む必要があるデータフローの仕様の詳細が返されます。応答には、新しいデータフローを作成するために必要な、一意のフロー仕様 id が含まれます。

{
    "items": [
        {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "name": "CRMToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "3416976c-a9ca-4bba-901a-1f08f66978ff",
                "38ad80fe-8b06-4938-94f4-d4ee80266b07",
                "d771e9c1-4f26-40dc-8617-ce58c4b53702",
                "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
                "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
                "3000eb99-cd47-43f3-827c-43caf170f015",
                "26d738e0-8963-47ea-aadf-c60de735468a",
                "74a1c565-4e59-48d7-9d67-7c03b8a13137",
                "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
                "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
                "cb66ab34-8619-49cb-96d1-39b37ede86ea",
                "eb13cb25-47ab-407f-ba89-c0125281c563",
                "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
                "37b6bf40-d318-4655-90be-5cd6f65d334b",
                "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
                "221c7626-58f6-4eec-8ee2-042b0226f03b",
                "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
                "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
                "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
                "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
                "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
                "102706fb-a5cd-42ee-afe0-bc42f017ff43",
                "09182899-b429-40c9-a15a-bf3ddbc8ced7",
                "0479cc14-7651-4354-b233-7480606c2ac3",
                "d6b52d86-f0f8-475f-89d4-ce54c8527328",
                "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
                "1fe283f6-9bec-11ea-bb37-0242ac130002"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "optionSpec": {
                "name": "OptionSpec",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "errorDiagnosticsEnabled": {
                            "title": "Error diagnostics.",
                            "description": "Flag to enable detailed and sample error diagnostics summary.",
                            "type": "boolean",
                            "default": false
                        },
                        "partialIngestionPercent": {
                            "title": "Partial ingestion threshold.",
                            "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
                            "type": "number",
                            "exclusiveMinimum": 0
                        }
                    }
                }
            },
            "transformationSpecs": [
                {
                    "name": "Copy",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "properties": {
                            "deltaColumn": {
                                "type": "object",
                                "properties": {
                                    "name": {
                                        "type": "string"
                                    },
                                    "dateFormat": {
                                        "type": "string"
                                    },
                                    "timezone": {
                                        "type": "string"
                                    }
                                },
                                "required": [
                                    "name"
                                ]
                            }
                        },
                        "required": [
                            "deltaColumn"
                        ]
                    }
                },
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "once",
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "once"
                            }
                        }
                    },
                    "then": {
                        "allOf": [
                            {
                                "not": {
                                    "required": [
                                        "interval"
                                    ]
                                }
                            },
                            {
                                "not": {
                                    "required": [
                                        "backfill"
                                    ]
                                }
                            }
                        ]
                    },
                    "else": {
                        "required": [
                            "interval"
                        ],
                        "if": {
                            "properties": {
                                "frequency": {
                                    "const": "minute"
                                }
                            }
                        },
                        "then": {
                            "properties": {
                                "interval": {
                                    "minimum": 15
                                }
                            }
                        },
                        "else": {
                            "properties": {
                                "interval": {
                                    "minimum": 1
                                }
                            }
                        }
                    }
                }
            },
            "attributes": {
                "notification": {
                    "category": "sources",
                    "flowRun": {
                        "enabled": true
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

データフローの作成

データ収集のための最後の手順は、データフローの作成です。 この時点で、次の必要な値が用意されているはずです。

データフローは、ソースからデータをスケジュールおよび収集する役割を果たします。POST リクエストを実行し、その際にリクエストペイロード内で前述の値を指定することで、データフローを作成できます。

取り込みをスケジュールするには、まず開始時刻の値をエポック時間(秒)に設定する必要があります。次に、頻度の値を次の 5 つのオプションのいずれかに設定する必要があります。onceminutehourday または week。インターバルの値には、連続した 2 回の取り込みの間の期間を指定します。1 回限りの取り込みを作成する場合は、インターバルを設定する必要はありません。それ以外の頻度では、間隔の値を 15 以上に設定する必要があります。

API 形式

POST /flows

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database dataflow using BigQuery",
        "description": "collecting test1.Mytable",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "b7581b59-c603-4df1-a689-d23d7ac440f3"
        ],
        "targetConnectionIds": [
            "320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "Datefield",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "0b090130b58b4819afc78b6dc98b484d",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1612310466",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'
プロパティ 説明
flowSpec.id 前の手順で取得したフロー仕様 ID
sourceConnectionIds 前の手順で取得したソース接続 ID
targetConnectionIds 前の手順で取得したターゲット接続 ID
transformations.params.mappingId 前の手順で取得したマッピング ID
transformations.params.deltaColum 新しいデータと既存のデータを区別するために使用する、指定の列。 増分データは、選択した列のタイムスタンプに基づいて取り込まれます。 deltaColumn でサポートされている日付形式は yyyy-MM-dd HH:mm:ss です。Azure Table Storage を使用している場合、deltaColumn でサポートされている形式は yyyy-MM-ddTHH:mm:ssZ です。
transformations.params.mappingId データベースに関連付けられたマッピング ID。
scheduleParams.startTime エポック時間で表した、データフローの開始時間。
scheduleParams.frequency データフローがデータを収集する頻度。指定できる値は、onceminutehourdayweek です。
scheduleParams.interval インターバルは 2 つの連続したフロー実行の間隔を指定します。インターバルの値はゼロ以外の整数にしてください。頻度が once に設定されている場合、間隔は必須ではありません。また、頻度は他の頻度の値に対して、15 よりも大きいか、等しい必要があります。

応答

リクエストが成功した場合は、新しく作成したデータフローの ID(id)が返されます。

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

データフローの監視

データフローが作成されると、それを通して取り込まれるデータを監視し、フローの実行状況、完了状況、エラーなどの情報を確認することができます。データフローの監視方法について詳しくは、API でのデータフローの監視に関するチュートリアルを参照してください。

次の手順

このチュートリアルでは、スケジュールに従ってデータベースからデータを収集するソースコネクタを作成しました。これで、Real-time Customer Profile および Data Science Workspace などのダウンストリームの Platform サービスで受信データを使用できるようになりました。詳しくは、次のドキュメントを参照してください。

付録

次の節では、様々なクラウドストレージソースコネクタとその接続仕様を示します。

接続仕様

コネクタ名 接続仕様 ID
Amazon Redshift 3416976c-a9ca-4bba-901a-1f08f66978ff
Azure HDInsights 上の Apache Hive aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Azure HDInsights 上の Apache Spark 6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer 0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage ecde33f2-c56f-46cc-bdea-ad151c16cd69
Couchbase 1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery 3c9b37f8-13a6-43d8-bad3-b863b941fedd
Greenplum 37b6bf40-d318-4655-90be-5cd6f65d334b
IBM DB2 09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB 000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server 1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL 26d738e0-8963-47ea-aadf-c60de735468a
Oracle d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix 102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL 74a1c565-4e59-48d7-9d67-7c03b8a13137

このページ