Komma åt data med Spark i Data Science Workspace

NOTE
Data Science Workspace finns inte längre att köpa.
Denna dokumentation är avsedd för befintliga kunder med tidigare tillstånd till Data Science Workspace.

Följande dokument innehåller exempel på hur man får åtkomst till data med Spark för användning i Data Science Workspace. Information om hur du får åtkomst till data med JupyterLab-anteckningsböcker finns i dokumentationen för JupyterLab-anteckningsböcker för dataåtkomst.

Komma igång

Om du använder Spark krävs prestandaoptimeringar som måste läggas till i SparkSession. Du kan även konfigurera configProperties för att senare läsa och skriva till datauppsättningar.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Läsa en datauppsättning

När du använder Spark har du tillgång till två läslägen: interaktiv och batch.

Interaktivt läge skapar en Java Database Connectivity-anslutning (JDBC) till Query Service och får resultat via en vanlig JDBC ResultSet som automatiskt översätts till en DataFrame. Det här läget fungerar på samma sätt som den inbyggda Spark-metoden spark.read.jdbc(). Det här läget är endast avsett för små datauppsättningar. Om din datauppsättning överskrider 5 miljoner rader föreslår vi att du byter till batchläge.

I gruppläget används Query Services COPY-kommando för att generera Parquet-resultatuppsättningar på en delad plats. Dessa Parquet-filer kan sedan bearbetas ytterligare.

Ett exempel på hur du läser en datauppsättning i interaktivt läge visas nedan:

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

Ett exempel på hur du läser en datauppsättning i batchläge visas nedan:

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SELECT-kolumner från datauppsättningen

df = df.select("column-a", "column-b").show()

DISTINCT-sats

Med DISTINCT-satsen kan du hämta alla distinkta värden på rad-/kolumnnivå och ta bort alla dubblettvärden från svaret.

Ett exempel på hur funktionen distinct() används visas nedan:

df = df.select("column-a", "column-b").distinct().show()

WHERE-sats

I SDK:et Spark finns två filtreringsmetoder: Använd ett SQL-uttryck eller filtrera genom villkor.

Ett exempel på hur du använder dessa filtreringsfunktioner visas nedan:

SQL-uttryck

df.where("age > 15")

Filtreringsvillkor

df.where("age" > 15 || "name" = "Steve")

ORDER BY-instruktion

ORDER BY-satsen tillåter att mottagna resultat sorteras efter en angiven kolumn i en viss ordning (stigande eller fallande). I SDK:t Spark görs detta med funktionen sort().

Ett exempel på hur funktionen sort() används visas nedan:

df = df.sort($"column1", $"column2".desc)

LIMIT-klausul

Med LIMIT-satsen kan du begränsa antalet poster som tas emot från datauppsättningen.

Ett exempel på hur funktionen limit() används visas nedan:

df = df.limit(100)

Skriva till en datauppsättning

Med mappningen configProperties kan du skriva till en datauppsättning i Experience Platform med QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Nästa steg

Adobe Experience Platform Data Science Workspace tillhandahåller ett Scala-recept (Spark) som använder ovanstående kodexempel för att läsa och skriva data. Om du vill veta mer om hur du använder Spark för att få tillgång till dina data kan du läsa Data Science Workspace Scala GitHub Repository.

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9