Flow Service は、Adobe Experience Platform内のさまざまな異なるソースから顧客データを収集し、一元化するために使用されます。 このサービスは、ユーザーインターフェイスとRESTful APIを提供し、サポートされるすべてのソースを接続できます。
このチュートリアルでは、ストリーミングソースコネクタからデータを取得し、 Experience Platform APIを使用するための手順について説明し Flow Service ます。
このチュートリアルでは、ストリーミングコネクタの有効な接続IDが必要です。 この情報がない場合は、このチュートリアルを試行する前に、次のチュートリアルのストリーミングソース接続の作成を参照してください。
また、このチュートリアルでは、Adobe Experience Platformの次のコンポーネントについて、十分に理解している必要があります。
{TENANT_ID}
、「コンテナ」の概念、リクエストをおこなうために必要なヘッダー(Accept ヘッダーとその可能な値に特に注意)が含まれます。The following sections provide additional information that you will need to know in order to successfully collect streaming data using the Flow Service API.
このチュートリアルでは、API 呼び出しの例を提供し、リクエストの形式を設定する方法を示します。この中には、パス、必須ヘッダー、適切な形式のリクエストペイロードが含まれます。また、API レスポンスで返されるサンプル JSON も示されています。ドキュメントで使用される API 呼び出し例の表記について詳しくは、 トラブルシューテングガイドのAPI 呼び出し例の読み方に関する節を参照してください。Experience Platform
In order to make calls to Platform APIs, you must first complete the authentication tutorial. Completing the authentication tutorial provides the values for each of the required headers in all Experience Platform API calls, as shown below:
Authorization: Bearer {ACCESS_TOKEN}
x-api-key: {API_KEY}
x-gw-ims-org-id: {IMS_ORG}
All resources in Experience Platform, including those belonging to Flow Service, are isolated to specific virtual sandboxes. All requests to Platform APIs require a header that specifies the name of the sandbox the operation will take place in:
x-sandbox-name: {SANDBOX_NAME}
ペイロード(POST、PUT、PATCH)を含むすべてのリクエストには、メディアのタイプを指定する以下のような追加ヘッダーが必要です。
Content-Type: application/json
You can create a source connection by making a POST request to the Flow Service API. ソース接続は、接続ID、ソースデータファイルのパス、および接続仕様IDで構成されます。
ソース接続を作成するには、データ形式属性の列挙値も定義する必要があります。
ファイルベースのコネクタの列挙値は、次のとおりです。
データフォーマット | 列挙値 |
---|---|
区切り | delimited |
JSON | json |
パーケ | parquet |
テーブルベースのすべてのコネクタの値をに設定し tabular
ます。
API 形式
POST /sourceConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test source connector for streaming data",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"connectionId": "f6aa6c58-3c3d-4c59-aa6c-583c3d6c599c",
"description": "Test source connector for streaming data",
"data": {
"format": "delimited"
},
"connectionSpec": {
"id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"version": "1.0"
}
}'
プロパティ | 説明 |
---|---|
providerId |
ストリーミングコネクタのプロバイダID。 |
connectionId |
ストリーミングコネクタの一意の接続ID。 |
connectionSpec.id |
特定のストリーミングコネクタに関連付けられている接続仕様ID。 |
応答
正常な応答は、新たに作成されたソース接続の固有な識別子(id
)を返します。 このIDは、後の手順でデータフローを作成する際に必要です。
{
"id": "2abd97c4-91bb-4c93-bd97-c491bbfc933d",
"etag": "\"66013508-0000-0200-0000-5f6e2ae70000\""
}
でソースデータを使用するには、必要に応じてソースデータを構造化するためのターゲットスキーマを作成する Platform必要があります。 次に、このターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。 このターゲットXDMスキーマはXDM Individual Profile クラスも拡張します。
ターゲットXDMスキーマは、 スキーマレジストリAPIに対するPOST要求を実行することで作成できます。
API 形式
POST /tenant/schemas
リクエスト
次のリクエスト例は、XDM Individual Profile クラスを拡張するXDMスキーマを作成します。
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
応答
A successful response returns details of the newly created schema including its unique identifier ($id
). このIDは、後の手順でターゲットデータセット、マッピング、データフローを作成する際に必要となります。
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{IMS_ORG}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
ターゲットデータセットは、 カタログサービスAPIに対してPOSTリクエストを実行し、ペイロード内のターゲットスキーマのIDを提供することで作成できます。
API 形式
POST /catalog/dataSets
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1.1"
},
"fileDescription": {
"format": "parquet"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
},
"name": "Test streaming dataset"
}'
プロパティ | 説明 |
---|---|
schemaRef.id |
ターゲットXDMスキーマのID。 |
応答
A successful response returns an array containing the ID of the newly created dataset in the format "@/datasets/{DATASET_ID}"
. データセット ID は、API 呼び出しでデータセットを参照するために使用される、読み取り専用のシステム生成文字列です。ターゲットデータセットIDは、後の手順でターゲット接続とデータフローを作成する際に必要となります。
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
ターゲット接続は、取り込まれたデータが到着した宛先への接続を表します。 ターゲット接続を作成するには、データレークに関連付けられた固定接続仕様IDを指定する必要があります。 この接続仕様IDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
ターゲットスキーマ、ターゲットデータセット、データレークへの接続仕様IDに固有の識別子が追加されました。 これらの識別子を使用して、 Flow Service APIを使用してターゲット接続を作成し、受信ソースデータを含むデータセットを指定できます。
API 形式
POST /targetConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm"
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
プロパティ | 説明 |
---|---|
params.dataSetId |
ターゲットデータセットのID。 |
connectionSpec.id |
Data Lakeへの接続に使用する接続仕様ID。 このIDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
応答
正常な応答は、新しいターゲット接続の固有な識別子(id
)を返します。 このIDは、後の手順で必要になります。
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。 これは、リクエストペイロード内で定義されたデータマッピングを使用して、コンバージョンサービスに対するPOSTリクエストを実行することで達成されます。
API 形式
POST /conversion/mappingSets
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
プロパティ | 説明 |
---|---|
xdmSchema |
ターゲット $id のXDMスキーマ。 |
応答
正常な応答は、新たに作成されたマッピングの詳細(一意の識別子(id
)を含む)を返します。 このIDは、後の手順でデータフローを作成する際に必要です。
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
データフローは、ソースからデータを収集し、それらをに取り込む役割を持ち Platformます。 データフローを作成するには、まず Flow Service APIに対してGETリクエストを実行して、データフロー仕様を取得する必要があります。 データフロー仕様は、ストリーミングコネクタからデータを収集します。
API 形式
GET /flowSpecs?property=name=="Steam data with transformation"
リクエスト
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="Steam data with transformation"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
正常に応答すると、ストリーミングコネクタからのデータをに取り込むデータフロー仕様の詳細が返され Platformます。 このIDは、次の手順で新しいデータフローを作成する際に必要です。
{
"items": [
{
"id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
"name": "Steam data with transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"d27d4907-7351-47dd-bbc2-05a04365703d",
"51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from Raw to XDM",
"properties": {
"mappingId": {
"type": "string"
}
},
"required": [
"mappingId"
]
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"deleteSupported": false,
"updateSupported": false,
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
}
]
}
ストリーミングデータを収集する最後の手順は、データフローを作成することです。 現時点では、次の必須の値を用意しておきます。
データフローは、ソースからのデータのスケジュールおよび収集を担当します。 POST内で前述の値を提供しながらペイロードリクエストを実行すると、データフローを作成できます。
API 形式
POST /flows
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {IMS_ORG}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
"version": "1.0"
},
"sourceConnectionIds": [
"2abd97c4-91bb-4c93-bd97-c491bbfc933d"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
プロパティ | 説明 |
---|---|
flowSpec.id |
前の手順で取得した フロー仕様ID 。 |
sourceConnectionIds |
前の手順で取得した ソース接続ID 。 |
targetConnectionIds |
前の手順で取得した ターゲット接続ID 。 |
transformations.params.mappingId |
前の手順で取得した マッピングID 。 |
応答
A successful response returns the ID (id
) of the newly created dataflow.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
このチュートリアルに従って、データフローを作成し、ストリーミングコネクタからストリーミングデータを収集しました。 受信データは、やなどのダウンストリーム Platform サービスで使用でき Real-time Customer Profile るようになり Data Science Workspaceました。 詳しくは、次のドキュメントを参照してください。