次のドキュメントには、Spark を使用して Data Science Workspace でデータにアクセスし、使用する方法の例が含まれています。 JupyterLab ノートブックを使用したデータへのアクセスについては、 JupyterLab ノートブックのデータアクセス ドキュメント。
使用 Spark には、 SparkSession
. また、 configProperties
後でデータセットに読み書きする場合に使用します。
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Spark を使用する場合は、次の 2 つの読み取りモードにアクセスできます。インタラクティブおよびバッチ。
インタラクティブモードは、Java Database Connectivity(JDBC) 接続を Query Service とは、通常の JDBC を通じて結果を取得します。 ResultSet
が DataFrame
. このモードは、組み込みの Spark メソッド spark.read.jdbc()
. このモードは、小さなデータセットにのみ使用します。 データセットの行数が 500 万行を超える場合は、バッチモードに置き換えることをお勧めします。
バッチモードでは Query Serviceの COPY コマンドを使用して、共有場所に Parquet 結果セットを生成します。 これらの Parquet ファイルは、後で処理できます。
インタラクティブモードでのデータセットの読み取りの例を次に示します。
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
同様に、バッチモードでのデータセットの読み取りの例を次に示します。
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
df = df.select("column-a", "column-b").show()
DISTINCT 句を使用すると、行/列レベルですべてのユニーク値を取得し、応答からすべての重複値を削除できます。
の使用例 distinct()
関数は次のように表示されます。
df = df.select("column-a", "column-b").distinct().show()
この Spark SDK では、次の 2 つのフィルタリング方法を使用できます。SQL 式を使用するか、条件をフィルタリングして使用します。
これらのフィルタリング関数の使用例を以下に示します。
df.where("age > 15")
df.where("age" > 15 || "name" = "Steve")
ORDER BY 句を使用すると、受け取った結果を特定の順序(昇順または降順)で指定した列で並べ替えることができます。 内 Spark SDK の場合、これは sort()
関数に置き換えます。
の使用例 sort()
関数は次のように表示されます。
df = df.sort($"column1", $"column2".desc)
LIMIT 句を使用すると、データセットから受け取るレコードの数を制限できます。
の使用例 limit()
関数は次のように表示されます。
df = df.limit(100)
を使用して、 configProperties
マッピングを使用する場合は、次を使用してExperience Platform内のデータセットに書き込むことができます QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Adobe Experience Platform Data Science Workspace には、上記のコードサンプルを使用してデータの読み取りと書き込みをおこなう Scala(Spark) レシピのサンプルが用意されています。 Spark を使用してデータにアクセスする方法の詳細については、 Data Science Workspace Scala GitHub リポジトリ.