Il seguente documento contiene esempi su come accedere ai dati utilizzando Spark per l’utilizzo in Data Science Workspace. Per informazioni sull'accesso ai dati tramite i notebook JupyterLab, visita la documentazione JupyterLab data access .
L'utilizzo di Spark richiede ottimizzazioni delle prestazioni che devono essere aggiunte a SparkSession
. Inoltre, puoi impostare configProperties
in modo che in seguito sia possibile leggere e scrivere nei set di dati.
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Utilizzando Spark è possibile accedere a due modalità di lettura: interattivo e batch.
La modalità interattiva crea una connessione Java Database Connectivity (JDBC) a Query Service e ottiene risultati tramite un JDBC ResultSet
regolare che viene convertito automaticamente in un DataFrame
. Questa modalità funziona in modo simile al metodo incorporato Spark spark.read.jdbc()
. Questa modalità è destinata solo ai set di dati di piccole dimensioni. Se il set di dati supera i 5 milioni di righe, è consigliabile passare alla modalità batch.
La modalità batch utilizza il comando COPY di Query Service per generare set di risultati Parquet in una posizione condivisa. Questi file Parquet possono quindi essere ulteriormente elaborati.
Di seguito è riportato un esempio di lettura di un set di dati in modalità interattiva:
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
Analogamente, un esempio di lettura di un set di dati in modalità batch può essere visto di seguito:
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
df = df.select("column-a", "column-b").show()
La clausola DISTINCT consente di recuperare tutti i valori distinti a livello di riga/colonna, rimuovendo tutti i valori duplicati dalla risposta.
Di seguito è riportato un esempio di utilizzo della funzione distinct()
:
df = df.select("column-a", "column-b").distinct().show()
L'SDK Spark consente due metodi di filtraggio: Utilizzo di un'espressione SQL o filtrando tra le condizioni.
Di seguito è riportato un esempio di utilizzo di queste funzioni di filtro:
df.where("age > 15")
df.where("age" > 15 || "name" = "Steve")
La clausola ORDER BY consente di ordinare i risultati ricevuti in base a una colonna specifica in un ordine specifico (crescente o decrescente). Nell’SDK Spark, questo viene fatto utilizzando la funzione sort()
.
Di seguito è riportato un esempio di utilizzo della funzione sort()
:
df = df.sort($"column1", $"column2".desc)
La clausola LIMIT ti consente di limitare il numero di record ricevuti dal set di dati.
Di seguito è riportato un esempio di utilizzo della funzione limit()
:
df = df.limit(100)
Utilizzando la mappatura configProperties
, puoi scrivere in un set di dati in un Experience Platform utilizzando QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Adobe Experience Platform Data Science Workspace fornisce un esempio di ricetta Scala (Spark) che utilizza gli esempi di codice riportati sopra per leggere e scrivere i dati. Per ulteriori informazioni su come utilizzare Spark per accedere ai dati, consulta l’ Archivio Scala GitHub di Data Science Workspace.