Åtkomst till data med Spark in Data Science Workspace

Följande dokument innehåller exempel på hur du får åtkomst till data med Spark för användning i Data Science Workspace. Information om hur du får åtkomst till data med JupyterLab-anteckningsböcker finns på Dataåtkomst för JupyterLab-anteckningsböcker dokumentation.

Komma igång

Använda Spark kräver prestandaoptimeringar som behöver läggas till i SparkSession. Dessutom kan du konfigurera configProperties för att senare läsa och skriva till datauppsättningar.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Läsa en datauppsättning

När du använder Spark har du tillgång till två läslägen: interaktivt och gruppvis.

I det interaktiva läget skapas en Java Database Connectivity-anslutning (JDBC) till Query Service och får resultat via en vanlig JDBC ResultSet som automatiskt översätts till DataFrame. Det här läget fungerar ungefär som det inbyggda Spark method spark.read.jdbc(). Det här läget är endast avsett för små datauppsättningar. Om datauppsättningen överstiger 5 miljoner rader föreslår vi att du byter till gruppläge.

Batchläget använder Query Service's COPY command to generate Parquet result sets in a shared location. Dessa Parquet-filer kan sedan bearbetas ytterligare.

Ett exempel på hur du läser en datauppsättning i interaktivt läge visas nedan:

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

Ett exempel på hur du läser en datauppsättning i batchläge visas nedan:

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SELECT-kolumner från datauppsättningen

df = df.select("column-a", "column-b").show()

DISTINCT-sats

Med DISTINCT-satsen kan du hämta alla distinkta värden på rad-/kolumnnivå och ta bort alla dubblettvärden från svaret.

Ett exempel på hur du använder distinct() finns nedan:

df = df.select("column-a", "column-b").distinct().show()

WHERE-sats

The Spark SDK tillåter två filtreringsmetoder: Använda ett SQL-uttryck eller filtrera genom villkor.

Ett exempel på hur du använder dessa filtreringsfunktioner finns nedan:

SQL-uttryck

df.where("age > 15")

Filtreringsvillkor

df.where("age" > 15 || "name" = "Steve")

ORDER BY-instruktion

ORDER BY-satsen tillåter att mottagna resultat sorteras efter en angiven kolumn i en viss ordning (stigande eller fallande). I Spark SDK, detta görs med sort() funktion.

Ett exempel på hur du använder sort() finns nedan:

df = df.sort($"column1", $"column2".desc)

LIMIT-klausul

Med LIMIT-satsen kan du begränsa antalet poster som tas emot från datauppsättningen.

Ett exempel på hur du använder limit() finns nedan:

df = df.limit(100)

Skriva till en datauppsättning

Med configProperties kan du skriva till en datauppsättning i Experience Platform med QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Nästa steg

Adobe Experience Platform Data Science Workspace innehåller ett Scala-recept (Spark) som använder ovanstående kodexempel för att läsa och skriva data. Om du vill veta mer om hur du använder Spark för att få tillgång till dina data kan du läsa Data Science Workspace Scala GitHub-databas.

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9