Query Service (Data Distiller)とデータセットの書き出し
この記事では、Experience Platform Query Service (Data Distiller)とデータセットの書き出しを組み合わせて、次の データ書き出しの使用例を実装する方法について説明します。
- データの検証
- データレイク、BI ツールのData Warehouse
- Aiとマシンラーニングの活用。
Adobe Analyticsでは、 データフィード 機能を使用して、これらのユースケースを実装できます。 データフィードは、Adobe Analytics から生データを取得するための強力な方法です。 この記事では、上記のユースケースを実装できるように、Experience Platformから同様のタイプの生データを取得する方法について説明します。 該当する場合、データとプロセスの違いを明確にするために、この記事に記載されている機能をAdobe Analytics データフィードと比較します。
はじめに
クエリサービス(Data Distiller)を使用したデータの書き出しとデータセットの書き出しは、次の要素で構成されます。
- クエリサービスを使用して、データフィードのデータを出力データセット
として生成する スケジュール済みクエリ を定義します。 - データセット書き出しを使用して、出力データセットをクラウドストレージの宛先に書き出す スケジュール済みデータセット書き出し を定義します。
前提条件
このユースケースで説明されている機能を使用する前に、次のすべての要件を満たしていることを確認してください。
- Experience Platformのデータレイクにデータを収集する実用的な実装。
- Data Distiller アドオンにアクセスして、バッチクエリを実行する権限を持っていることを確認します。 詳しくは、 クエリサービスのパッケージ を参照してください。
- データセットの書き出し機能へのアクセス。Real-Time CDP PrimeまたはUltimate パッケージ、Adobe Journey OptimizerまたはCustomer Journey Analyticsを購入した場合に使用できます。 詳しくは、 データセットをクラウドストレージの宛先に書き出しを参照してください。
- 1つ以上の設定済み宛先(例:Amazon S3、Google Cloud Storage)から、データフィードの生データを書き出すことができます。
クエリサービス
Experience Platform クエリサービスを使用すると、Experience Platform データレイク内の任意のデータセットを、データベーステーブルであるかのようにクエリして結合できます。 その後、結果を新しいデータセットとしてキャプチャして、レポートでさらに使用したり、書き出したりできます。
クエリサービス ユーザーインターフェイス 、PostgresQL プロトコル 🔗を介して接続された クライアント、またはRESTful APIを使用して、データフィードのデータを収集するクエリを作成およびスケジュールできます。
クエリを作成
SELECT文やその他の制限付きコマンドに対する標準ANSI SQLのすべての機能を使用して、データフィードのデータを生成するクエリを作成および実行できます。 詳細については、SQL構文を参照してください。 このSQL構文に加えて、Adobeでは次の機能をサポートしています。
- セッション化と アトリビューション の関数など、Experience Platform データレイクに保存されているイベントデータに対する一般的なビジネス関連タスクの実行に役立つ、事前構築済みのAdobe定義関数(ADF) ,
- いくつかの組み込みSpark SQL関数,
- メタデータ PostgreSQL コマンド ,
- 準備済みステートメント 。
データフィード列
クエリで使用できるXDM フィールドは、データセットのベースとなるスキーマ定義によって異なります。 データセットの基礎となるスキーマを確実に理解できます。 詳しくは、 データセット UI ガイド を参照してください。
データフィード列とXDM フィールド間のマッピングを定義する方法については、Analytics フィールドマッピング を参照してください。 スキーマ、クラス、フィールドグループ、データタイプなど、XDM リソースを管理する方法について詳しくは、 スキーマ UIの概要も参照してください。
例えば、ページ名をデータフィードの一部として使用する場合は、次のようになります。
- Adobe Analytics データフィードのUIで、データフィード定義に追加する列として pagename を選択します。
- クエリサービスでは、
sample_event_dataset_for_website_global_v1_1データセットのweb.webPageDetails.nameをクエリに含めます(Web サイト用のサンプルイベントスキーマ(グローバル v1.1) エクスペリエンスイベントスキーマに基づく)。 詳しくは、Web詳細スキーマフィールドグループ を参照してください。
ID
Experience Platformでは、様々なIDを使用できます。 クエリを作成するときは、IDが正しくクエリされていることを確認します。
多くの場合、別のフィールドグループにIDが存在します。 実装では、ECID (ecid)は、core オブジェクトを持つフィールドグループの一部として定義できます。このオブジェクト自体はidentification オブジェクトの一部です(例:_sampleorg.identification.core.ecid)。 ECIDは、スキーマ内で異なる構成される場合があります。
または、identityMapを使用してIDを照会することもできます。 identityMapはタイプ Mapで、 ネストされたデータ構造を使用します。
Experience PlatformでID フィールドを定義する方法について詳しくは、UIでのID フィールドの定義を参照してください。
Analytics ソースコネクタを使用する場合に、Adobe Analytics IDがExperience Platform IDにどのようにマッピングされるかについては、Analytics データのプライマリ IDを参照してください。 このマッピングは、Analytics ソースコネクタを使用していない場合でも、IDを設定するためのガイダンスとして機能する可能性があります。
ヒットレベルのデータと識別
実装に基づいて、従来Adobe Analyticsで収集されていたヒットレベルのデータが、タイムスタンプ付きのイベントデータとしてExperience Platformに保存されるようになりました。 次の表は、Analytics フィールドマッピング から抽出され、ヒットレベル固有のAdobe Analytics データフィード列をクエリ内の対応するXDM フィールドにマッピングする方法の例を示しています。 次の表に、XDM フィールドを使用したヒット、訪問、訪問者の識別方法の例を示します。
hitid_high + hitid_low_idhitid_low_idhitid_highと共に使用されます。hitid_high_idhitid_highと共に使用されます。hit_time_gmtreceivedTimestampcust_hit_time_gmttimestampvisid_high + visid_lowidentityMapvisid_high + visid_lowendUserIDs._experience.aaid.idvisid_highendUserIDs._experience.aaid.primaryvisid_lowと共に使用して、一意の訪問を識別します。visid_highendUserIDs._experience.aaid.namespace.codevisid_lowと共に使用して、一意の訪問を識別します。visid_lowidentityMapvisid_highと共に使用して、一意の訪問を識別します。cust_visididentityMapcust_visidendUserIDs._experience.aacustomid.idcust_visidendUserIDs._experience.aacustomid.primarycust_visidendUserIDs._experience.aacustomid.namespace.codevisid_lowと共に使用して、顧客訪問者IDを一意に識別します。geo\_*placeContext.geo.*event_listcommerce.purchases, commerce.productViews, commerce.productListOpens, commerce.checkouts, commerce.productListAdds, commerce.productListRemovals, commerce.productListViews, _experience.analytics.event101to200.*, …, _experience.analytics.event901_1000.*page_eventweb.webInteraction.typepage_eventweb.webInteraction.linkClicks.valuepage_event_var_1web.webInteraction.URLpage_event_var_2web.webInteraction.namepaid_searchsearch.isPaidref_typeweb.webReferrertype列を投稿
Adobe Analytics データフィードでは、処理後のデータを含む列であるpost_接頭辞を持つ列という概念を使用します。 詳しくは、データフィードに関する FAQ を参照してください。
Experience Platform Edge Network(Web SDK、モバイルSDK、サーバーAPI)を通じてデータセットで収集されたデータには、post_ フィールドという概念はありません。 その結果、接頭辞post_と接頭辞非-post_のデータフィード列が同じXDM フィールドにマッピングされます。 例えば、page_urlとpost_page_urlの両方のデータフィード列が同じweb.webPageDetails.URL XDM フィールドにマッピングされます。
データ処理の違いについて詳しくは、Adobe AnalyticsとCustomer Journey Analyticsのデータ処理の比較を参照してください。
ただし、Experience Platform データレイクで収集されたデータのpost_接頭辞カラム型は、データフィードのユースケースで正常に使用するには、高度な変換が必要です。 クエリでこれらの高度な変換を実行するには、セッション化、属性、重複排除にAdobe定義の関数を使用する必要があります。 これらの関数の使用方法については、例を参照してください。
参照
他のデータセットのデータを検索するには、標準のSQL機能(WHERE句、INNER JOIN、OUTER JOINなど)を使用します。
計算
フィールド(列)に対して計算を実行するには、標準のSQL関数(例:COUNT(*))、またはSpark SQLの数学演算子および統計演算子および関数部分を使用します。 また、 ウィンドウ関数は、集計を更新し、順序付きサブセットの各行について1つの項目を返すためのサポートを提供します。 これらの関数の使用方法については、例を参照してください。
ネストされたデータ構造
データセットのベースとなるスキーマには、多くの場合、ネストされたデータ構造などの複雑なデータ型が含まれます。 前述のidentityMapは、ネストされたデータ構造の例です。 identityMap データの例については、以下を参照してください。
{
"identityMap":{
"FPID":[
{
"id":"55613368189701342632255821452918751312",
"authenticatedState":"ambiguous"
}
],
"CRM":[
{
"id":"2394509340-30453470347",
"authenticatedState":"authenticated"
}
]
}
}
Spark SQLのexplode()またはその他の配列関数を使用して、ネストされたデータ構造内のデータにアクセスできます。例:
select explode(identityMap) from demosys_cja_ee_v1_website_global_v1_1 limit 15;
または、ドット表記法を使用して個々の要素を参照することもできます。 次に例を示します。
select identityMap.ecid from demosys_cja_ee_v1_website_global_v1_1 limit 15;
詳しくは、クエリサービスでのネストされたデータ構造の操作を参照してください。
例
クエリの場合:
- Experience Platformデータレイクのデータセットから,
- Adobe Defined FunctionsやSpark SQLなどの機能を利用し
- Adobe Analyticsのデータフィードと同様の結果を,
関連項目:
- 閲覧を放棄
- 属性分析
- ボットフィルタリング
- クエリ サービス ガイド 🔗でサポートされているその他の ユースケースを示します。
以下に、セッション間でアトリビューションを適切に適用する例を示します
-
過去90日間を振り返り,
-
セッション化やアトリビューションなどのウィンドウ関数を適用し
-
ingest_timeに基づいて出力を制限します。accordion 詳細 これを行うには、次のことが必要です。
- 処理状態テーブル
checkpoint_logを使用して、現在の取り込み時間と最後の取り込み時間を追跡します。 詳しくは、このガイド を参照してください。 - システム列の削除を無効にします。
_acp_system_metadata.ingestTimeを使用できます。 - 最も内側の
SELECTを使用して、使用するフィールドを取得し、セッション化やアトリビューション計算のためにイベントをルックバック期間に制限します。 例えば、90日です。 - 次のレベル
SELECTを使用して、セッション化および/またはアトリビューションウィンドウ関数およびその他の計算を適用します。 - 出力テーブルで
INSERT INTOを使用して、ルックバックを最後の処理時間から到着したイベントのみに制限します。 これは、_acp_system_metadata.ingestTimeに対して、処理ステータス テーブルに最後に保存された時間をフィルタリングすることで行います。
セッション化ウィンドウ関数の例
code language-sql $$ BEGIN -- Disable dropping system columns set drop_system_columns=false; -- Initialize variables SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP; -- Get the last processed batch ingestion time SET @from_batch_ingestion_time = SELECT coalesce(last_batch_ingestion_time, 'HEAD') FROM checkpoint_log a JOIN ( SELECT MAX(process_timestamp) AS process_timestamp FROM checkpoint_log WHERE process_name = 'data_feed' AND process_status = 'SUCCESSFUL' ) b ON a.process_timestamp = b.process_timestamp; -- Get the last batch ingestion time SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime) FROM events_dataset; -- Sessionize the data and insert into data_feed. INSERT INTO data_feed SELECT * FROM ( SELECT userIdentity, timestamp, SESS_TIMEOUT(timestamp, 60 * 30) OVER ( PARTITION BY userIdentity ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS session_data, page_name, ingest_time FROM ( SELECT userIdentity, timestamp, web.webPageDetails.name AS page_name, _acp_system_metadata.ingestTime AS ingest_time FROM events_dataset WHERE timestamp >= current_date - 90 ) AS a ORDER BY userIdentity, timestamp ASC ) AS b WHERE b.ingest_time >= @from_batch_ingestion_time; -- Update the checkpoint_log table INSERT INTO checkpoint_log SELECT 'data_feed' process_name, 'SUCCESSFUL' process_status, cast(@to_batch_ingestion_time AS string) last_batch_ingestion_time, cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp END $$;アトリビューションウィンドウ関数の例
code language-sql $$ BEGIN SET drop_system_columns=false; -- Initialize variables SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP; -- Get the last processed batch ingestion time 1718755872325 SET @from_batch_ingestion_time = SELECT coalesce(last_snapshot_id, 'HEAD') FROM checkpoint_log a JOIN ( SELECT MAX(process_timestamp) AS process_timestamp FROM checkpoint_log WHERE process_name = 'data_feed' AND process_status = 'SUCCESSFUL' ) b ON a.process_timestamp = b.process_timestamp; -- Get the last batch ingestion time 1718758687865 SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime) FROM demo_data_trey_mcintyre_midvalues; -- Sessionize the data and insert into new_sessionized_data INSERT INTO new_sessionized_data SELECT * FROM ( SELECT _id, timestamp, struct(User_Identity, cast(SESS_TIMEOUT(timestamp, 60 * 30) OVER ( PARTITION BY User_Identity ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as string) AS SessionData, to_timestamp(from_unixtime(ingest_time/1000, 'yyyy-MM-dd HH:mm:ss')) AS IngestTime, PageName, first_url, first_channel_type ) as _demosystem5 FROM ( SELECT _id, ENDUSERIDS._EXPERIENCE.MCID.ID as User_Identity, timestamp, web.webPageDetails.name AS PageName, attribution_first_touch(timestamp, '', web.webReferrer.url) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_url, attribution_first_touch(timestamp, '',channel.typeAtSource) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_channel_type, _acp_system_metadata.ingestTime AS ingest_time FROM demo_data_trey_mcintyre_midvalues WHERE timestamp >= current_date - 90 ) ORDER BY User_Identity, timestamp ASC ) WHERE _demosystem5.IngestTime >= to_timestamp(from_unixtime(@from_batch_ingestion_time/1000, 'yyyy-MM-dd HH:mm:ss')); -- Update the checkpoint_log table INSERT INTO checkpoint_log SELECT 'data_feed' as process_name, 'SUCCESSFUL' as process_status, cast(@to_batch_ingestion_time AS string) as last_snapshot_id, cast(@last_updated_timestamp AS timestamp) as process_timestamp; END $$; - 処理状態テーブル
スケジュールクエリ
クエリをスケジュールして、クエリが実行され、結果が好みの間隔で生成されるようにします。
クエリエディターの使用
クエリエディターを使用して、クエリをスケジュールできます。 クエリのスケジュールを設定する際には、出力データセットを定義します。 詳しくは、 クエリスケジュール を参照してください。
Query Service APIの使用
または、RESTful APIを使用して、クエリを定義し、クエリのスケジュールを設定することもできます。 詳しくは、Query Service API ガイド を参照してください。
クエリの作成時( クエリの作成)またはクエリのスケジュール作成時( スケジュールされたクエリの作成)に、オプションのctasParameters プロパティの一部として出力データセットを定義してください。
データセットの書き出し
クエリを作成してスケジュールし、結果を検証したら、生のデータセットをクラウドストレージの宛先に書き出すことができます。 この書き出しは、データセット書き出し先と呼ばれるExperience Platformの宛先の用語です。 概要については、 データセットをクラウドストレージの宛先に書き出しを参照してください。
次のクラウドストレージの宛先がサポートされています。
EXPERIENCE PLATFORM UI
Experience Platform UIを使用して、出力データセットの書き出しと書き出しをスケジュールできます。 この節では、関連する手順について説明します。
宛先を選択
出力データセットを書き出すクラウドストレージの宛先を決定したら、宛先を選択します。 優先クラウドストレージの宛先をまだ設定していない場合は、新しい宛先接続を作成する必要があります。
宛先の設定の一部として、次のことができます
- ファイルタイプ(JSONまたはParquet)を定義します。
- 生成されるファイルを圧縮するかどうか、および
- マニフェストファイルを含めるかどうかを指定します。
データセットを選択
宛先を選択した場合、次のデータセットを選択 ステップで、データセットのリストから出力データセットを選択する必要があります。 複数のスケジュール済みクエリを作成しており、出力データセットを同じクラウドストレージの宛先に送信する場合は、対応する出力データセットを選択できます。 詳しくは、 データセットの選択を参照してください。
データセット書き出しのスケジュール設定
最後に、スケジューリング手順の一環として、データセットの書き出しをスケジュールします。 この手順では、スケジュールと、出力データセットの書き出しを増分にするかどうかを定義できます。 詳しくは、 データセットの書き出しをスケジュール を参照してください。
最終手順
選択内容を確認し、正しい場合は、出力データセットをクラウドストレージの宛先に書き出します。
データの書き出しを成功させるには、検証する必要があります。 データセットを書き出す場合、Experience Platformは、宛先で定義されたストレージの場所に1つまたは複数の.jsonまたは.parquet個のファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されることを期待します。 Experience Platformは、選択した保存先の一部として指定した保存場所にフォルダー構造を作成し、書き出されたファイルを保存します。 書き出し時間ごとに、パターン folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMMに従って新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。
Flow Service API
または、APIを使用して、出力データセットの書き出しを書き出し、スケジュールすることもできます。 関連する手順については、Flow Service APIを使用したデータセットの書き出しを参照してください。
基本を学ぶ
データセットを書き出すには、必要な権限があることを確認してください。 また、出力データセットを送信する宛先がデータセットの書き出しをサポートしていることを確認します。 次に、API呼び出しで使用する必須ヘッダーとオプション ヘッダーの値を収集する必要があります。 また、データセットを書き出す宛先🔗の接続仕様とフロー仕様IDを特定する必要があります。
適格なデータセットの取得
書き出し用に適格なデータセット のリストを取得し、GET /connectionSpecs/{id}/configs APIを使用して、出力データセットがそのリストに含まれているかどうかを確認できます。
ソース接続の作成
次に、クラウドストレージの宛先に書き出す一意のIDを使用して、出力データセットのソース接続🔗を作成する必要があります。 POST /sourceConnections APIを使用しています。
宛先への認証(ベース接続の作成)
POST /targetConection APIを使用して資格情報を認証し、クラウドストレージの宛先に安全に保存するには、 ベース接続を作成する必要があります。
書き出しパラメーターを指定
次に、POST /targetConection APIをもう1回使用して、出力データセットの書き出しパラメーター🔗を格納する追加のターゲット接続を作成する必要があります。 これらのエクスポートパラメーターには、場所、ファイル形式、圧縮などが含まれます。
データフローの設定
最後に、出力データセットがPOST /flows APIを使用してクラウドストレージの宛先に書き出されるように、 データフローを設定します。 この手順では、scheduleParams パラメーターを使用して、書き出しのスケジュールを定義できます。
データフローの検証
データフロー🔗の正常な実行を確認するには、GET /runs APIを使用し、データフローIDをクエリパラメーターとして指定します。 このデータフローIDは、データフローの設定時に返される識別子です。
データの書き出しが成功したことを確認します。 データセットを書き出す場合、Experience Platformは、宛先で定義されたストレージの場所に1つまたは複数の.jsonまたは.parquet個のファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されることを期待します。 Experience Platformは、選択した保存先の一部として指定した保存場所にフォルダー構造を作成し、書き出されたファイルを保存します。 書き出し時間ごとに、パターン folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMMに従って新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。
まとめ
つまり、Adobe Analytics データフィード機能をエミュレートすると、クエリサービスを使用してスケジュールされたクエリを設定し、スケジュールされたデータセットの書き出しでこれらのクエリの結果を使用する必要があります。