ソースコネクタとAPIを使用してストリーミングデータを収集する

Flow Service は、Adobe Experience Platform内のさまざまな異なるソースから顧客データを収集し、一元化するために使用されます。 このサービスは、ユーザーインターフェイスとRESTful APIを提供し、サポートされるすべてのソースを接続できます。

このチュートリアルでは、ストリーミングソースコネクタからデータを取得し、 Experience Platform APIを使用するための手順について説明し Flow Service ます

はじめに

このチュートリアルでは、ストリーミングコネクタの有効な接続IDが必要です。 この情報がない場合は、このチュートリアルを試行する前に、次のチュートリアルのストリーミングソース接続の作成を参照してください。

また、このチュートリアルでは、Adobe Experience Platformの次のコンポーネントについて、十分に理解している必要があります。

  • Experience Data Model (XDM) System:Experience Platformが顧客体験データを編成する際に使用する標準化されたフレームワーク。
    • スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について学びます。
    • スキーマレジストリ開発ガイド:スキーマレジストリAPIの呼び出しを正常に実行するために知っておく必要がある重要な情報が含まれます。 これには、{TENANT_ID}、「コンテナ」の概念、リクエストをおこなうために必要なヘッダー(Accept ヘッダーとその可能な値に特に注意)が含まれます。
  • Catalog Service:カタログは、内のデータの場所と系列のレコードシステムで Experience Platformす。
  • Streaming ingestion:のストリーミング取り込み Platform 機能により、ユーザーはクライアントおよびサーバー側のデバイスからデータをリアルタイムに送信す Experience Platform る方法を使用できます。
  • サンドボックス: Experience Platform は、1つの Platform インスタンスを別々の仮想環境に分割し、デジタルエクスペリエンスアプリケーションの開発と発展に役立つ仮想サンドボックスを提供します。

The following sections provide additional information that you will need to know in order to successfully collect streaming data using the Flow Service API.

API 呼び出し例の読み取り

このチュートリアルでは、API 呼び出しの例を提供し、リクエストの形式を設定する方法を示します。この中には、パス、必須ヘッダー、適切な形式のリクエストペイロードが含まれます。また、API レスポンスで返されるサンプル JSON も示されています。ドキュメントで使用される API 呼び出し例の表記について詳しくは、 トラブルシューテングガイドのAPI 呼び出し例の読み方に関する節を参照してください。Experience Platform

必須ヘッダーの値の収集

In order to make calls to Platform APIs, you must first complete the authentication tutorial. Completing the authentication tutorial provides the values for each of the required headers in all Experience Platform API calls, as shown below:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

All resources in Experience Platform, including those belonging to Flow Service, are isolated to specific virtual sandboxes. All requests to Platform APIs require a header that specifies the name of the sandbox the operation will take place in:

  • x-sandbox-name: {SANDBOX_NAME}

ペイロード(POST、PUT、PATCH)を含むすべてのリクエストには、メディアのタイプを指定する以下のような追加ヘッダーが必要です。

  • Content-Type: application/json

ソース接続の作成

You can create a source connection by making a POST request to the Flow Service API. ソース接続は、接続ID、ソースデータファイルのパス、および接続仕様IDで構成されます。

ソース接続を作成するには、データ形式属性の列挙値も定義する必要があります。

ファイルベースのコネクタの列挙値は、次のとおりです。

データフォーマット 列挙値
区切り delimited
JSON json
パーケ parquet

テーブルベースのすべてのコネクタの値をに設定し tabularます。

API 形式

POST /sourceConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test source connector for streaming data",
        "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
        "connectionId": "f6aa6c58-3c3d-4c59-aa6c-583c3d6c599c",
        "description": "Test source connector for streaming data",
        "data": {
            "format": "delimited"
        },
            "connectionSpec": {
            "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
            "version": "1.0"
        }
    }'
プロパティ 説明
providerId ストリーミングコネクタのプロバイダID。
connectionId ストリーミングコネクタの一意の接続ID。
connectionSpec.id 特定のストリーミングコネクタに関連付けられている接続仕様ID。

応答

正常な応答は、新たに作成されたソース接続の固有な識別子(id)を返します。 このIDは、後の手順でデータフローを作成する際に必要です。

{
    "id": "2abd97c4-91bb-4c93-bd97-c491bbfc933d",
    "etag": "\"66013508-0000-0200-0000-5f6e2ae70000\""
}

ターゲットXDMスキーマの作成

でソースデータを使用するには、必要に応じてソースデータを構造化するためのターゲットスキーマを作成する Platform必要があります。 次に、このターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。 このターゲットXDMスキーマはXDM Individual Profile クラスも拡張します。

ターゲットXDMスキーマは、 スキーマレジストリAPIに対するPOST要求を実行することで作成できます

API 形式

POST /tenant/schemas

リクエスト

次のリクエスト例は、XDM Individual Profile クラスを拡張するXDMスキーマを作成します。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

応答

A successful response returns details of the newly created schema including its unique identifier ($id). このIDは、後の手順でターゲットデータセット、マッピング、データフローを作成する際に必要となります。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

ターゲットデータセットの作成

ターゲットデータセットは、 カタログサービスAPIに対してPOSTリクエストを実行し、ペイロード内のターゲットスキーマのIDを提供することで作成できます。

API 形式

POST /catalog/dataSets

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.1"
        },
        "fileDescription": {
            "format": "parquet"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        },
        "name": "Test streaming dataset"
    }'
プロパティ 説明
schemaRef.id ターゲットXDMスキーマのID。

応答

A successful response returns an array containing the ID of the newly created dataset in the format "@/datasets/{DATASET_ID}". データセット ID は、API 呼び出しでデータセットを参照するために使用される、読み取り専用のシステム生成文字列です。ターゲットデータセットIDは、後の手順でターゲット接続とデータフローを作成する際に必要となります。

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

ターゲット接続の作成

ターゲット接続は、取り込まれたデータが到着した宛先への接続を表します。 ターゲット接続を作成するには、データレークに関連付けられた固定接続仕様IDを指定する必要があります。 この接続仕様IDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c.

ターゲットスキーマ、ターゲットデータセット、データレークへの接続仕様IDに固有の識別子が追加されました。 これらの識別子を使用して、 Flow Service APIを使用してターゲット接続を作成し、受信ソースデータを含むデータセットを指定できます。

API 形式

POST /targetConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm"
        },
        "params": {
        "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
プロパティ 説明
params.dataSetId ターゲットデータセットのID。
connectionSpec.id Data Lakeへの接続に使用する接続仕様ID。 このIDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c.

応答

正常な応答は、新しいターゲット接続の固有な識別子(id)を返します。 このIDは、後の手順で必要になります。

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

マッピングの作成

ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。 これは、リクエストペイロード内で定義されたデータマッピングを使用して、コンバージョンサービスに対するPOSTリクエストを実行することで達成されます。

API 形式

POST /conversion/mappingSets

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
プロパティ 説明
xdmSchema ターゲット $id のXDMスキーマ。

応答

正常な応答は、新たに作成されたマッピングの詳細(一意の識別子(id)を含む)を返します。 このIDは、後の手順でデータフローを作成する際に必要です。

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

データフロー仕様の検索

データフローは、ソースからデータを収集し、それらをに取り込む役割を持ち Platformます。 データフローを作成するには、まず Flow Service APIに対してGETリクエストを実行して、データフロー仕様を取得する必要があります。 データフロー仕様は、ストリーミングコネクタからデータを収集します。
API 形式

GET /flowSpecs?property=name=="Steam data with transformation"

リクエスト

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="Steam data with transformation"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

応答

正常に応答すると、ストリーミングコネクタからのデータをに取り込むデータフロー仕様の詳細が返され Platformます。 このIDは、次の手順で新しいデータフローを作成する際に必要です。

{
    "items": [
        {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "name": "Steam data with transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "d27d4907-7351-47dd-bbc2-05a04365703d",
                "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from Raw to XDM",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        },
                        "required": [
                            "mappingId"
                        ]
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "deleteSupported": false,
                        "updateSupported": false,
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

データフローの作成

ストリーミングデータを収集する最後の手順は、データフローを作成することです。 現時点では、次の必須の値を用意しておきます。

データフローは、ソースからのデータのスケジュールおよび収集を担当します。 POST内で前述の値を提供しながらペイロードリクエストを実行すると、データフローを作成できます。

API 形式

POST /flows

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "2abd97c4-91bb-4c93-bd97-c491bbfc933d"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
プロパティ 説明
flowSpec.id 前の手順で取得した フロー仕様ID
sourceConnectionIds 前の手順で取得した ソース接続ID
targetConnectionIds 前の手順で取得した ターゲット接続ID
transformations.params.mappingId 前の手順で取得した マッピングID

応答

A successful response returns the ID (id) of the newly created dataflow.

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

次の手順

このチュートリアルに従って、データフローを作成し、ストリーミングコネクタからストリーミングデータを収集しました。 受信データは、やなどのダウンストリーム Platform サービスで使用でき Real-time Customer Profile るようになり Data Science Workspaceました。 詳しくは、次のドキュメントを参照してください。

このページ