ソースコネクターとAPIを使用したクラウドストレージデータの収集

このチュートリアルでは、サードパーティのクラウドストレージからデータを取得し、ソースコネクタと Flow Service APIを使用してプラットフォームにデータを取り込む手順を説明します

はじめに

このチュートリアルでは、有効な接続を通じてサードパーティのクラウドストレージにアクセスでき、ファイルのパスや構造など、に組み込むファイルに関する情報が必要 Platformです。 この情報がない場合は、このチュートリアルを試す前に、APIを使用したサードパーティのクラウドストレージの 調査に関するチュ Flow Service ートリアル を参照してください。

また、このチュートリアルでは、Adobe Experience Platformの次のコンポーネントについて、十分に理解している必要があります。

  • Experience Data Model (XDM) System:Experience Platformが顧客体験データを編成する際に使用する標準化されたフレームワーク。
    • スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について学びます。
    • スキーマレジストリ開発ガイド:スキーマレジストリAPIの呼び出しを正常に実行するために知っておく必要がある重要な情報が含まれます。 これには、{TENANT_ID}、「コンテナ」の概念、リクエストをおこなうために必要なヘッダー(Accept ヘッダーとその可能な値に特に注意)が含まれます。
  • Catalog Service:カタログは、内のデータの場所と系列のレコードシステムで Experience Platformす。
  • Batch ingestion:Batch Ingestion APIを使用すると、データをバッチファイル Experience Platform としてに取り込むことができます。
  • サンドボックス: Experience Platform は、1つの Platform インスタンスを別々の仮想環境に分割し、デジタルエクスペリエンスアプリケーションの開発と発展に役立つ仮想サンドボックスを提供します。
    The following sections provide additional information that you will need to know in order to successfully connect to a cloud storage using the Flow Service API.

API 呼び出し例の読み取り

このチュートリアルでは、API 呼び出しの例を提供し、リクエストの形式を設定する方法を示します。この中には、パス、必須ヘッダー、適切な形式のリクエストペイロードが含まれます。また、API レスポンスで返されるサンプル JSON も示されています。ドキュメントで使用される API 呼び出し例の表記について詳しくは、 トラブルシューテングガイドのAPI 呼び出し例の読み方に関する節を参照してください。Experience Platform

必須ヘッダーの値の収集

In order to make calls to Platform APIs, you must first complete the authentication tutorial. Completing the authentication tutorial provides the values for each of the required headers in all Experience Platform API calls, as shown below:

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

All resources in Experience Platform, including those belonging to Flow Service, are isolated to specific virtual sandboxes. All requests to Platform APIs require a header that specifies the name of the sandbox the operation will take place in:

  • x-sandbox-name: {SANDBOX_NAME}

ペイロード(POST、PUT、PATCH)を含むすべてのリクエストには、メディアのタイプを指定する以下のような追加ヘッダーが必要です。

  • Content-Type: application/json

ソース接続の作成

You can create a source connection by making a POST request to the Flow Service API. ソース接続は、接続ID、ソースデータファイルのパス、および接続仕様IDで構成されます。

ソース接続を作成するには、データ形式属性の列挙値も定義する必要があります。

ファイルベースのコネクタの列挙値は、次のとおりです。

データフォーマット 列挙値
区切り delimited
JSON json
パーケ parquet

テーブルベースのすべてのコネクタの値をに設定し tabularます。

メモ

列の区切り文字をプロパティとして指定すると、CSVファイルとTSVファイルをクラウドストレージソースコネクタに取り込むことができます。 任意の1文字の値は、列の区切り文字として使用できます。 指定しない場合、コンマ (,) がデフォルト値として使用されます。

API 形式

POST /sourceConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud storage source connector",
        "connectionId": "9e2541a0-b143-4d23-a541-a0b143dd2301",
        "description": "Cloud storage source connector",
        "data": {
            "format": "delimited",
            "columnDelimiter": "\t"
        },
        "params": {
            "path": "/ingestion-demos/leads/tsv_data/*.tsv",
            "recursive": "true"
        },
            "connectionSpec": {
            "id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
            "version": "1.0"
        }
    }'
プロパティ 説明
connectionId アクセスするサードパーティのクラウドストレージシステムの一意の接続ID。
data.format データ形式属性を定義する列挙値。
data.columnDelimiter 任意の1文字列の列区切り文字を使用して、フラットファイルを収集できます。 このプロパティは、CSVファイルまたはTSVファイルを取り込む場合にのみ必要です。
params.path アクセスするソースファイルのパス。
connectionSpec.id 特定のサードパーティクラウドストレージシステムに関連付けられている接続仕様ID。 接続仕様IDのリストについては、 付録 を参照してください。

応答

正常な応答は、新たに作成されたソース接続の固有な識別子(id)を返します。 このIDは、後の手順でデータフローを作成する際に必要です。

{
    "id": "26b53912-1005-49f0-b539-12100559f0e2",
    "etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}

ターゲットXDMスキーマの作成

でソースデータを使用するには、必要に応じてソースデータを構造化するためのターゲットスキーマを作成する Platform必要があります。 次に、このターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。

ターゲットXDMスキーマは、 スキーマレジストリAPIに対するPOST要求を実行することで作成できます

API 形式

POST /schemaregistry/tenant/schemas

リクエスト

次のリクエスト例は、XDM Individualプロファイルクラスを拡張するXDMスキーマを作成します。

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Target schema for a Cloud Storage connector",
        "description": "Target schema for a Cloud Storage connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

応答

A successful response returns details of the newly created schema including its unique identifier ($id). このIDは、後の手順でターゲットデータセット、マッピング、データフローを作成する際に必要となります。

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
    "meta:altId": "_{TENANT_ID}.schemas.995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Target schema cloud storage",
    "type": "object",
    "description": "Target schema for cloud storage",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1597783248870,
        "repo:lastModifiedDate": 1597783248870,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{LAST_MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{LAST_MODIFIED_USER_ID}",
        "eTag": "596661ec6c7a9c6ae530676e98290a4a58ca29540ed92489cf4478b2bf013a65",
        "meta:globalLibVersion": "1.13.3"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:tenantNamespace": "{TENANT_ID}"
}

ターゲットデータセットの作成

ターゲットデータセットは、 カタログサービスAPIに対してPOSTリクエストを実行し、ペイロード内のターゲットスキーマのIDを提供することで作成できます。

API 形式

POST /catalog/dataSets

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target dataset for cloud storage",
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        }
    }'
プロパティ 説明
schemaRef.id ターゲットXDMスキーマのID。

応答

A successful response returns an array containing the ID of the newly created dataset in the format "@/datasets/{DATASET_ID}". データセット ID は、API 呼び出しでデータセットを参照するために使用される、読み取り専用のシステム生成文字列です。ターゲットデータセットIDは、後の手順でターゲット接続とデータフローを作成する際に必要となります。

[
    "@/dataSets/5f3c3cedb2805c194ff0b69a"
]

ターゲット接続の作成

ターゲット接続は、取り込まれたデータが到着した宛先への接続を表します。 ターゲット接続を作成するには、Data Lakeに関連付けられた固定接続仕様IDを指定する必要があります。 この接続仕様IDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c.

これで、ターゲットスキーマ、ターゲットデータセット、データレークへの接続仕様IDの一意の識別子が得られました。 これらの識別子を使用して、 Flow Service APIを使用してターゲット接続を作成し、受信ソースデータを含むデータセットを指定できます。

API 形式

POST /targetConnections

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a Cloud Storage connector",
        "description": "Target Connection for a Cloud Storage connector",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
                "version": "application/vnd.adobe.xed-full+json;version=1.0"
            }
        },
        "params": {
            "dataSetId": "5f3c3cedb2805c194ff0b69a"
        },
            "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
プロパティ 説明
data.schema.id ターゲット $id のXDMスキーマ。
params.dataSetId ターゲットデータセットのID。
connectionSpec.id Data Lakeへの固定接続仕様ID。 このIDは次のとおりです。 c604ff05-7f1a-43c0-8e18-33bf874cb11c.

応答

正常な応答は、新しいターゲット接続の固有な識別子(id)を返します。 このIDは、後の手順で必要になります。

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

マッピングの作成

ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。 これは、リクエストペイロード内で定義されたデータマッピングを使用して、コンバージョンサービスに対するPOSTリクエストを実行することで達成されます。

API 形式

POST /conversion/mappingSets

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "Id",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "FirstName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "LastName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
プロパティ 説明
xdmSchema ターゲットXDMスキーマのID。

応答

正常な応答は、新たに作成されたマッピングの詳細(一意の識別子(id)を含む)を返します。 この値は、後の手順でデータフローを作成する際に必要になります。

{
    "id": "bf5286a9c1ad4266baca76ba3adc9366",
    "version": 0,
    "createdDate": 1597784069368,
    "modifiedDate": 1597784069368,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

データフロー仕様の取得

データフローは、ソースからデータを収集し、それらをに取り込む役割を持ち Platformます。 データフローを作成するには、まず、クラウドストレージデータの収集を担当するデータフロー仕様を取得する必要があります。

API 形式

GET /flowSpecs?property=name=="CloudStorageToAEP"

リクエスト

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

応答

成功した応答は、クラウドストレージからのデータの取り込みを担当するデータフロー仕様の詳細を返し Platformます。 この応答には、一意のフロー仕様IDが含まれます。 このIDは、次の手順で新しいデータフローを作成する際に必要です。

{
    "items": [
        {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "name": "CloudStorageToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
                "ecadc60c-7455-4d87-84dc-2a0e293d997b",
                "b7829c2f-2eb0-4f49-a6ee-55e33008b629",
                "4c10e202-c428-4796-9208-5f1f5732b1cf",
                "fb2e94c9-c031-467d-8103-6bd6e0a432f2",
                "32e8f412-cdf7-464c-9885-78184cb113fd",
                "b7bf2577-4520-42c9-bae9-cad01560f7bc",
                "998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
                "be5ec48c-5b78-49d5-b8fa-7c89ec4569b8"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "endTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency",
                        "interval"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "minute"
                            }
                        }
                    },
                    "then": {
                        "properties": {
                            "interval": {
                                "minimum": 15
                            }
                        }
                    },
                    "else": {
                        "properties": {
                            "interval": {
                                "minimum": 1
                            }
                        }
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

データフローの作成

クラウドストレージデータを収集する最後の手順は、データフローを作成することです。 現時点では、次の必須の値を用意しておきます。

データフローは、ソースからのデータのスケジュールおよび収集を担当します。 POST内で前述の値を提供しながらペイロードリクエストを実行すると、データフローを作成できます。

取り込みのスケジュールを設定するには、まず開始時間の値を秒単位のエポック時間に設定する必要があります。 次に、頻度の値を次の5つのオプションのいずれかに設定する必要があります。 once、、 minutehourdayまたはのいずれか weekです。 interval値は、2つの連続したインジェスションの間の期間を指定し、1回限りのインジェストを作成する場合に、間隔を設定する必要はありません。 その他のすべての周波数の場合、間隔の値は次の値と等しいかそれ以上に設定する必要があり 15ます。

API 形式

POST /flows

リクエスト

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage flow to Platform",
        "description": "Cloud Storage flow to Platform",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "26b53912-1005-49f0-b539-12100559f0e2"
        ],
        "targetConnectionIds": [
            "f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
                    "mappingVersion": "0"
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1597784298",
            "frequency":"minute",
            "interval":"30"
        }
    }'
プロパティ 説明
flowSpec.id 前の手順で取得した フロー仕様ID
sourceConnectionIds 前の手順で取得した ソース接続ID
targetConnectionIds 前の手順で取得した ターゲット接続ID
transformations.params.mappingId 前の手順で取得した マッピングID
scheduleParams.startTime エポック時間のデータフローの開始時間。
scheduleParams.frequency データフローがデータを収集する頻度。 指定できる値は次のとおりです。 once、、 minutehourdayまたはのいずれか weekです。
scheduleParams.interval この間隔は、連続する2つのフローの実行間隔を指定します。 間隔の値は、ゼロ以外の整数である必要があります。 頻度を「次の値」に設定する場合、間隔は不要 once です。他の頻度の値に対して、間隔は「次の値」以上に設定する必要があり 15 ます。

応答

A successful response returns the ID (id) of the newly created dataflow.

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

データフローの監視

データフローを作成したら、データフローを介して取り込まれるデータを監視し、フローの実行、完了状態、エラーに関する情報を確認できます。 データフローの監視方法の詳細については、APIのデータフローの 監視に関するチュートリアルを参照してください

次の手順

このチュートリアルに従って、ソースコネクタを作成し、クラウドストレージからデータをスケジュールに基づいて収集します。 受信データは、やなどのダウンストリーム Platform サービスで使用でき Real-time Customer Profile るようになり Data Science Workspaceました。 詳しくは、次のドキュメントを参照してください。

付録

次の節では、様々なクラウドストレージのソースコネクタと接続仕様をリストします。

接続の指定

コネクタ名 接続仕様
Amazon S3 (S3) ecadc60c-7455-4d87-84dc-2a0e293d997b
Amazon Kinesis (Kinesis) 86043421-563b-46ec-8e6c-e23184711bf6
Azure Blob (BLOB) 4c10e202-c428-4796-9208-5f1f5732b1cf
Azure Data Lake Storage Gen2 (ADLS Gen2) 0ed90a81-07f4-4586-8190-b40eccef1c5a
Azure Event Hubs (イベントハブ) bf9f5905-92b7-48bf-bf20-455bc6b60a4e
Azure File Storage be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
Google Cloud Storage 32e8f412-cdf7-464c-9885-78184cb113fd
HDFS 54e221aa-d342-4707-bcff-7a4bceef0001
SFTP bf367b0d-3d9b-4060-b67b-0d3d9bd06094

このページ