Il seguente documento contiene esempi su come accedere ai dati utilizzando Spark per l’utilizzo in Data Science Workspace. Per informazioni sull’accesso ai dati tramite i notebook JupyterLab, visita il Accesso ai dati dei notebook JupyterLab documentazione.
Utilizzo di Spark richiede ottimizzazioni delle prestazioni che devono essere aggiunte al SparkSession
. Inoltre, puoi anche impostare configProperties
per poter poi essere letti e scritti nei set di dati.
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Durante l’utilizzo di Spark è possibile accedere a due modalità di lettura: interattiva e batch.
La modalità interattiva crea una connessione JDBC (Java Database Connectivity) a Query Service e ottiene risultati tramite un JDBC regolare ResultSet
che viene tradotto automaticamente in una DataFrame
. Questa modalità funziona in modo simile al Spark metodo spark.read.jdbc()
. Questa modalità è destinata solo ai set di dati di piccole dimensioni. Se il set di dati supera i 5 milioni di righe, si consiglia di passare alla modalità batch.
La modalità batch utilizza Query Servicecomando COPY di per generare set di risultati Parquet in una posizione condivisa. Questi file di Parquet possono quindi essere ulteriormente elaborati.
Di seguito è riportato un esempio di lettura di un set di dati in modalità interattiva:
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
Analogamente, di seguito è riportato un esempio di lettura di un set di dati in modalità batch:
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
df = df.select("column-a", "column-b").show()
La clausola DISTINCT consente di recuperare tutti i valori distinti a livello di riga/colonna, rimuovendo tutti i valori duplicati dalla risposta.
Un esempio di utilizzo di distinct()
di seguito:
df = df.select("column-a", "column-b").distinct().show()
Il Spark SDK consente due metodi per filtrare: utilizzando un’espressione SQL o filtrando attraverso le condizioni.
Di seguito è riportato un esempio di utilizzo di queste funzioni di filtro:
df.where("age > 15")
df.where("age" > 15 || "name" = "Steve")
La clausola ORDER BY consente di ordinare i risultati ricevuti in base a una colonna specificata in un ordine specifico (crescente o decrescente). In Spark SDK, questo viene effettuato utilizzando sort()
funzione.
Un esempio di utilizzo di sort()
di seguito:
df = df.sort($"column1", $"column2".desc)
La clausola LIMIT consente di limitare il numero di record ricevuti dal set di dati.
Un esempio di utilizzo di limit()
di seguito:
df = df.limit(100)
Utilizzo di configProperties
mappatura, puoi scrivere su un set di dati in Experience Platform utilizzando QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Adobe Experience Platform Data Science Workspace fornisce un esempio di ricetta Scala (Spark) che utilizza gli esempi di codice precedenti per leggere e scrivere dati. Per ulteriori informazioni su come utilizzare Spark per accedere ai dati, consulta la sezione Archivio GitHub Scala di Data Science Workspace.