ストリーミング取得APIを使用したレコードデータのストリーミング
このチュートリアルでは、Adobe Experience Platform Data Ingestion Service APIの一部であるストリーミング取り込みAPIの使用を開始する際に役立ちます。
はじめに
このチュートリアルでは、Adobe Experience Platform の各種サービスに関する実用的な知識が必要です。このチュートリアルを開始する前に、次のサービスのドキュメントを確認してください。
- Experience Data Model (XDM): Experience Platformがエクスペリエンスデータを整理するための標準化されたフレームワークです。
- Schema Registry開発者ガイド : Schema Registry APIの利用可能な各エンドポイントと、それらのエンドポイントへの呼び出し方法について説明する包括的なガイドです。 これには、このチュートリアル全体の呼び出しで表示される
{TENANT_ID}の理解と、取り込み用のデータセットの作成に使用されるスキーマの作成方法の理解が含まれます。
- Schema Registry開発者ガイド : Schema Registry APIの利用可能な各エンドポイントと、それらのエンドポイントへの呼び出し方法について説明する包括的なガイドです。 これには、このチュートリアル全体の呼び出しで表示される
- Real-Time Customer Profile:複数のソースからの集約データに基づいて、統合された消費者プロファイルをリアルタイムで提供します。
Experience Platform APIの使用
Experience Platform APIの呼び出しを正常に行う方法について詳しくは、Experience Platform APIの概要に関するガイドを参照してください。
XDM Individual Profile クラスに基づくスキーマの作成
データセットを作成するには、まずXDM Individual Profile クラスを実装する新しいスキーマを作成する必要があります。 スキーマの作成方法について詳しくは、『スキーマレジストリ API 開発者ガイド』を参照してください。
API 形式
POST /schemaregistry/tenant/schemas
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"type": "object",
"title": "Sample schema",
"description": "Sample description",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-work-details"
}
],
"meta:immutableTags": [
"union"
]
}'
titledescriptionmeta:immutableTagsunion タグを使用して、データをReal-Time Customer Profileに保持します。応答
正常な応答は、HTTP ステータス 201 と、新しく作成されたスキーマの詳細を返します。
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"meta:altId": "_{TENANT_ID}.schemas.{SCHEMA_ID}",
"meta:resourceType": "schemas",
"version": "1.0",
"type": "object",
"title": "Sample schema",
"description": "Sample description",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-work-details"
}
],
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:abstract": false,
"meta:extensible": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/cpmtext/identitymap",
"https://ns.adobe.com/xdm/common/extensible",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-work-details"
],
"meta:immutableTags": [
"union"
],
"meta:containerId": "tenant",
"imsOrg": "{ORG_ID}",
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createDate": 1551376506996,
"repo:lastModifiedDate": 1551376506996,
"xdm:createdClientId": "{CLIENT_ID}",
"xdm:repositoryCreatedBy": "{CREATED_BY}"
}
}
{TENANT_ID}データセットを作成する際には、$id 属性と version 属性の両方が使用されるので、注意してください。
スキーマのプライマリ ID 記述子の設定
次に、仕事用電子メールアドレス属性をプライマリ識別子として使用して、上で作成した識別子に ID 記述子を追加します。これをおこなうと、次の 2 つの変更が行われます。
-
仕事用電子メールアドレスは必須フィールドになります。つまり、このフィールドなしで送信されたメッセージは検証に失敗し、取得されません。
-
Real-Time Customer Profileは、仕事用の電子メール アドレスを識別子として使用して、その個人に関する詳細情報を結合します。
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/schemaregistry/tenant/descriptors \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"@type":"xdm:descriptorIdentity",
"xdm:sourceProperty":"/workEmail/address",
"xdm:property":"xdm:code",
"xdm:isPrimary":true,
"xdm:namespace":"Email",
"xdm:sourceSchema":"{SCHEMA_REF_ID}",
"xdm:sourceVersion":1
}
{SCHEMA_REF_ID}$id。次のようになります。 "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}"応答
成功時の応答は、HTTP ステータス 201 と共に、スキーマ用に新しく作成されたプライマリ ID 記述子に関する情報を返します。
{
"xdm:property": "xdm:code",
"xdm:sourceSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"xdm:namespace": "Email",
"@type": "xdm:descriptorIdentity",
"xdm:sourceVersion": 1,
"xdm:isPrimary": true,
"xdm:sourceProperty": "/workEmail/address",
"@id": "17aaebfa382ce8fc0a40d3e43870b6470aab894e1c368d16",
"meta:containerId": "tenant",
"version": "1",
"imsOrg": "{ORG_ID}"
}
レコードデータのデータセットの作成
スキーマを作成したら、レコードデータを取り込むためのデータセットを作成する必要があります。
API 形式
POST /catalog/dataSets
リクエスト
curl -X POST https://platform.adobe.io/data/foundation/catalog/dataSets \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d ' {
"name": "Dataset name",
"description": "Dataset description",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID},
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"tags": {
"unifiedIdentity": ["enabled:true"],
"unifiedProfile": ["enabled:true"]
}
}'
応答
正常な応答は、HTTP ステータス 201 と、新しく作成されたデータセットの ID を @/dataSets/{DATASET_ID} の形式で含む配列を返します。
[
"@/dataSets/5e30d7986c0cc218a85cee65
]
ストリーミング接続の作成
スキーマとデータセットを作成したら、ストリーミング接続を作成できます
ストリーミング接続の作成について詳しくは、『ストリーミング接続作成のチュートリアル』を参照してください。
ストリーミング接続へのレコードデータの取り込み ingest-data
データセットとストリーミング接続を配置すると、XDM形式のJSON レコードを取り込んで、Experience Platformにレコードデータを取り込むことができます。
API 形式
POST /collection/{CONNECTION_ID}?syncValidation=true
{CONNECTION_ID}inletId 値です。syncValidationtrue に設定した場合、リクエストが正常に送信されたかどうかを確認するために、即座のフィードバックに使用できます。デフォルトでは、この値はfalseに設定されています。 このクエリパラメーターをtrueに設定した場合、リクエストは1分あたりCONNECTION_IDあたり60回に制限されます。リクエスト
ストリーミング接続へのレコードデータの取り込みは、ソース名の有無にかかわらず実行できます。
以下のリクエスト例では、ソース名が見つからないレコードをExperience Platformに取り込みます。 レコードにソース名がない場合は、ストリーミング接続定義からソース IDが追加されます。
curl -X POST https://dcs.adobedc.net/collection/{CONNECTION_ID}?syncValidation=true \
-H "Cache-Control: no-cache" \
-H "Content-Type: application/json" \
-d '{
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"imsOrgId": "{ORG_ID}",
"datasetId": "{DATASET_ID}",
"flowId": "{FLOW_ID}",
},
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
}
},
"xdmEntity": {
"person": {
"name": {
"firstName": "Jane",
"middleName": "F",
"lastName": "Doe"
},
"birthDate": "1969-03-14",
"gender": "female"
},
"workEmail": {
"primary": true,
"address": "janedoe@example.com",
"type": "work",
"status": "active"
}
}
}
}'
ソース名を含める場合は、次の例にソース名を含める方法を示します。
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/{SCHEMA_ID}",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
},
"imsOrgId": "{ORG_ID}",
"datasetId": "{DATASET_ID}",
"source": {
"name": "Sample source name"
}
}
応答
応答が成功すると、HTTP ステータス 200が返され、新しくストリーミングされたProfileの詳細が表示されます。
{
"inletId": "{CONNECTION_ID}",
"xactionId": "1584479347507:2153:240",
"receivedTimeMs": 1584479347507,
"syncValidation": {
"status": "pass"
}
}
{CONNECTION_ID}xactionIdreceivedTimeMssyncValidation.statussyncValidation=true クエリパラメーターが追加されたので、この値が表示されます。検証が成功した場合、ステータスは pass になります。新しく取り込んだレコードデータの取得
以前に取り込んだレコードを検証するには、Profile Access APIを使用してレコードデータを取得します。
schema.nameまたはrelatedSchema.nameが_xdm.context.profileである場合、Profile Accessは すべての 関連IDを取得します。API 形式
GET /access/entities
GET /access/entities?{QUERY_PARAMETERS}
GET /access/entities?schema.name=_xdm.context.profile&entityId=janedoe@example.com&entityIdNS=email
schema.nameentityIdentityIdNSリクエスト
既に取り込んだレコードデータは、次の GET リクエストで確認できます。
curl -X GET 'https://platform.adobe.io/data/core/ups/access/entities?schema.name=_xdm.context.profile&entityId=janedoe@example.com&entityIdNS=email'\
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
成功時の応答は、HTTP ステータス 200 と共に、要求されたエンティティの詳細を返します。これは以前に正常に取り込まれた記録と同じであることがわかります。
{
"BVrqzwVv7o2p3naHvnsWpqZXv3KJgA": {
"entityId": "BVrqzwVv7o2p3naHvnsWpqZXv3KJgA",
"mergePolicy": {
"id": "e161dae9-52f0-4c7f-b264-dc43dd903d56"
},
"sources": [
"5e30d7986c0cc218a85cee65"
],
"tags": [
"1580346827274:2478:215"
],
"identityGraph": [
"BVrqzwVv7o2p3naHvnsWpqZXv3KJgA"
],
"entity": {
"person": {
"name": {
"lastName": "Doe",
"middleName": "F",
"firstName": "Jane"
},
"gender": "female",
"birthDate": "1969-03-14"
},
"workEmail": {
"type": "work",
"address": "janedoe@example.com",
"status": "active",
"primary": true
},
"identityMap": {
"email": [
{
"id": "janedoe@example.com"
}
]
}
},
"lastModifiedAt": "2020-01-30T01:13:59Z"
}
}
次の手順
このドキュメントでは、ストリーミング接続を使用してレコードデータをExperience Platformに取り込む方法を理解できました。 異なる値でさらに呼び出しを実行し、更新された値を取得してみてください。さらに、Experience Platform UIを使用して、取り込んだデータの監視を開始できます。 詳しくは、データ取り込みモニタリングガイドを参照してください。
一般的なストリーミング取り込みの詳細については、ストリーミング取り込みの概要を参照してください。