Reading a dataset

When using Spark, you have access to two modes for reading data: interactive and batch.

Interactive mode creates a Java Database Connectivity (JDBC) connection to Query Service and retrieves results through a regular JDBC ResultSet that is automatically translated to a DataFrame. This mode works similarly to the built-in Spark method spark.read.jdbc(). Interactive mode is meant only for small datasets; if your dataset exceeds 5 million rows, it is recommended that you switch to batch mode.

Batch mode uses Query Service’s COPY command to generate Parquet result sets in a shared location. These Parquet files can then be further processed.
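
For example, here is a minimal sketch of picking a mode from an estimated row count (the chooseReadMode helper is hypothetical; the threshold follows the guidance above):

// Hypothetical helper: choose the Query Service read mode based on an
// estimated row count, following the 5 million row guidance above.
def chooseReadMode(estimatedRows: Long): String =
  if (estimatedRows > 5000000L) "batch" else "interactive"

// The returned value can be passed to QSOption.mode when loading.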

An example of reading a dataset in interactive mode can be seen below:

// Read the configuration values
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "")
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "")
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "")
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "")

// Look up the ID of the dataset to read
val dataSetId: String = configProperties.get(taskId).getOrElse("")

// Load the dataset in interactive mode
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
  .option(QSOption.userToken, userToken)
  .option(QSOption.imsOrg, orgId)
  .option(QSOption.apiKey, apiKey)
  .option(QSOption.mode, "interactive")
  .option(QSOption.datasetId, dataSetId)
  .option(QSOption.sandboxName, sandboxName)
  .load()
df.show()

Similarly, an example of reading a dataset in batch mode can be seen below. The only difference from interactive mode is the value passed to QSOption.mode:

// Load the dataset in batch mode
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
  .option(QSOption.userToken, userToken)
  .option(QSOption.imsOrg, orgId)
  .option(QSOption.apiKey, apiKey)
  .option(QSOption.mode, "batch")
  .option(QSOption.datasetId, dataSetId)
  .option(QSOption.sandboxName, sandboxName)
  .load()
df.show()

SELECT columns from the dataset

To select specific columns from the dataset, use the select() function:

df.select("column-a", "column-b").show()

DISTINCT clause

The DISTINCT clause allows you to fetch all the distinct values at a row/column level, removing all duplicate values from the response.

An example of using the distinct() function can be seen below:

df.select("column-a", "column-b").distinct().show()

WHERE clause

The Spark SDK supports two methods of filtering: using a SQL expression or filtering by column conditions.

An example of using these filtering functions can be seen below:

SQL expression

df.where("age > 15")

Filtering conditions

df.where("age" > 15 || "name" = "Steve")

ORDER BY clause

The ORDER BY clause allows received results to be sorted by a specified column in a specific order (ascending or descending). In the Spark SDK, this is done by using the sort() function.

An example of using the sort() function can be seen below:

df.sort($"column1", $"column2".desc)

LIMIT clause

The LIMIT clause allows you to limit the number of records received from the dataset.

An example of using the limit() function can be seen below:

df.limit(100)
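
These clauses can be chained on a single DataFrame before the results are collected. Below is a minimal sketch combining the clauses above (the column names are placeholders):

// Assumes import sparkSession.implicits._ for the $"..." column syntax.
val results = df
  .select("column-a", "column-b")
  .distinct()
  .sort($"column-a", $"column-b".desc)
  .limit(100)
results.show()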

Writing to a dataset

Using your configProperties mapping and the same configuration values shown above, you can write a DataFrame to a dataset in Experience Platform using QSOption:

// Read the configuration values
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "")
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "")
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "")
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "")

// Write the DataFrame to the target dataset
// (scoringResultsDataSetId is the ID of the dataset being written to)
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
  .option(QSOption.userToken, userToken)
  .option(QSOption.imsOrg, orgId)
  .option(QSOption.apiKey, apiKey)
  .option(QSOption.datasetId, scoringResultsDataSetId)
  .option(QSOption.sandboxName, sandboxName)
  .save()
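
To confirm the write, here is a minimal sketch that reads the same dataset back in interactive mode, reusing the configuration values from above:

// Read back the dataset that was just written to verify the save.
val written = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
  .option(QSOption.userToken, userToken)
  .option(QSOption.imsOrg, orgId)
  .option(QSOption.apiKey, apiKey)
  .option(QSOption.mode, "interactive")
  .option(QSOption.datasetId, scoringResultsDataSetId)
  .option(QSOption.sandboxName, sandboxName)
  .load()
written.show()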