Data Science WorkspaceのSparkを使用したデータへのアクセス

次のドキュメントには、Data Science Workspaceで使用するSparkを使用してデータにアクセスする方法の例が含まれています。 JupterLabノートブックを使用したデータへのアクセスについては、JupyterLabノートブックデータアクセスのドキュメントを参照してください。

はじめに

Sparkを使用するには、SparkSessionに追加する必要のあるパフォーマンスの最適化が必要です。 また、後でデータセットに対して読み書きを行うconfigPropertiesを設定することもできます。

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

データセットの読み取り

Sparkを使用する場合は、次の2つの読み取りモードにアクセスできます。インタラクティブとバッチ。

インタラクティブモードは、Query ServiceへのJava Database Connectivity(JDBC)接続を作成し、通常のJDBC ResultSetを介して結果を取得します。この結果は、DataFrameに自動的に変換されます。 このモードは、組み込みのSparkメソッドspark.read.jdbc()と同様に動作します。 このモードは、小さなデータセットに対してのみ使用できます。 データセットの行数が500万行を超える場合は、バッチモードに切り替えることをお勧めします。

バッチモードは、Query ServiceのCOPYコマンドを使用して、共有場所にParket結果セットを生成します。 これらのパーケファイルは、後でさらに処理できます。

インタラクティブモードでのデータセットの読み取りの例を次に示します。

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

同様に、バッチモードでデータセットを読み取る例を次に示します。

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

データセットからSELECT列を

df = df.select("column-a", "column-b").show()

DISTINCT句

DISTINCT句を使用すると、行/列レベルですべての個別の値を取得し、応答からすべての重複値を削除できます。

distinct()関数の使用例を次に示します。

df = df.select("column-a", "column-b").distinct().show()

WHERE句

Spark SDKでは、次の2つのフィルタリング方法が可能です。SQL式を使用するか、条件をフィルタリングして使用します。

これらのフィルター機能の使用例を次に示します。

SQL式

df.where("age > 15")

フィルター条件

df.where("age" > 15 || "name" = "Steve")

ORDER BY句

ORDER BY句を使用すると、受け取った結果を、指定した列で特定の順序(昇順または降順)で並べ替えることができます。 Spark SDKでは、sort()関数を使用して行います。

sort()関数の使用例を次に示します。

df = df.sort($"column1", $"column2".desc)

LIMIT句

LIMIT句を使用すると、データセットから受け取るレコードの数を制限できます。

limit()関数の使用例を次に示します。

df = df.limit(100)

データセットへの書き込み

configPropertiesマッピングを使用して、QSOptionを使用してExperience Platform内のデータセットに書き込むことができます。

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString 

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

次の手順

Adobe Experience Platformデータサイエンスワークスペースには、上記のコードサンプルを使用してデータの読み取りと書き込みを行うScala(Spark)レシピサンプルが用意されています。 Sparkを使用してデータにアクセスする方法の詳細については、Data Science Workspace Scala GitHub Repositoryを参照してください。

このページ

Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now
Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now