データサイエンスWorkspaceでの Spark を使用したデータへのアクセス

次のドキュメントでは、Data Science Workspaceで使用する Spark を使用してデータにアクセスする方法の例を示します。 JupyterLab ノートブックを使用したデータへのアクセスについて詳しくは、JupyterLab ノートブックのデータアクセスのドキュメントを参照してください。

はじめに

Spark を使用するには、SparkSession に追加する必要があるパフォーマンスの最適化が必要です。 また、データセットに対する読み取りと書き込みを後で行うように configProperties を設定することもできます。

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

データセットの読み取り

Spark を使用する際は、インタラクティブとバッチの 2 つの読み取りモードにアクセスできます。

インタラクティブモードは、Query Service への Java Database Connectivity (JDBC)接続を作成し、DataFrame に自動変換される通常の JDBC ResultSet を通じて結果を取得します。 このモードは、組み込みの Spark メソッド spark.read.jdbc() と同様に機能します。 このモードは、小規模なデータセットのみを対象としています。 データセットが 500 万行を超える場合は、バッチモードにスワップすることをお勧めします。

バッチモードでは、Query Service の COPY コマンドを使用して、共有場所に Parquet 結果セットを生成します。 その後、これらの Parquet ファイルをさらに処理できます。

インタラクティブモードでのデータセットの読み取り例を以下に示します。

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

同様に、バッチモードでデータセットを読み取る例を次に示します。

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

データセットから列を選択

df = df.select("column-a", "column-b").show()

DISTINCT 句

DISTINCT 句を使用すると、行/列レベルでユニークな値をすべて取得し、応答から重複する値をすべて削除できます。

distinct() 関数の使用例を次に示します。

df = df.select("column-a", "column-b").distinct().show()

WHERE 句

Spark SDK でフィルタリングできる方法は、SQL 式を使用する方法と、条件を使用したフィルタリング方法の 2 つです。

これらのフィルター関数の使用例を次に示します。

SQL 式

df.where("age > 15")

フィルター条件

df.where("age" > 15 || "name" = "Steve")

ORDER BY 句

ORDER BY 句を使用すると、指定した列を指定した順序(昇順または降順)で並べ替えて、受信した結果を表示できます。 Spark SDK では、sort() 関数を使用してこれを行います。

sort() 関数の使用例を次に示します。

df = df.sort($"column1", $"column2".desc)

LIMIT 句

LIMIT 句を使用すると、データセットから受信するレコードの数を制限できます。

limit() 関数の使用例を次に示します。

df = df.limit(100)

データセットへの書き込み

configProperties マッピングを使用すると、QSOption を使用してExperience Platformのデータセットに書き込むことができます。

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

次の手順

Adobe Experience Platform Data Science Workspaceは、上記のコードサンプルを使用してデータの読み取りと書き込みを行う Scala (Spark)レシピサンプルを提供します。 Spark を使用してデータにアクセスする方法について詳しくは、 データサイエンス Workspace Scala GitHub リポジトリを参照してください。

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9