ストリーミング取得 API を使用してレコードデータをストリーミングする
このチュートリアルは、Adobe Experience Platform Data Ingestion Service API の一部であるストリーミング取得 API の使用を開始する際に役立ちます。
はじめに
このチュートリアルでは、Adobe Experience Platform の各種サービスに関する実用的な知識が必要です。このチュートリアルを開始する前に、次のサービスのドキュメントを確認してください。
- Experience Data Model (XDM):Platform がエクスペリエンスデータを整理する際に使用する、標準化されたフレームワーク。
- スキーマレジストリ開発者ガイド:Schema Registry API の使用可能な各エンドポイントとそれらの呼び出し方法を説明する包括的なガイド。 これには、このチュートリアル全体の呼び出しで表示される
{TENANT_ID}
の理解と、取得用のデータセットの作成に使用されるスキーマの作成方法の理解が含まれます。
- スキーマレジストリ開発者ガイド:Schema Registry API の使用可能な各エンドポイントとそれらの呼び出し方法を説明する包括的なガイド。 これには、このチュートリアル全体の呼び出しで表示される
- Real-Time Customer Profile:複数のソースから集計したデータに基づいて、統合されたリアルタイムの顧客プロファイルを提供します。
Platform API の使用
Platform API を正常に呼び出す方法について詳しくは、Platform API の概要のガイドを参照してください。
XDM Individual Profile クラスに基づいてスキーマを作成します
データセットを作成するには、まず XDM Individual Profile クラスを実装する新しいスキーマを作成する必要があります。 スキーマの作成方法について詳しくは、『スキーマレジストリ API 開発者ガイド』を参照してください。
API 形式
POST /schemaregistry/tenant/schemas
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"type": "object",
"title": "Sample schema",
"description": "Sample description",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-work-details"
}
],
"meta:immutableTags": [
"union"
]
}'
title
description
meta:immutableTags
union
タグが使用されています。応答
正常な応答は、HTTP ステータス 201 と、新しく作成されたスキーマの詳細を返します。
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"meta:altId": "_{TENANT_ID}.schemas.{SCHEMA_ID}",
"meta:resourceType": "schemas",
"version": "1.0",
"type": "object",
"title": "Sample schema",
"description": "Sample description",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-work-details"
}
],
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:abstract": false,
"meta:extensible": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/cpmtext/identitymap",
"https://ns.adobe.com/xdm/common/extensible",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-work-details"
],
"meta:immutableTags": [
"union"
],
"meta:containerId": "tenant",
"imsOrg": "{ORG_ID}",
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createDate": 1551376506996,
"repo:lastModifiedDate": 1551376506996,
"xdm:createdClientId": "{CLIENT_ID}",
"xdm:repositoryCreatedBy": "{CREATED_BY}"
}
}
{TENANT_ID}
データセットを作成する際には、$id
属性と version
属性の両方が使用されるので、注意してください。
スキーマのプライマリ ID 記述子の設定
次に、仕事用電子メールアドレス属性をプライマリ識別子として使用して、上で作成した識別子に ID 記述子を追加します。これをおこなうと、次の 2 つの変更が行われます。
-
仕事用電子メールアドレスは必須フィールドになります。つまり、このフィールドなしで送信されたメッセージは検証に失敗し、取得されません。
-
Real-Time Customer Profile は、その個人に関する詳細を結合するのに役立つ識別子として、作業メールアドレスを使用します。
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/schemaregistry/tenant/descriptors \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"@type":"xdm:descriptorIdentity",
"xdm:sourceProperty":"/workEmail/address",
"xdm:property":"xdm:code",
"xdm:isPrimary":true,
"xdm:namespace":"Email",
"xdm:sourceSchema":"{SCHEMA_REF_ID}",
"xdm:sourceVersion":1
}
{SCHEMA_REF_ID}
$id
。次のようになります。 "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}"
応答
成功時の応答は、HTTP ステータス 201 と共に、スキーマ用に新しく作成されたプライマリ ID 記述子に関する情報を返します。
{
"xdm:property": "xdm:code",
"xdm:sourceSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"xdm:namespace": "Email",
"@type": "xdm:descriptorIdentity",
"xdm:sourceVersion": 1,
"xdm:isPrimary": true,
"xdm:sourceProperty": "/workEmail/address",
"@id": "17aaebfa382ce8fc0a40d3e43870b6470aab894e1c368d16",
"meta:containerId": "tenant",
"version": "1",
"imsOrg": "{ORG_ID}"
}
レコードデータのデータセットの作成
スキーマを作成したら、レコードデータを取り込むためのデータセットを作成する必要があります。
API 形式
POST /catalog/dataSets
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/catalog/dataSets \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d ' {
"name": "Dataset name",
"description": "Dataset description",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID},
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"tags": {
"unifiedIdentity": ["enabled:true"],
"unifiedProfile": ["enabled:true"]
}
}'
応答
正常な応答は、HTTP ステータス 201 と、新しく作成されたデータセットの ID を @/dataSets/{DATASET_ID}
の形式で含む配列を返します。
[
"@/dataSets/5e30d7986c0cc218a85cee65
]
ストリーミング接続の作成
スキーマとデータセットを作成したら、ストリーミング接続を作成できます
ストリーミング接続の作成について詳しくは、『ストリーミング接続作成のチュートリアル』を参照してください。
ストリーミング接続へのレコードデータの取り込み ingest-data
データセットとストリーミング接続を配置したら、XDM 形式の JSON レコードを取り込んで、レコードデータを Platform に取り込むことができます。
API 形式
POST /collection/{CONNECTION_ID}?syncValidation=true
{CONNECTION_ID}
inletId
値です。syncValidation
true
に設定した場合、リクエストが正常に送信されたかどうかを確認するために、即座のフィードバックに使用できます。デフォルトでは、この値は false
に設定されています。 このクエリパラメーターを true
に設定すると、リクエストのレートは 1CONNECTION_ID
あたり 1 分あたり 60 回に制限されます。リクエスト
ストリーミング接続へのレコードデータの取り込みは、ソース名の有無にかかわらず実行できます。
次のリクエスト例では、ソース名が欠落しているレコードを Platform に取り込んでいます。 レコードにソース名がない場合、ストリーミング接続定義からソース ID が追加されます。
curl -X POST https://dcs.adobedc.net/collection/{CONNECTION_ID}?syncValidation=true \
-H "Cache-Control: no-cache" \
-H "Content-Type: application/json" \
-d '{
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"imsOrgId": "{ORG_ID}",
"datasetId": "{DATASET_ID}",
"flowId": "{FLOW_ID}",
},
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
}
},
"xdmEntity": {
"person": {
"name": {
"firstName": "Jane",
"middleName": "F",
"lastName": "Doe"
},
"birthDate": "1969-03-14",
"gender": "female"
},
"workEmail": {
"primary": true,
"address": "janedoe@example.com",
"type": "work",
"status": "active"
}
}
}
}'
ソース名を含める場合に、ソース名をどのように含めるかを次の例に示します。
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"imsOrgId": "{ORG_ID}",
"datasetId": "{DATASET_ID}",
"source": {
"name": "Sample source name"
}
}
応答
応答が成功すると、HTTP ステータス 200 が、新しくストリーミングされた Profile の詳細と共に返されます。
{
"inletId": "{CONNECTION_ID}",
"xactionId": "1584479347507:2153:240",
"receivedTimeMs": 1584479347507,
"syncValidation": {
"status": "pass"
}
}
{CONNECTION_ID}
xactionId
receivedTimeMs
syncValidation.status
syncValidation=true
クエリパラメーターが追加されたので、この値が表示されます。検証が成功した場合、ステータスは pass
になります。新しく取り込んだレコードデータの取得
以前に取り込んだレコードを検証するには、Profile Access API を使用してレコードデータを取得します。
schema.name
または relatedSchema.name
が _xdm.context.profile
の場合、Profile Access れは すべて 関連の ID を取得します。API 形式
GET /access/entities
GET /access/entities?{QUERY_PARAMETERS}
GET /access/entities?schema.name=_xdm.context.profile&entityId=janedoe@example.com&entityIdNS=email
schema.name
entityId
entityIdNS
リクエスト
既に取り込んだレコードデータは、次の GET リクエストで確認できます。
curl -X GET 'https://platform.adobe.io/data/core/ups/access/entities?schema.name=_xdm.context.profile&entityId=janedoe@example.com&entityIdNS=email'\
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
成功時の応答は、HTTP ステータス 200 と共に、要求されたエンティティの詳細を返します。これは以前に正常に取り込まれた記録と同じであることがわかります。
{
"BVrqzwVv7o2p3naHvnsWpqZXv3KJgA": {
"entityId": "BVrqzwVv7o2p3naHvnsWpqZXv3KJgA",
"mergePolicy": {
"id": "e161dae9-52f0-4c7f-b264-dc43dd903d56"
},
"sources": [
"5e30d7986c0cc218a85cee65"
],
"tags": [
"1580346827274:2478:215"
],
"identityGraph": [
"BVrqzwVv7o2p3naHvnsWpqZXv3KJgA"
],
"entity": {
"person": {
"name": {
"lastName": "Doe",
"middleName": "F",
"firstName": "Jane"
},
"gender": "female",
"birthDate": "1969-03-14"
},
"workEmail": {
"type": "work",
"address": "janedoe@example.com",
"status": "active",
"primary": true
},
"identityMap": {
"email": [
{
"id": "janedoe@example.com"
}
]
}
},
"lastModifiedAt": "2020-01-30T01:13:59Z"
}
}
次の手順
このドキュメントでは、ストリーミング接続を使用してレコードデータを Platform に取り込む方法について説明しました。 異なる値でさらに呼び出しを実行し、更新された値を取得してみてください。さらに、UI を使用して、取り込んだデータの監視 Platform 開始できます。 詳しくは、『データ取得監視ガイド』を参照してください。
一般的なストリーミング取得の詳細については、『ストリーミング取得の概要』を参照してください。