El siguiente documento contiene ejemplos sobre cómo acceder a datos mediante Spark para utilizarlos en Data Science Workspace. Para obtener información sobre el acceso a los datos mediante los blocs de notas de JupyterLab, visite Acceso a los datos de los portátiles JupyterLab documentación.
Uso Spark requiere optimizaciones de rendimiento que se deben agregar al SparkSession
. Además, también puede configurar configProperties
para leer y escribir más tarde en conjuntos de datos.
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Mientras utiliza Spark tiene acceso a dos modos de lectura: interactivo y por lotes.
El modo interactivo crea una conexión de Java Database Connectivity (JDBC) a Query Service y obtiene resultados a través de un JDBC normal ResultSet
que se traduce automáticamente a un DataFrame
. Este modo funciona de forma similar al integrado Spark method spark.read.jdbc()
. Este modo solo está diseñado para conjuntos de datos pequeños. Si el conjunto de datos supera los 5 millones de filas, se sugiere que cambie al modo por lotes.
El modo por lotes utiliza Query ServiceEl comando COPY de para generar conjuntos de resultados de parquet en una ubicación compartida. Estos archivos de parquet se pueden seguir procesando.
A continuación se puede ver un ejemplo de lectura de un conjunto de datos en modo interactivo:
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
Del mismo modo, a continuación se puede ver un ejemplo de lectura de un conjunto de datos en modo por lotes:
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
df = df.select("column-a", "column-b").show()
La cláusula DISTINCT permite recuperar todos los valores distintos a nivel de fila/columna, eliminando todos los valores duplicados de la respuesta.
Un ejemplo de uso de la variable distinct()
a continuación:
df = df.select("column-a", "column-b").distinct().show()
La variable Spark El SDK admite dos métodos de filtrado: Uso de una expresión SQL o filtrado mediante condiciones.
A continuación se puede ver un ejemplo del uso de estas funciones de filtrado:
df.where("age > 15")
df.where("age" > 15 || "name" = "Steve")
La cláusula ORDER BY permite ordenar los resultados recibidos por una columna especificada en un orden específico (ascendente o descendente). En el Spark SDK, esto se realiza utilizando la variable sort()
función.
Un ejemplo de uso de la variable sort()
a continuación:
df = df.sort($"column1", $"column2".desc)
La cláusula LIMIT permite limitar el número de registros recibidos del conjunto de datos.
Un ejemplo de uso de la variable limit()
a continuación:
df = df.limit(100)
Uso de configProperties
asignación, puede escribir en un conjunto de datos en Experience Platform mediante QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Adobe Experience Platform Data Science Workspace proporciona un ejemplo de fórmula Scala (Spark) que utiliza los ejemplos de código anteriores para leer y escribir datos. Si desea obtener más información sobre cómo utilizar Spark para acceder a sus datos, consulte la Repositorio de Data Science Workspace Scala GitHub.