El siguiente documento contiene ejemplos de cómo acceder a los datos mediante Spark para utilizarlos en Área de trabajo de ciencias de datos. Para obtener información sobre el acceso a los datos mediante los blocs de notas de JupyterLab, visite la documentación de acceso a datos de los blocs de notas de JupyterLab.
El uso de Spark requiere optimizaciones de performance que deben agregarse al SparkSession
. Además, también puede configurar configProperties
para que más adelante pueda leer y escribir en conjuntos de datos.
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Mientras utiliza Spark tiene acceso a dos modos de lectura: interactivo y por lotes.
El modo interactivo crea una conexión de Conectividad de base de datos Java (JDBC) a Query Service y obtiene resultados a través de un JDBC ResultSet
regular que se traduce automáticamente a un DataFrame
. Este modo funciona de manera similar al método Spark incorporado spark.read.jdbc()
. Este modo solo está diseñado para conjuntos de datos pequeños. Si el conjunto de datos supera los 5 millones de filas, se sugiere cambiar al modo por lotes.
El modo Lote utiliza el comando COPY de Query Service para generar conjuntos de resultados de parquet en una ubicación compartida. Estos archivos de parquet pueden procesarse posteriormente.
A continuación se muestra un ejemplo de lectura de un conjunto de datos en modo interactivo:
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
De manera similar, a continuación se muestra un ejemplo de lectura de un conjunto de datos en modo por lotes:
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
df = df.select("column-a", "column-b").show()
La cláusula DISTINCT permite recuperar todos los valores distintos en un nivel de fila/columna, eliminando todos los valores de duplicado de la respuesta.
A continuación se muestra un ejemplo del uso de la función distinct()
:
df = df.select("column-a", "column-b").distinct().show()
El SDK Spark permite dos métodos de filtrado: Uso de una expresión SQL o filtrado mediante condiciones.
A continuación se muestra un ejemplo del uso de estas funciones de filtrado:
df.where("age > 15")
df.where("age" > 15 || "name" = "Steve")
La cláusula ORDER BY permite ordenar los resultados recibidos por una columna específica en un orden específico (ascendente o descendente). En el SDK Spark, esto se realiza mediante la función sort()
.
A continuación se muestra un ejemplo del uso de la función sort()
:
df = df.sort($"column1", $"column2".desc)
La cláusula LIMIT permite limitar el número de registros recibidos del conjunto de datos.
A continuación se muestra un ejemplo del uso de la función limit()
:
df = df.limit(100)
Al utilizar la asignación configProperties
, puede escribir en un conjunto de datos en el Experience Platform mediante QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Adobe Experience Platform Data Science Workspace proporciona un ejemplo de fórmula Scala (Spark) que utiliza las muestras de código anteriores para leer y escribir datos. Si desea obtener más información sobre cómo utilizar Spark para acceder a sus datos, consulte el Repositorio de GitHub de Data Science Workspace.