Accès aux données à l’aide de Spark dans Data Science Workspace

Le document suivant contient des exemples d’accès aux données à l’aide de Spark pour une utilisation dans Data Science Workspace. Pour plus d’informations sur l’accès aux données à l’aide des notebooks JupyterLab, consultez la documentation Accès aux données des notebooks JupyterLab .

Prise en main

L’utilisation de Spark nécessite des optimisations de performances qui doivent être ajoutées à SparkSession. En outre, vous pouvez également configurer configProperties pour que les jeux de données puissent être lus et écrits ultérieurement.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Lecture d’un jeu de données

Lorsque vous utilisez Spark, vous avez accès à deux modes de lecture : interactive et par lots.

Le mode interactif crée une connexion JDBC (Java Database Connectivity) à Query Service et obtient des résultats par le biais d’un JDBC normal ResultSet qui est automatiquement traduit en DataFrame. Ce mode fonctionne de la même manière que la méthode Spark intégrée spark.read.jdbc(). Ce mode est destiné uniquement aux petits jeux de données. Si votre jeu de données dépasse 5 millions de lignes, il est conseillé de passer en mode batch.

Le mode batch utilise la commande COPY de Query Service pour générer les ensembles de résultats Parquet dans un emplacement partagé. Ces fichiers Parquet peuvent ensuite être traités plus en détail.

Vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode interactif :

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

De même, vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode batch :

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SÉLECTIONNER des colonnes du jeu de données

df = df.select("column-a", "column-b").show()

Clause DISTINCT

La clause DISTINCT vous permet de récupérer toutes les valeurs distinctes au niveau d’une ligne/colonne, supprimant toutes les valeurs en double de la réponse.

Vous trouverez ci-dessous un exemple d’utilisation de la fonction distinct() :

df = df.select("column-a", "column-b").distinct().show()

Clause WHERE

Le SDK Spark permet deux méthodes de filtrage : Utilisation d’une expression SQL ou filtrage par conditions.

Vous trouverez ci-dessous un exemple d’utilisation de ces fonctions de filtrage :

Expression SQL

df.where("age > 15")

Critères de filtrage

df.where("age" > 15 || "name" = "Steve")

Clause ORDER BY

La clause ORDER BY permet de trier les résultats reçus par une colonne spécifiée dans un ordre spécifique (croissant ou décroissant). Dans le SDK Spark, cette opération s’effectue à l’aide de la fonction sort() .

Vous trouverez ci-dessous un exemple d’utilisation de la fonction sort() :

df = df.sort($"column1", $"column2".desc)

Clause LIMIT

La clause LIMIT vous permet de limiter le nombre d’enregistrements reçus du jeu de données.

Vous trouverez ci-dessous un exemple d’utilisation de la fonction limit() :

df = df.limit(100)

Écriture dans un jeu de données

À l’aide de votre mappage configProperties, vous pouvez écrire dans un jeu de données en Experience Platform à l’aide de QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString 

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Étapes suivantes

Adobe Experience Platform Data Science Workspace fournit un exemple de recette Scala (Spark) qui utilise les exemples de code ci-dessus pour lire et écrire des données. Si vous souhaitez en savoir plus sur l’utilisation de Spark pour accéder à vos données, consultez le référentiel Scala GitHub de Data Science Workspace.

Sur cette page