クエリサービス(Data Distiller)とデータセットの書き出し
この記事では、Experience Platformクエリサービス(データDistiller)とデータセット書き出しの組み合わせを使用して、次の データ書き出しの使用例を実装する方法の概要を説明します。
- データの検証
- データレイク、BI ツールのData Warehouse
- 人工知能と機械学習の準備。
Adobe Analyticsでは、 データフィード機能を使用して、これらのユースケースを実装できます。 データフィードは、Adobe Analytics から生データを取得するための強力な方法です。この記事では、上記のユースケースを実装できるよう、Experience Platformから同様のタイプの生データを取得する方法について説明します。 該当する場合、この記事で説明されている機能をAdobe Analytics データフィードと比較して、データとプロセスの違いを明確にします。
はじめに
クエリサービス(Data Distiller)とデータセット書き出しを使用したデータの書き出しは、次の要素で構成されています。
- クエリサービス を使用して、データフィードのデータを出力データセット として生成する スケジュールされたクエリ を定義する。
- データセット書き出し を使用して、出力データセットをクラウドストレージの宛先に書き出す スケジュールされたデータセット書き出し を定義する。
前提条件
このユースケースで説明している機能を使用する前に、次の要件をすべて満たしていることを確認してください。
- Experience Platformのデータレイクにデータを収集する実用的な実装。
- バッチクエリを実行する権限を持っていることを確認するには、Data Distiller アドオンにアクセスします。 詳しくは、 クエリサービスのパッケージ化を参照してください。
- Real-Time CDP Prime または Ultimate パッケージ、Adobe Journey OptimizerまたはCustomer Journey Analyticsを購入した場合に使用できる、データセットの書き出し機能にアクセスできます。 詳しくは、 クラウドストレージの宛先へのデータセットの書き出しを参照してください。
- データフィードの生データの書き出し先となる、1 つ以上の設定済みの宛先(例:Amazon S3、Google Cloud Storage)。
クエリサービス
Experience Platformクエリサービスを使用すると、Experience Platformデータレイク内のデータセットに対して、データベースのテーブルと同じようにクエリを実行して結合できます。 その後、結果を新しいデータセットとして取得し、レポートや書き出しでさらに使用できます。
クエリサービス ユーザーインターフェイス、PostgresQL プロトコルを介して接続された クライアント、または RESTful API を使用して、データフィードのデータを収集するクエリを作成およびスケジュールできます。
クエリを作成
SELECT 文やその他の制限付きコマンドに標準 ANSI SQL のすべての機能を使用して、データフィードのデータを生成するクエリを作成および実行できます。 詳しくは、SQL 構文を参照してください。 この SQL 構文を超えて、Adobeでは次の機能をサポートしています。
- 事前に作成された Adobe定義関数(ADF)はExperience Platformデータレイクに保存されたイベントデータに対して一般的なビジネス関連タスクを実行するのに役立ちます。これには、 セッション化および アトリビューションの関数が含まれます。
- いくつかのビルトイン Spark SQL 関数、
- メタデータ PostgreSQL コマンド、
- 準備済みステートメント。
データフィード列
クエリで使用できる XDM フィールドは、データセットが基づいているスキーマ定義によって異なります。 データセットの基礎となるスキーマを理解していることを確認します。 詳しくは、 データセット UI ガイドを参照してください。
データフィード列と XDM フィールド間のマッピングを定義するには、Analytics フィールドマッピングを参照してください。 スキーマ、クラス、フィールドグループ、データタイプなど、XDM リソースの管理方法について詳しくは、 スキーマ UI の概要も参照してください。
例えば、ページ名 をデータフィードの一部として使用する場合は、次のようになります。
- Adobe Analytics データフィードの UI では、列として pagename を選択すると、データフィードの定義に追加されます。
- クエリサービスでは、Web サイトのサンプルイベントスキーマ(グローバル v1.1) エクスペリエンスイベントスキーマに基づいて)
sample_event_dataset_for_website_global_v1_1
データセットのweb.webPageDetails.name
をクエリに含めます。 詳しくは、Web 詳細スキーマフィールドグループを参照してください。
ID
Experience Platformとして、様々な ID を使用できます。 クエリを作成する場合は、ID に対して正しくクエリを実行していることを確認します。
ID は、多くの場合、別のフィールドグループで見つかります。 実装では、ECID (ecid
)は、core
オブジェクトを持つフィールドグループの一部として定義できます。このフィールドグループは、それ自体が identification
オブジェクトの一部です(例:_sampleorg.identification.core.ecid
)。 ECID の整理はスキーマによって異なる場合があります。
または、identityMap
を使用して ID をクエリできます。 identityMap
は Map
型で、 ネストされたデータ構造を使用します。
Experience Platformで ID フィールドを定義する方法について詳しくは、UI での ID フィールドの定義を参照してください。
Analytics ソースコネクタを使用する際にAdobe Analytics ID がプライマリ ID にどのようにマッピングされるかについては 🔗Analytics データのExperience Platform ID を参照してください。 このマッピングは、Analytics ソースコネクタを使用していない場合でも、ID を設定するためのガイダンスとして機能する場合があります。
ヒットレベルのデータと識別
実装方法に基づいて、従来Adobe Analyticsで収集されていたヒットレベルのデータが、タイムスタンプ付きのイベントデータとしてExperience Platformに保存されるようになりました。 次の表は、Analytics フィールドマッピングから抽出したもので、ヒットレベル固有のAdobe Analytics データフィード列をクエリ内の対応する XDM フィールドにマッピングする方法の例を示しています。 また、この表は、XDM フィールドを使用してヒット、訪問、訪問者を識別する方法の例も示しています。
hitid_high
+ hitid_low
_id
hitid_low
_id
hitid_high
と共に使用して、ヒットを一意に識別します。hitid_high
_id
hitid_high
と共に使用して、ヒットを一意に識別します。hit_time_gmt
receivedTimestamp
cust_hit_time_gmt
timestamp
visid_high
+ visid_low
identityMap
visid_high
+ visid_low
endUserIDs._experience.aaid.id
visid_high
endUserIDs._experience.aaid.primary
visid_low
と共に使用して、訪問を一意に識別します。visid_high
endUserIDs._experience.aaid.namespace.code
visid_low
と共に使用して、訪問を一意に識別します。visid_low
identityMap
visid_high
と共に使用して、訪問を一意に識別します。cust_visid
identityMap
cust_visid
endUserIDs._experience.aacustomid.id
cust_visid
endUserIDs._experience.aacustomid.primary
cust_visid
endUserIDs._experience.aacustomid.namespace.code
visid_low
と共に使用して、顧客訪問者 ID を一意に識別します。geo\_*
placeContext.geo.*
event_list
commerce.purchases
, commerce.productViews
, commerce.productListOpens
, commerce.checkouts
, commerce.productListAdds
, commerce.productListRemovals
, commerce.productListViews
, _experience.analytics.event101to200.*
, …, _experience.analytics.event901_1000.*
page_event
web.webInteraction.type
page_event
web.webInteraction.linkClicks.value
page_event_var_1
web.webInteraction.URL
page_event_var_2
web.webInteraction.name
paid_search
search.isPaid
ref_type
web.webReferrertype
列を投稿
Adobe Analytics データフィードでは、post_
プレフィックスが付いた列という概念を使用します。プレフィックスは、処理後のデータを含む列です。 詳しくは、データフィードに関する FAQ を参照してください。
Experience PlatformEdge Network(Web SDK、Mobile SDK、Server API)を使用してデータセットに収集されたデータには、post_
フィールドという概念はありません。 その結果、プレフィックス post_
付けられたデータフィード列と 非 プレフィックスのデータフィード列は、同じ XDM フィールドにマッピ post_
グされます。 例えば、page_url
と post_page_url
の両方のデータフィード列が同じ web.webPageDetails.URL
XDM フィールドにマッピングされるとします。
データ処理の違いの概要については、Adobe AnalyticsとCustomer Journey Analytics間のデータ処理の比較を参照してください。
ただし、Experience Platformデータレイクで収集されるデータの post_
プレフィックス列タイプでは、データフィードのユースケースで使用する前に高度な変換が必要です。 クエリでこれらの高度な変換を実行するには、セッション化、アトリビューション、重複排除に 🔗0}Adobe定義関数を使用する必要があります。 これらの関数の使用方法については、 例を参照してください。
参照
他のデータセットからのデータを検索するには、標準の SQL 機能(WHERE
句、INNER JOIN
、OUTER JOIN
など)を使用します。
計算
フィールド(列)に対して計算を実行するには、標準の SQL 関数(例:COUNT(*)
)または Spark SQL の一部である 数学演算子と統計演算子および関数を使用します。 また、window 関数は、集計を更新し、順序付きサブセット内の各行の単一の項目を返すサポートを提供します。 これらの関数の使用方法については、 例を参照してください。
ネストされたデータ構造
データセットのベースとなるスキーマには、多くの場合、ネストされたデータ構造など、複雑なデータタイプが含まれています。 前述の identityMap
は、ネストされたデータ構造の例です。 identityMap
データの例については、以下を参照してください。
{
"identityMap":{
"FPID":[
{
"id":"55613368189701342632255821452918751312",
"authenticatedState":"ambiguous"
}
],
"CRM":[
{
"id":"2394509340-30453470347",
"authenticatedState":"authenticated"
}
]
}
}
Spark SQL の explode()
関数またはその他の配列関数を使用してネストされたデータ構造内のデータにアクセスできます。次に例を示します。
select explode(identityMap) from demosys_cja_ee_v1_website_global_v1_1 limit 15;
または、ドット表記を使用して個々の要素を参照することもできます。 次に例を示します。
select identityMap.ecid from demosys_cja_ee_v1_website_global_v1_1 limit 15;
詳しくは、クエリサービスでのネストされたデータ構造の操作を参照してください。
例
クエリ:
- Experience Platformデータレイク内のデータセットのデータを使用する
- は、Adobe定義関数や Spark SQL、あるいはこれらのいずれかのその他の機能を活用しています。
- 同等のAdobe Analytics データフィードと同様の結果が得られます。
詳しくは、
以下に、セッション間でアトリビューションを適切に適用する例と、その方法を示します
- 過去 90 日間をルックバックとして使用します。
- セッション化やアトリビューションなどのウィンドウ関数を適用する。
ingest_time
に基づいて出力を制限します。
+++
詳細
そのためには…
- 処理ステータステーブル
checkpoint_log
を使用して、現在の取り込み時間と最後の取り込み時間を追跡します。 詳しくは、 このガイドを参照してください。 - システム列の削除を無効にして、
_acp_system_metadata.ingestTime
を使用できるようにします。 - 最も内側の
SELECT
を使用して、使用するフィールドを取得し、セッション化やアトリビューションの計算のためにイベントをルックバック期間に制限します。 例:90 日。 - 次のレベルの
SELECT
を使用して、セッション化やアトリビューションウィンドウ関数、その他の計算を適用します。 - 出力テーブルで
INSERT INTO
を使用して、ルックバックを前回の処理時間以降に到達したイベントのみに制限します。 これを行うには、処理ステータステーブルに最後に保存された時間に対して_acp_system_metadata.ingestTime
フィルタリングします。
セッション化ウィンドウ関数の例
$$ BEGIN
-- Disable dropping system columns
set drop_system_columns=false;
-- Initialize variables
SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;
-- Get the last processed batch ingestion time
SET @from_batch_ingestion_time = SELECT coalesce(last_batch_ingestion_time, 'HEAD')
FROM checkpoint_log a
JOIN (
SELECT MAX(process_timestamp) AS process_timestamp
FROM checkpoint_log
WHERE process_name = 'data_feed'
AND process_status = 'SUCCESSFUL'
) b
ON a.process_timestamp = b.process_timestamp;
-- Get the last batch ingestion time
SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime)
FROM events_dataset;
-- Sessionize the data and insert into data_feed.
INSERT INTO data_feed
SELECT *
FROM (
SELECT
userIdentity,
timestamp,
SESS_TIMEOUT(timestamp, 60 * 30) OVER (
PARTITION BY userIdentity
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS session_data,
page_name,
ingest_time
FROM (
SELECT
userIdentity,
timestamp,
web.webPageDetails.name AS page_name,
_acp_system_metadata.ingestTime AS ingest_time
FROM events_dataset
WHERE timestamp >= current_date - 90
) AS a
ORDER BY userIdentity, timestamp ASC
) AS b
WHERE b.ingest_time >= @from_batch_ingestion_time;
-- Update the checkpoint_log table
INSERT INTO checkpoint_log
SELECT
'data_feed' process_name,
'SUCCESSFUL' process_status,
cast(@to_batch_ingestion_time AS string) last_batch_ingestion_time,
cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp
END
$$;
アトリビューションウィンドウ関数の例
$$ BEGIN
SET drop_system_columns=false;
-- Initialize variables
SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;
-- Get the last processed batch ingestion time 1718755872325
SET @from_batch_ingestion_time =
SELECT coalesce(last_snapshot_id, 'HEAD')
FROM checkpoint_log a
JOIN (
SELECT MAX(process_timestamp) AS process_timestamp
FROM checkpoint_log
WHERE process_name = 'data_feed'
AND process_status = 'SUCCESSFUL'
) b
ON a.process_timestamp = b.process_timestamp;
-- Get the last batch ingestion time 1718758687865
SET @to_batch_ingestion_time =
SELECT MAX(_acp_system_metadata.ingestTime)
FROM demo_data_trey_mcintyre_midvalues;
-- Sessionize the data and insert into new_sessionized_data
INSERT INTO new_sessionized_data
SELECT *
FROM (
SELECT
_id,
timestamp,
struct(User_Identity,
cast(SESS_TIMEOUT(timestamp, 60 * 30) OVER (
PARTITION BY User_Identity
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as string) AS SessionData,
to_timestamp(from_unixtime(ingest_time/1000, 'yyyy-MM-dd HH:mm:ss')) AS IngestTime,
PageName,
first_url,
first_channel_type
) as _demosystem5
FROM (
SELECT
_id,
ENDUSERIDS._EXPERIENCE.MCID.ID as User_Identity,
timestamp,
web.webPageDetails.name AS PageName,
attribution_first_touch(timestamp, '', web.webReferrer.url) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_url,
attribution_first_touch(timestamp, '',channel.typeAtSource) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_channel_type,
_acp_system_metadata.ingestTime AS ingest_time
FROM demo_data_trey_mcintyre_midvalues
WHERE timestamp >= current_date - 90
)
ORDER BY User_Identity, timestamp ASC
)
WHERE _demosystem5.IngestTime >= to_timestamp(from_unixtime(@from_batch_ingestion_time/1000, 'yyyy-MM-dd HH:mm:ss'));
-- Update the checkpoint_log table
INSERT INTO checkpoint_log
SELECT
'data_feed' as process_name,
'SUCCESSFUL' as process_status,
cast(@to_batch_ingestion_time AS string) as last_snapshot_id,
cast(@last_updated_timestamp AS timestamp) as process_timestamp;
END
$$;
+++
スケジュール クエリ
クエリをスケジュールして、クエリが実行され、結果が好みの間隔で生成されるようにします。
クエリエディターの使用
クエリエディターを使用してクエリをスケジュールできます。 クエリをスケジュールする場合は、出力データセットを定義します。 詳しくは、 クエリスケジュールを参照してください。
Query Service API の使用
または、RESTful API を使用してクエリを定義し、クエリのスケジュールを設定することもできます。 詳しくは、Query Service API ガイドを参照してください。
クエリを作成する場合( クエリを作成する)やクエリのスケジュールを作成する場合( スケジュールされたクエリを作成する)は、出力データセットをオプションの ctasParameters
プロパティの一部として定義していることを確認します。
データセットの書き出し
クエリを作成し、スケジュールを設定し、結果を確認したら、生のデータセットをクラウドストレージの宛先に書き出すことができます。 この書き出しは、データセット書き出し宛先と呼ばれるExperience Platformー宛先の用語で使用されます。 詳しくは、 クラウドストレージの宛先へのデータセットの書き出しを参照してください。
次のクラウドストレージの宛先がサポートされています。
EXPERIENCE PLATFORMUI
Experience PlatformUI を使用して、出力データセットの書き出しをスケジュールできます。 この節では、関連する手順について説明します。
宛先を選択
出力データセットを書き出すクラウドストレージの宛先を決定したら、 宛先を選択します。 優先クラウドストレージの宛先をまだ設定していない場合は、 新しい宛先接続を作成する必要があります。
宛先の設定の一環として、次のことができます
- ファイルタイプ(JSON または Parquet)を定義します。
- 結果のファイルが圧縮されるかどうか、および
- マニフェストファイルを含めるかどうか。
データセットを選択
宛先を選択したら、次の データセットを選択 手順で、データセットのリストから出力データセットを選択する必要があります。 複数のスケジュールされたクエリを作成し、出力データセットを同じクラウドストレージ宛先に送信する場合、対応する出力データセットを選択できます。 詳しくは データセットの選択を参照してください。
データセット書き出しのスケジュール設定
最後に、スケジュール設定 手順の一部としてデータセットの書き出しをスケジュールします。 その手順では、スケジュールと、出力データセットの書き出しを増分にするかどうかを定義できます。 詳しくは データセットの書き出しをスケジュールを参照してください。
最終手順
確認選択し、正しければ、出力データセットのクラウドストレージ宛先への書き出しを開始します。
データの書き出しが正常に行われたことを 確認する必要があります。 データセットを書き出す際、Experience Platformは、書き出し先に定義されたストレージの場所に 1 つまたは複数の .json
ファイルまたは .parquet
ファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されます。 Experience Platformは、選択された出力先の一部として指定されたストレージの場所にフォルダー構造を作成し、書き出されたファイルを格納します。 folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM
のパターンに従って、書き出しのたびに新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。
フローサービス API
または、API を使用して出力データセットの書き出しを書き出し、スケジュールすることもできます。 含まれる手順は、Flow Service API を使用したデータセットの書き出しに記載されています。
基本を学ぶ
データセットを書き出すには、 必要な権限があることを確認します。 また、出力データセットの送信先がデータセットの書き出しをサポートしていることを確認します。 次に、API 呼び出しで使用する 必須ヘッダーとオプションヘッダーの値を収集する必要があります。 また、データセットを書き出す 宛先の接続仕様 ID とフロー仕様 ID を識別する必要もあります。
適格なデータセットの取得
書き出し用に 適格なデータセットのリストを取得し、GET /connectionSpecs/{id}/configs
API を使用して、出力データセットがそのリストに含まれているかどうかを確認できます。
ソース接続を作成
次に、クラウドストレージ宛先に書き出す、出力データセット用に一意の ID を使用して、 ソース接続を作成する必要があります。 POST /sourceConnections
API を使用します。
宛先に対する認証(ベース接続の作成)
POST /targetConection
API を使用して認証を行い、クラウドストレージの宛先に資格情報を安全に保存するには、 ベース接続を作成する必要があります。
エクスポートパラメーターの指定
次に、もう一度 POST /targetConection
API を使用して、出力データセット用に 書き出しパラメーターを保存する追加のターゲット接続を作成する必要があります。 これらの書き出しパラメーターには、場所、ファイル形式、圧縮などが含まれます。
データフローの設定
最後に、 データフローの設定を行い、POST /flows
API を使用して出力データセットがクラウドストレージの宛先に書き出されるようにします。 この手順では、scheduleParams
パラメーターを使用して、書き出しのスケジュールを定義できます。
データフローの検証
データフローの正常な実行を確認するには、GET /runs
API を使用して、データフロー ID をクエリパラメーターとして指定します。 このデータフロー ID は、データフローを設定したときに返される識別子です。
検証成功したデータ書き出し。 データセットを書き出す際、Experience Platformは、書き出し先に定義されたストレージの場所に 1 つまたは複数の .json
ファイルまたは .parquet
ファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されます。 Experience Platformは、選択された出力先の一部として指定されたストレージの場所にフォルダー構造を作成し、書き出されたファイルを格納します。 folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM
のパターンに従って、書き出しのたびに新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。
まとめ
つまり、Adobe Analytics データフィード機能のエミュレートは、クエリサービスを使用してスケジュール済みクエリを設定し、これらのクエリの結果をスケジュール済みデータセット書き出しで使用することを意味します。