Åtkomst av data med Spark in Data Science Workspace

Följande dokument innehåller exempel på hur man får åtkomst till data med Spark för användning i Data Science Workspace. Information om hur du får åtkomst till data med JupyterLab-anteckningsböcker finns i dokumentationen för JupyterLab-anteckningsböcker för dataåtkomst.

Komma igång

Om du använder Spark krävs prestandaoptimeringar som måste läggas till i SparkSession. Du kan även konfigurera configProperties för att senare läsa och skriva till datauppsättningar.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Läsa en datauppsättning

När du använder Spark har du tillgång till två läslägen: interaktiv och batch.

I det interaktiva läget skapas en Java Database Connectivity-anslutning (JDBC) till Query Service och resultat hämtas via en vanlig JDBC ResultSet som automatiskt översätts till en DataFrame. Det här läget fungerar ungefär som den inbyggda Spark-metoden spark.read.jdbc(). Det här läget är endast avsett för små datauppsättningar. Om datauppsättningen överstiger 5 miljoner rader föreslår vi att du byter till gruppläge.

I gruppläget används Query Services COPY-kommando för att generera Parquet-resultatuppsättningar på en delad plats. Dessa Parquet-filer kan sedan bearbetas ytterligare.

Ett exempel på hur du läser en datauppsättning i interaktivt läge visas nedan:

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

Ett exempel på hur du läser en datauppsättning i batchläge visas nedan:

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SELECT-kolumner från datauppsättningen

df = df.select("column-a", "column-b").show()

DISTINCT-sats

Med DISTINCT-satsen kan du hämta alla distinkta värden på rad-/kolumnnivå och ta bort alla dubblettvärden från svaret.

Ett exempel på hur funktionen distinct() används visas nedan:

df = df.select("column-a", "column-b").distinct().show()

WHERE-sats

Spark SDK tillåter två filtreringsmetoder: Använda ett SQL-uttryck eller filtrera genom villkor.

Ett exempel på hur du använder dessa filtreringsfunktioner finns nedan:

SQL-uttryck

df.where("age > 15")

Filtreringsvillkor

df.where("age" > 15 || "name" = "Steve")

ORDER BY-instruktion

ORDER BY-satsen tillåter att mottagna resultat sorteras efter en angiven kolumn i en viss ordning (stigande eller fallande). I SDK:t Spark görs detta med funktionen sort().

Ett exempel på hur funktionen sort() används visas nedan:

df = df.sort($"column1", $"column2".desc)

LIMIT-klausul

Med LIMIT-satsen kan du begränsa antalet poster som tas emot från datauppsättningen.

Ett exempel på hur funktionen limit() används visas nedan:

df = df.limit(100)

Skriva till en datauppsättning

Med hjälp av din configProperties-mappning kan du skriva till en datauppsättning i Experience Platform med QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Nästa steg

Adobe Experience Platform Data Science Workspace tillhandahåller ett Scala-recept (Spark) som använder ovanstående kodexempel för att läsa och skriva data. Om du vill veta mer om hur du använder Spark för att få tillgång till dina data kan du läsa Data Science Workspace Scala GitHub Repository.

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9