Flow Service API を使用したクラウドストレージソースのデータフローの作成
作成対象:
- 開発者
このチュートリアルでは、クラウドストレージソースからデータを取得し、Flow Service API を使用してExperience Platformに取り込む手順を説明します。
はじめに
このチュートリアルは、Adobe Experience Platform の次のコンポーネントを実際に利用および理解しているユーザーを対象としています。
-
Experience Data Model (XDM) System:Experience Platform が顧客体験データを整理するための標準的なフレームワーク。
- スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について説明します。
- スキーマレジストリ開発者ガイドには、Schema Registry API の呼び出しを正常に実行するために知っておくべき重要な情報が含まれています。これには、
{TENANT_ID}
、「コンテナ」の概念、リクエストを行うのに必要なヘッダー(Accept ヘッダーと使用可能な値には特に注意を払う)が含まれます。
-
Catalog Service:カタログは、 Experience Platform 内のデータの位置と系統を記録するシステムです。
-
Batch ingestion:Batch Ingestion API を使用すると、データをバッチファイルとして Experience Platform に取り込むことができます。
-
サンドボックス: Experience Platformには、1 つのExperience Platform インスタンスを別々の仮想環境に分割し、デジタルエクスペリエンスアプリケーションの開発と発展に役立つ仮想サンドボックスが用意されています。
Experience Platform API の使用
Experience Platform API を正常に呼び出す方法について詳しくは、Experience Platform API の概要を参照してください。
ソース接続の作成
ベース接続 ID、取り込むソースファイルのパス、ソースの対応する接続仕様 ID を指定しながら Flow Service API の sourceConnections
エンドポイントに対して POST リクエストを行うことで、ソース接続を作成できます。
ソース接続を作成する場合は、データ形式属性の列挙値も定義する必要があります。
ファイルベースのソースには、次の列挙値を使用します。
データ形式 | 列挙値 |
---|---|
区切り | delimited |
JSON | json |
PARQUET | parquet |
すべてのテーブルベースのソースで、値を tabular
に設定します。
API 形式
POST /sourceConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
、JSON
、parquet
です。data.properties
data.properties.columnDelimiter
,
)が使用されます。 注意:columnDelimiter
プロパティは、区切り文字付きファイルを取り込む場合にのみ使用できます。data.properties.encoding
UTF-8
と ISO-8859-1
です。 メモ:encoding
パラメーターは、区切り形式の CSV ファイルを取り込む場合にのみ使用できます。 他のファイルタイプは、デフォルトのエンコーディング UTF-8
で取り込まれます。data.properties.compressionType
bzip2
、gzip
、deflate
、zipDeflate
、tarGzip
、tar
です。 メモ:compressionType
プロパティは、区切り文字付きまたは JSON ファイルを取り込む場合にのみ使用できます。params.path
/acme/summerCampaign/*.csv
は /acme/summerCampaign/
フォルダー全体を取り込みます。params.type
file
を使用して個々のファイルを取り込み、タイプ folder
を使用してフォルダー全体を取り込みます。connectionSpec.id
応答
リクエストが成功した場合は、新たに作成されたソース接続の一意の ID(id
)が返されます。この ID は、後の手順でデータフローを作成する際に必要になります。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
正規表現を使用して、取り込む特定のファイルセットを選択します
ソース接続を作成する際に、正規表現を使用して、ソースからExperience Platformに特定のファイルセットを取り込むことができます。
API 形式
POST /sourceConnections
リクエスト
次の例では、名前に premium
が含まれるすべての CSV ファイルの取り込みを指定するために、ファイルパスで正規表現を使用しています。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
データを再帰的に取り込むためのソース接続の設定
ソース接続を作成する場合は、recursive
パラメーターを使用して、深くネストされたフォルダーからデータを取り込むことができます。
API 形式
POST /sourceConnections
リクエスト
次の例では、recursive: true
パラメーターを使用 Flow Service て、取り込みプロセス中にすべてのサブフォルダーを再帰的に読み取るように指示します。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
ターゲット XDM スキーマの作成
ソースデータをExperience Platformで使用するには、必要に応じてターゲットスキーマを作成してソースデータを構造化する必要があります。 次に、ターゲットスキーマを使用して、ソースデータが含まれるExperience Platform データセットが作成されます。
Schema Registry API に POST リクエストを実行することで、ターゲット XDM スキーマを作成できます。
ターゲット XDM スキーマの作成手順について詳しくは、 API を使用したスキーマの作成に関するチュートリアルを参照してください。
ターゲットデータセットの作成
Catalog Service API に POST リクエストを実行し、その際にペイロード内でターゲットスキーマの ID を指定することで、ターゲットデータセットを作成できます。
ターゲットデータセットの作成手順について詳しくは、 API を使用したデータセットの作成に関するチュートリアルを参照してください。
ターゲット接続の作成
ターゲット接続は、取り込まれたデータが取り込まれる宛先への接続を表します。 ターゲット接続を作成するには、データレイクに関連付けられた固定接続仕様 ID を提供する必要があります。この接続仕様 ID は c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。
これで、ターゲットスキーマとターゲットデータセット、およびデータレイクへの接続仕様 ID の一意の識別子が得られました。これらの識別子を使用すると、受信ソースデータを格納するデータセットを指定する Flow Service API を使用して、ターゲット接続を作成することができます。
API 形式
POST /targetConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
に設定する必要があります。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。応答
リクエストが成功した場合は、新しいターゲット接続の一意の ID(id
)が返されます。この ID は、後の手順で必要になります。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
マッピングの作成
ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。
マッピングセットを作成するには、ターゲット XDM スキーマ $id
および作成するマッピングセットの詳細を入力する際に、Data Prep API の mappingSets
エンドポイントに POST リクエストを行います。
API 形式
POST /conversion/mappingSets
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
応答
リクエストが成功した場合は、一意の ID(id
)を含む、新しく作成されたマッピングの詳細が返されます。この値は、後の手順でデータフローを作成する際に必要になります。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
データフロー仕様の取得
データフローは、ソースからデータを収集し、それらをExperience Platformに取り込む役割を果たします。 データフローを作成するには、まず、クラウドストレージデータの収集を実行するデータフロー仕様を取得する必要があります。
API 形式
GET /flowSpecs?property=name=="CloudStorageToAEP"
リクエスト
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
リクエストが成功した場合は、ソースからExperience Platformにデータを取り込む必要があるデータフローの仕様の詳細が返されます。 応答には、新しいデータフローを作成するために必要な、一意のフロー仕様 id
が含まれます。
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
データフローの作成
クラウドストレージデータを収集するための最後の手順は、データフローを作成することです。現時点で、次の必要な値の準備ができています。
データフローは、ソースからデータをスケジュールおよび収集する役割を果たします。ペイロードに前述の値を提供しながら POST リクエストを実行することで、データフローを作成することができます。
取り込みをスケジュールするには、まず開始時刻の値をエポック時間(秒)に設定する必要があります。次に、頻度の値を次の 5 つのオプションのいずれかに設定する必要があります。once
、minute
、hour
、day
または week
。インターバルの値には、連続した 2 回の取り込みの間の期間を指定します。1 回限りの取り込みを作成する場合は、インターバルを設定する必要はありません。その他のすべての頻度では、間隔の値を 15
以上に設定する必要があります。
API 形式
POST /flows
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Experience Platform",
"description": "Cloud Storage flow to Experience Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
、week
です。scheduleParams.interval
インターバルは 2 つの連続したフロー実行の間隔を指定します。インターバルの値はゼロ以外の整数にしてください。各頻度で許容される最小のインターバル値は次のとおりです。
- 1 回:なし
- 分: 15
- 時間: 1
- 日: 1
- 週: 1
応答
リクエストが成功した場合は、新しく作成したデータフローの ID(id
)が返されます。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
データフローの監視
データフローが作成されると、それを通して取り込まれるデータを監視し、フローの実行状況、完了状況、エラーなどの情報を確認することができます。データフローのモニタリング方法について詳しくは、API でのデータフローの監視のチュートリアルを参照してください。
次の手順
このチュートリアルでは、ソースコネクタを作成し、スケジュールに従ってクラウドストレージからデータを収集しました。これで、Real-Time Customer Profile や Data Science Workspace などのダウンストリームのExperience Platform サービスで受信データを使用できるようになりました。 詳しくは、次のドキュメントを参照してください。
付録
次の節では、様々なクラウドストレージソースコネクタとその接続仕様を示します。
接続仕様
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094