Acceder a datos con Spark en Data Science Espacio de trabajo

NOTE
La Espacio de trabajo de ciencia de datos ya no está disponible para su compra.
Esta documentación está destinada a clientes existentes con derechos previos a Data Science Espacio de trabajo.

El siguiente documento contiene ejemplos sobre cómo acceder a datos con Spark para su uso en Data Science Espacio de trabajo. Para obtener información sobre el acceso a los datos con blocs de notas de JupyterLab, visita la documentación de acceso🔗 a datos de blocs de notas de JupyterLab.

Primeros pasos

El uso Spark requiere optimizaciones de rendimiento que deben agregarse al SparkSessionarchivo . Además, también puede configurar configProperties para que más tarde lea y escriba en conjuntos de datos.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Leer una conjunto de datos

Mientras usas Spark tienes acceso a dos modos de lectura: interactivo y por lotes.

interactivo modo crea una conexión Query Service JDBBC (Java Database Connectivity) y obtiene resultados a través de un JDBC ResultSet normal que se traduce automáticamente a un DataFramearchivo . Este modo funciona de manera similar al método spark.read.jdbc()incorporadoSpark. Este modo está pensado solo para conjuntos de datos pequeños. Si su conjunto de datos supera los 5 millones de filas, se recomienda cambiar al modo por lotes.

Lote modo utiliza Query Serviceel comando COPY de para generar conjuntos de resultados de parquet en una ubicación compartida. Estos archivos Parquet pueden procesarse posteriormente.

A continuación se puede ver un ejemplo de lectura de un conjunto de datos en modo interactivo:

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

Del mismo modo, a continuación se puede ver un ejemplo de lectura de un conjunto de datos en modo por lotes:

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SELECCIONAR columnas en el conjunto de datos

df = df.select("column-a", "column-b").show()

Cláusula DISTINCT

La cláusula DISTINCT le permite obtener todos los valores distintos en un nivel de fila/columna, eliminando todos los valores duplicado de la respuesta.

A continuación se muestra un ejemplo de uso de la distinct() función:

df = df.select("column-a", "column-b").distinct().show()

Cláusula WHERE

El Spark SDK permite dos métodos de filtrado: mediante un expresión SQL o filtrando condiciones.

A continuación se muestra un ejemplo del uso de estas funciones de filtrado:

expresión SQL

df.where("age > 15")

Condiciones de filtrado

df.where("age" > 15 || "name" = "Steve")

Cláusula ORDER BY

La cláusula ORDER BY permite que los resultados recibidos se ordenen por una columna específica en un orden específico (ascendente o de bajada). En el Spark SDK, esto se hace mediante la sort() función.

A continuación se muestra un ejemplo de uso de la sort() función:

df = df.sort($"column1", $"column2".desc)

Cláusula LIMIT

La cláusula LIMIT le permite limitar el número de registros recibidos del conjunto de datos.

A continuación se muestra un ejemplo de uso de la limit() función:

df = df.limit(100)

Escribir a un conjunto de datos

Mediante la configProperties asignación, puede escribir en una conjunto de datos de Experience Platform mediante QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Pasos siguientes

Adobe Experience Platform Espacio de trabajo de ciencia de datos proporciona un ejemplo de fórmula de Scala (Spark) que usa los ejemplos de código anteriores para leer y escribir datos. Si desea obtener más información sobre cómo usar Spark para acceder a sus datos, consulte el Repositorio de GitHub de Data Science Espacio de trabajo Scala.

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9