クエリサービス(Data Distiller)とデータセットの書き出し

この記事では、Experience Platform Query Service (データDistiller)とデータセット書き出しの組み合わせを使用して、次の ​ データ書き出しの使用例 ​ を実装する方法の概要を説明します。

  • データの検証
  • Data Lake、BI ツールのData Warehouse
  • 人工知能と機械学習の準備。

Adobe Analyticsでは、​ データフィード ​ 機能を使用して、これらのユースケースを実装できます。 データフィードは、Adobe Analytics から生データを取得するための強力な方法です。この記事では、Experience Platformから同様のタイプの生データを取得する方法について説明します。これにより、上記のユースケースを実装できます。 該当する場合、この記事で説明されている機能をAdobe Analytics データフィードと比較して、データとプロセスの違いを明確にします。

はじめに

クエリサービス(Data Distiller)とデータセット書き出しを使用したデータの書き出しは、次の要素で構成されています。

  • クエリサービス を使用して、データフィードのデータを出力データセット 出力データセット として生成する スケジュールされたクエリ を定義する。
  • データセット書き出し を使用して、出力データセットをクラウドストレージの宛先に書き出す スケジュールされたデータセット書き出し を定義する。

データフィード

前提条件

このユースケースで説明している機能を使用する前に、次の要件をすべて満たしていることを確認してください。

  • Experience Platformのデータレイクにデータを収集する実用的な実装。
  • バッチクエリを実行する権限を持っていることを確認するには、Data Distiller アドオンにアクセスします。 詳しくは、​ クエリサービスのパッケージ化 ​ を参照してください。
  • データセットの書き出し機能へのアクセス権。この機能は、Real-Time CDP PrimeまたはUltimate パッケージ、Adobe Journey OptimizerまたはCustomer Journey Analyticsを購入した場合に使用できます。 詳しくは、​ クラウドストレージの宛先へのデータセットの書き出し ​ を参照してください。
  • データフィードの生データの書き出し先となる、1 つ以上の設定済みの宛先(例:Amazon S3、Google Cloud Storage)。

クエリサービス

Experience Platform クエリサービスを使用すると、Experience Platform Data Lake 内のデータセットをデータベーステーブルのようにクエリして結合できます。 その後、結果を新しいデータセットとして取得し、レポートや書き出しでさらに使用できます。

クエリサービス ​ ユーザーインターフェイス ​、PostgresQL プロトコルを介して接続された ​ クライアント ​、または RESTful API を使用して、データフィードのデータを収集するクエリを作成およびスケジュールできます。

クエリを作成

SELECT 文やその他の制限付きコマンドに標準 ANSI SQL のすべての機能を使用して、データフィードのデータを生成するクエリを作成および実行できます。 詳しくは、SQL 構文 ​ を参照してください。 この SQL 構文を超えて、Adobeでは次の機能をサポートしています。

データフィード列

クエリで使用できる XDM フィールドは、データセットが基づいているスキーマ定義によって異なります。 データセットの基礎となるスキーマを理解していることを確認します。 詳しくは、​ データセット UI ガイド ​ を参照してください。

データフィード列と XDM フィールド間のマッピングを定義するには、Analytics フィールドマッピング ​ を参照してください。 スキーマ、クラス、フィールドグループ、データタイプなど、XDM リソースの管理方法について詳しくは、​ スキーマ UI の概要 ​ も参照してください。

例えば、ページ名 をデータフィードの一部として使用する場合は、次のようになります。

  • Adobe Analytics データフィードの UI では、列として pagename を選択すると、データフィードの定義に追加されます。
  • クエリサービスでは、web.webPageDetails.nameWeb サイトのサンプルイベントスキーマ(グローバル v1.1) sample_event_dataset_for_website_global_v1_1 エクスペリエンスイベントスキーマに基づいて) データセットの をクエリに含めます。 詳しくは、Web 詳細スキーマフィールドグループ ​ を参照してください。

ID

Experience Platformでは、様々な ID を使用できます。 クエリを作成する場合は、ID に対して正しくクエリを実行していることを確認します。

ID は、多くの場合、別のフィールドグループで見つかります。 実装では、ECID (ecid)は、core オブジェクトを持つフィールドグループの一部として定義できます。このフィールドグループは、それ自体が identification オブジェクトの一部です(例:_sampleorg.identification.core.ecid)。 ECID の整理はスキーマによって異なる場合があります。

または、identityMap を使用して ID をクエリできます。 identityMapMap 型で、​ ネストされたデータ構造 ​ を使用します。

Experience Platformで ID フィールドを定義する方法について詳しくは、UI での ID フィールドの定義 ​ を参照してください。

Analytics ソースコネクタを使用する際にAdobe Analytics ID をExperience Platform ID にマッピングする方法について詳しくは Analytics データの ​プライマリID を参照してください。 このマッピングは、Analytics ソースコネクタを使用していない場合でも、ID を設定するためのガイダンスとして機能する場合があります。

ヒットレベルのデータと識別

実装に基づいて、従来Adobe Analyticsで収集されていたヒットレベルのデータが、タイムスタンプ付きのイベントデータとしてExperience Platformに保存されるようになりました。 次の表は、Analytics フィールドマッピング ​ から抽出したもので、ヒットレベル固有のAdobe Analytics データフィード列をクエリ内の対応する XDM フィールドにマッピングする方法の例を示しています。 また、この表は、XDM フィールドを使用してヒット、訪問、訪問者を識別する方法の例も示しています。

データフィード列
XDM フィールド
タイプ
説明
hitid_high + hitid_low
_id
string
ヒットを識別する一意の ID。
hitid_low
_id
string
hitid_high と共に使用して、ヒットを一意に識別します。
hitid_high
_id
string
hitid_high と共に使用して、ヒットを一意に識別します。
hit_time_gmt
receivedTimestamp
string
ヒットのタイムスタンプ(UNIX® 時間に基づく)。
cust_hit_time_gmt
timestamp
string
このタイムスタンプは、タイムスタンプが有効なデータセットでのみ使用されます。 このタイムスタンプは、UNIX® 時間に基づいて、ヒットと共に送信されます。
visid_high + visid_low
identityMap
オブジェクト
訪問の一意の ID。
visid_high + visid_low
endUserIDs._experience.aaid.id
string
訪問の一意の ID。
visid_high
endUserIDs._experience.aaid.primary
ブール型
visid_low と共に使用して、訪問を一意に識別します。
visid_high
endUserIDs._experience.aaid.namespace.code
string
visid_low と共に使用して、訪問を一意に識別します。
visid_low
identityMap
オブジェクト
visid_high と共に使用して、訪問を一意に識別します。
cust_visid
identityMap
オブジェクト
顧客訪問者 ID。
cust_visid
endUserIDs._experience.aacustomid.id
オブジェクト
顧客訪問者 ID。
cust_visid
endUserIDs._experience.aacustomid.primary
ブール型
顧客訪問者 ID 名前空間コード。
cust_visid
endUserIDs._experience.aacustomid.namespace.code
string
visid_low と共に使用して、顧客訪問者 ID を一意に識別します。
geo\_*
placeContext.geo.*
文字列、数値
ジオロケーションデータ(国、地域、都市など)
event_list
commerce.purchases, commerce.productViews, commerce.productListOpens, commerce.checkouts, commerce.productListAdds, commerce.productListRemovals, commerce.productListViews, _experience.analytics.event101to200.*, …, _experience.analytics.event901_1000.*
string
ヒットに対してトリガーされる標準コマースイベントとカスタムイベント。
page_event
web.webInteraction.type
string
イメージリクエストで送信されるヒットのタイプ(標準的なヒット、ダウンロードリンク、離脱リンク、クリックされたカスタムリンク)。
page_event
web.webInteraction.linkClicks.value
number
イメージリクエストで送信されるヒットのタイプ(標準的なヒット、ダウンロードリンク、離脱リンク、クリックされたカスタムリンク)。
page_event_var_1
web.webInteraction.URL
string
リンクトラッキングイメージリクエストでのみ使用される変数。この変数には、クリックされたダウンロードリンク、離脱リンク、またはカスタムリンクの URL が含まれます。
page_event_var_2
web.webInteraction.name
string
リンクトラッキングイメージリクエストでのみ使用される変数。このリストは、リンクのカスタム名をリスト表示します(指定されている場合)。
paid_search
search.isPaid
ブール型
ヒットが有料検索の検出に一致した場合に設定されるフラグ。
ref_type
web.webReferrertype
string
ヒットのリファラルのタイプを表す数値 ID。

列を投稿

Adobe Analytics データフィードでは、post_ プレフィックスが付いた列という概念を使用します。プレフィックスは、処理後のデータを含む列です。 詳しくは、データフィードに関する FAQ を参照してください。

Experience Platform Edge Network(Web SDK、Mobile SDK、Server API)を使用してデータセットに収集されたデータには、post_ フィールドという概念はありません。 その結果、プレフィックス post_ 付けられたデータフィード列と プレフィックスのデータフィード列は、同じ XDM フィールドにマッピ post_ グされます。 例えば、page_urlpost_page_url の両方のデータフィード列が同じ web.webPageDetails.URL XDM フィールドにマッピングされるとします。

データ処理の違いの概要については、Adobe AnalyticsとCustomer Journey Analytics間のデータ処理の比較 ​ を参照してください。

ただし、Experience Platform Data Lake で収集されるデータの post_ プレフィックス列タイプでは、データフィードのユースケースで使用する前に高度な変換が必要です。 クエリでこれらの高度な変換を実行するには、セッション化、アトリビューション、重複排除に 0}Adobe定義の関数を使用する必要があります。 ​これらの関数の使用方法については、​ 例 ​ を参照してください。

参照

他のデータセットからのデータを検索するには、標準の SQL 機能(WHERE 句、INNER JOINOUTER JOIN など)を使用します。

計算

フィールド(列)に対して計算を実行するには、標準の SQL 関数(例:COUNT(*))または Spark SQL の一部である ​ 数学演算子と統計演算子および関数 ​ を使用します。 また、window 関数 ​ は、集計を更新し、順序付きサブセット内の各行の単一の項目を返すサポートを提供します。 これらの関数の使用方法については、​ 例 ​ を参照してください。

ネストされたデータ構造

データセットのベースとなるスキーマには、多くの場合、ネストされたデータ構造など、複雑なデータタイプが含まれています。 前述の identityMap は、ネストされたデータ構造の例です。 identityMap データの例については、以下を参照してください。

{
   "identityMap":{
      "FPID":[
         {
            "id":"55613368189701342632255821452918751312",
            "authenticatedState":"ambiguous"
         }
      ],
      "CRM":[
         {
            "id":"2394509340-30453470347",
            "authenticatedState":"authenticated"
         }
      ]
   }
}

Spark SQL の explode() 関数またはその他の配列関数を使用して ​ ネストされたデータ構造内のデータにアクセスできます。次に例を示します。

select explode(identityMap) from demosys_cja_ee_v1_website_global_v1_1 limit 15;

または、ドット表記を使用して個々の要素を参照することもできます。 例:

select identityMap.ecid from demosys_cja_ee_v1_website_global_v1_1 limit 15;

詳しくは、クエリサービスでのネストされたデータ構造の操作を参照してください。

クエリ:

  • これには、Experience Platform データレイク内のデータセットのデータを使用します。
  • は、Adobe定義関数や Spark SQL の追加機能を利用しています。
  • 同等のAdobe Analytics データフィードと同様の結果が得られます。

詳しくは、

以下に、セッション間でアトリビューションを適切に適用する例と、その方法を示します

  • 過去 90 日間をルックバックとして使用します。

  • セッション化やアトリビューションなどのウィンドウ関数を適用する。

  • ingest_time に基づいて出力を制限します。

    +++
    詳細

    そのためには…

    • 処理ステータステーブル checkpoint_log を使用して、現在の取り込み時間と最後の取り込み時間を追跡します。 詳しくは、​ このガイド ​ を参照してください。
    • システム列の削除を無効にして、_acp_system_metadata.ingestTime を使用できるようにします。
    • 最も内側の SELECT を使用して、使用するフィールドを取得し、セッション化やアトリビューションの計算のためにイベントをルックバック期間に制限します。 例:90 日。
    • 次のレベルの SELECT を使用して、セッション化やアトリビューションウィンドウ関数、その他の計算を適用します。
    • 出力テーブルで INSERT INTO を使用して、ルックバックを前回の処理時間以降に到達したイベントのみに制限します。 これを行うには、処理ステータステーブルに最後に保存された時間に対して _acp_system_metadata.ingestTime フィルタリングします。

    セッション化ウィンドウ関数の例

    code language-sql
    $$ BEGIN
       -- Disable dropping system columns
       set drop_system_columns=false;
    
       -- Initialize variables
       SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;
    
       -- Get the last processed batch ingestion time
       SET @from_batch_ingestion_time = SELECT coalesce(last_batch_ingestion_time, 'HEAD')
          FROM checkpoint_log a
          JOIN (
                SELECT MAX(process_timestamp) AS process_timestamp
                FROM checkpoint_log
                WHERE process_name = 'data_feed'
                AND process_status = 'SUCCESSFUL'
          ) b
          ON a.process_timestamp = b.process_timestamp;
    
       -- Get the last batch ingestion time
       SET @to_batch_ingestion_time = SELECT MAX(_acp_system_metadata.ingestTime)
          FROM events_dataset;
    
       -- Sessionize the data and insert into data_feed.
       INSERT INTO data_feed
       SELECT *
       FROM (
          SELECT
                userIdentity,
                timestamp,
                SESS_TIMEOUT(timestamp, 60 * 30) OVER (
                   PARTITION BY userIdentity
                   ORDER BY timestamp
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) AS session_data,
                page_name,
                ingest_time
          FROM (
                SELECT
                   userIdentity,
                   timestamp,
                   web.webPageDetails.name AS page_name,
                   _acp_system_metadata.ingestTime AS ingest_time
                FROM events_dataset
                WHERE timestamp >= current_date - 90
          ) AS a
          ORDER BY userIdentity, timestamp ASC
       ) AS b
       WHERE b.ingest_time >= @from_batch_ingestion_time;
    
       -- Update the checkpoint_log table
       INSERT INTO checkpoint_log
       SELECT
          'data_feed' process_name,
          'SUCCESSFUL' process_status,
          cast(@to_batch_ingestion_time AS string) last_batch_ingestion_time,
          cast(@last_updated_timestamp AS TIMESTAMP) process_timestamp
    END
    $$;
    

    アトリビューションウィンドウ関数の例

    code language-sql
    $$ BEGIN
     SET drop_system_columns=false;
    
    -- Initialize variables
     SET @last_updated_timestamp = SELECT CURRENT_TIMESTAMP;
    
    -- Get the last processed batch ingestion time 1718755872325
     SET @from_batch_ingestion_time =
         SELECT coalesce(last_snapshot_id, 'HEAD')
         FROM checkpoint_log a
         JOIN (
             SELECT MAX(process_timestamp) AS process_timestamp
             FROM checkpoint_log
             WHERE process_name = 'data_feed'
             AND process_status = 'SUCCESSFUL'
         ) b
         ON a.process_timestamp = b.process_timestamp;
    
     -- Get the last batch ingestion time 1718758687865
     SET @to_batch_ingestion_time =
         SELECT MAX(_acp_system_metadata.ingestTime)
         FROM demo_data_trey_mcintyre_midvalues;
    
     -- Sessionize the data and insert into new_sessionized_data
     INSERT INTO new_sessionized_data
     SELECT *
     FROM (
         SELECT
             _id,
             timestamp,
             struct(User_Identity,
             cast(SESS_TIMEOUT(timestamp, 60 * 30) OVER (
                 PARTITION BY User_Identity
                 ORDER BY timestamp
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
             ) as string) AS SessionData,
             to_timestamp(from_unixtime(ingest_time/1000, 'yyyy-MM-dd HH:mm:ss')) AS IngestTime,
             PageName,
             first_url,
             first_channel_type
               ) as _demosystem5
         FROM (
             SELECT
                 _id,
                 ENDUSERIDS._EXPERIENCE.MCID.ID as User_Identity,
                 timestamp,
                 web.webPageDetails.name AS PageName,
                attribution_first_touch(timestamp, '', web.webReferrer.url) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_url,
                attribution_first_touch(timestamp, '',channel.typeAtSource) OVER (PARTITION BY ENDUSERIDS._EXPERIENCE.MCID.ID ORDER BY timestamp ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING).value AS first_channel_type,
                 _acp_system_metadata.ingestTime AS ingest_time
             FROM demo_data_trey_mcintyre_midvalues
             WHERE timestamp >= current_date - 90
         )
         ORDER BY User_Identity, timestamp ASC
     )
     WHERE _demosystem5.IngestTime >= to_timestamp(from_unixtime(@from_batch_ingestion_time/1000, 'yyyy-MM-dd HH:mm:ss'));
    
    -- Update the checkpoint_log table
    INSERT INTO checkpoint_log
    SELECT
       'data_feed' as process_name,
       'SUCCESSFUL' as process_status,
       cast(@to_batch_ingestion_time AS string) as last_snapshot_id,
       cast(@last_updated_timestamp AS timestamp) as process_timestamp;
    
    END
    $$;
    

    +++

スケジュール クエリ

クエリをスケジュールして、クエリが実行され、結果が好みの間隔で生成されるようにします。

クエリエディターの使用

クエリエディターを使用してクエリをスケジュールできます。 クエリをスケジュールする場合は、出力データセットを定義します。 詳しくは、​ クエリスケジュール ​ を参照してください。

Query Service API の使用

または、RESTful API を使用してクエリを定義し、クエリのスケジュールを設定することもできます。 詳しくは、Query Service API ガイド ​ を参照してください。
クエリを作成する場合(ctasParameters クエリを作成する )やクエリのスケジュールを作成する場合( スケジュールされたクエリを作成する )は、出力データセットをオプションの ​ プロパティの一部として定義していることを確認します。

データセットの書き出し

クエリを作成し、スケジュールを設定し、結果を確認したら、生のデータセットをクラウドストレージの宛先に書き出すことができます。 この書き出しは、データセット書き出し宛先と呼ばれるExperience Platformの宛先の用語で使用されます。 詳しくは、​ クラウドストレージの宛先へのデータセットの書き出し ​ を参照してください。

次のクラウドストレージの宛先がサポートされています。

EXPERIENCE PLATFORM UI

Experience Platform UI を使用して、出力データセットの書き出しをスケジュールできます。 この節では、関連する手順について説明します。

宛先を選択

出力データセットを書き出すクラウドストレージの宛先を決定したら、​ 宛先を選択 ​ します。 優先クラウドストレージの宛先をまだ設定していない場合は、​ 新しい宛先接続を作成する ​ 必要があります。

宛先の設定の一環として、次のことができます

  • ファイルタイプ(JSON または Parquet)を定義します。
  • 結果のファイルが圧縮されるかどうか、および
  • マニフェストファイルを含めるかどうか。

データセットを選択

宛先を選択したら、次の データセットを選択 手順で、データセットのリストから出力データセットを選択する必要があります。 複数のスケジュールされたクエリを作成し、出力データセットを同じクラウドストレージ宛先に送信する場合、対応する出力データセットを選択できます。 詳しくは ​ データセットの選択 ​ を参照してください。

データセット書き出しのスケジュール設定

最後に、スケジュール設定 手順の一部としてデータセットの書き出しをスケジュールします。 その手順では、スケジュールと、出力データセットの書き出しを増分にするかどうかを定義できます。 詳しくは ​ データセットの書き出しをスケジュール ​ を参照してください。

最終手順

​ 確認 ​ 選択し、正しければ、出力データセットのクラウドストレージ宛先への書き出しを開始します。

データの書き出しが正常に行われたことを ​ 確認 ​ する必要があります。 データセットを書き出す際、Experience Platformは、宛先で定義されたストレージの場所に 1 つまたは複数の .json ファイルまたは .parquet ファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されます。 Experience Platformは、選択された宛先の一部として指定されたストレージの場所にフォルダー構造を作成し、書き出されたファイルを格納します。 folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM のパターンに従って、書き出しのたびに新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。

フローサービス API

または、API を使用して出力データセットの書き出しを書き出し、スケジュールすることもできます。 含まれる手順は、Flow Service API を使用したデータセットの書き出し ​ に記載されています。

基本を学ぶ

データセットを書き出すには、​ 必要な権限 ​ があることを確認します。 また、出力データセットの送信先がデータセットの書き出しをサポートしていることを確認します。 次に、API 呼び出しで使用する ​ 必須ヘッダーとオプションヘッダーの値を収集する ​ 必要があります。 また、データセットを書き出す ​ 宛先の接続仕様 ID とフロー仕様 ID を識別する必要もあります。

適格なデータセットの取得

書き出し用に ​ 適格なデータセットのリストを取得 ​ し、GET /connectionSpecs/{id}/configs API を使用して、出力データセットがそのリストに含まれているかどうかを確認できます。

ソース接続を作成

次に、クラウドストレージ宛先に書き出す、出力データセット用に一意の ID を使用して、​ ソース接続を作成 ​ する必要があります。 POST /sourceConnections API を使用します。

宛先に対する認証(ベース接続の作成)

🔗 API を使用して認証を行い、クラウドストレージの宛先に資格情報を安全に保存するには、POST /targetConection ベース接続を作成 ​ する必要があります。

エクスポートパラメーターの指定

次に、もう一度 🔗 API を使用して、出力データセット用に POST /targetConection 書き出しパラメーターを保存する追加のターゲット接続を作成 ​ する必要があります。 これらの書き出しパラメーターには、場所、ファイル形式、圧縮などが含まれます。

データフローの設定

最後に、​ データフローの設定 ​ を行い、POST /flows API を使用して出力データセットがクラウドストレージの宛先に書き出されるようにします。 この手順では、scheduleParams パラメーターを使用して、書き出しのスケジュールを定義できます。

データフローの検証

​ データフローの正常な実行を確認 ​ するには、GET /runs API を使用して、データフロー ID をクエリパラメーターとして指定します。 このデータフロー ID は、データフローを設定したときに返される識別子です。

​ 検証 ​ 成功したデータ書き出し。 データセットを書き出す際、Experience Platformは、宛先で定義されたストレージの場所に 1 つまたは複数の .json ファイルまたは .parquet ファイルを作成します。 設定した書き出しスケジュールに従って、新しいファイルがストレージの場所に格納されます。 Experience Platformは、選択された宛先の一部として指定されたストレージの場所にフォルダー構造を作成し、書き出されたファイルを格納します。 folder-name-you-provided/datasetID/exportTime=YYYYMMDDHHMM のパターンに従って、書き出しのたびに新しいフォルダーが作成されます。 デフォルトのファイル名はランダムに生成され、書き出されたファイルの名前は必ず一意になります。

まとめ

つまり、Adobe Analytics データフィード機能のエミュレートは、クエリサービスを使用してスケジュール済みクエリを設定し、これらのクエリの結果をスケジュール済みデータセット書き出しで使用することを意味します。

IMPORTANT
このユースケースには、2 つのスケジューラーが含まれています。 エミュレートされたデータフィード機能が適切に動作することを保証するには、クエリサービスとデータ書き出しで設定されたスケジュールが干渉しないようにしてください。
recommendation-more-help
080e5213-7aa2-40d6-9dba-18945e892f79