Reading a dataset
While using Spark you have access to two modes of reading: interactive and batch.
Interactive mode creates a Java Database Connectivity (JDBC) connection to Query Service and gets results through a regular JDBC ResultSet
that is automatically translated to a DataFrame
. This mode works similarly to the built-in Spark method spark.read.jdbc()
. This mode is meant only for small datasets. If your dataset exceeds 5 million rows, it is suggested you swap to batch mode.
Batch mode uses Query Service’s COPY command to generate Parquet result sets in a shared location. These Parquet files can then be further processed.
An example of reading a dataset in interactive mode can be seen below:
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
Similarly, an example of reading a dataset in batch mode can be seen below:
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
SELECT columns from the dataset
df = df.select("column-a", "column-b").show()
DISTINCT clause
The DISTINCT clause allows you to fetch all the distinct values at a row/column level, removing all duplicate values from the response.
An example of using the distinct()
function can be seen below:
df = df.select("column-a", "column-b").distinct().show()
WHERE clause
The Spark SDK allows for two methods for filtering: Using an SQL expression or by filtering through conditions.
An example of using these filtering functions can be seen below:
SQL expression
df.where("age > 15")
Filtering conditions
df.where("age" > 15 || "name" = "Steve")
ORDER BY clause
The ORDER BY clause allows received results to be sorted by a specified column in a specific order (ascending or descending). In the Spark SDK, this is done by using the sort()
function.
An example of using the sort()
function can be seen below:
df = df.sort($"column1", $"column2".desc)
LIMIT clause
The LIMIT clause allows you to limit the number of records received from the dataset.
An example of using the limit()
function can be seen below:
df = df.limit(100)
Writing to a dataset
Using your configProperties
mapping, you can write to a dataset in Experience Platform using QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()