Model Authoring SDK
The Model Authoring SDK enables you to develop custom machine learning Recipes and Feature Pipelines which can be used in Adobe Experience Platform Data Science Workspace, providing implementable templates in PySpark and Spark (Scala).
This document provides information regarding the various classes found within the Model Authoring SDK.
DataLoader dataloader
The DataLoader class encapsulates anything related to retrieving, filtering, and returning raw input data. Examples of input data include data for training, scoring, or feature engineering. Data loaders extend the abstract class DataLoader and must override the abstract method load.
PySpark
The following table describes the abstract methods of a PySpark Data Loader class:
load(self, configProperties, spark)
Load and return Platform data as a Pandas DataFrame.
- self: Self reference
- configProperties: Configuration properties map
- spark: Spark session
Spark (Scala)
The following table describes the abstract methods of a Spark Data Loader class:
load(configProperties, sparkSession)
Load and return Platform data as a DataFrame.
- configProperties: Configuration properties map
- sparkSession: Spark session
Load data from a Platform dataset load-data-from-a-platform-dataset
The following example retrieves Platform data by ID and returns a DataFrame, where the dataset ID (datasetId) is a defined property in the configuration file.
PySpark
# PySpark

from sdk.data_loader import DataLoader
# get_query_options is provided by the recipe's helper module (as in the DataSaver example below)
from .helper import *

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session
        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd
Spark (Scala)
// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.sdk.DataLoader
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */
  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
  }
}
DataSaver datasaver
The DataSaver class encapsulates anything related to storing output data, including output from scoring or feature engineering. Data savers extend the abstract class DataSaver and must override the abstract method save.
PySpark
The following table describes the abstract methods of a PySpark Data Saver class:
save(self, configProperties, dataframe)
Receives output data as a DataFrame and stores it in a Platform dataset.
- self: Self reference
- configProperties: Configuration properties map
- dataframe: Data to be stored in the form of a DataFrame
Spark (Scala)
The following table describes the abstract methods of a Spark Data Saver class:
save(configProperties, dataFrame)
Receives output data as a DataFrame and stores it in a Platform dataset.
- configProperties: Configuration properties map
- dataFrame: Data to be stored in the form of a DataFrame
Save data to a Platform dataset save-data-to-a-platform-dataset
To store data onto a Platform dataset, the following properties must either be provided or defined in the configuration file:
- A valid Platform dataset ID to which data will be stored
- The tenant ID belonging to your organization
The following examples store data (prediction) onto a Platform dataset, where the dataset ID (datasetId) and tenant ID (tenantId) are defined properties within the configuration file.
PySpark
# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *

class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # wrap the scored columns under the tenant ID and add required fields
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty"))

        # store data into dataset
        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()
Spark (Scala)
// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */
class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
   * Method that saves the scoring data into a dataframe
   * @param configProperties - Configuration Properties map
   * @param dataFrame        - Dataframe with the scoring results
   */
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId: String = configProperties.get("tenantId").getOrElse("")
    val timestamp: String = "2019-01-01 00:00:00"
    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")

    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
  }
}
DatasetTransformer datasettransformer
The DatasetTransformer class modifies and transforms the structure of a dataset. The Sensei Machine Learning Runtime does not require this component to be defined; implement it based on your requirements.
In the context of a feature pipeline, a dataset transformer can be used cooperatively with a feature pipeline factory to prepare data for feature engineering.
PySpark
The following table describes the class methods of a PySpark dataset transformer class:
abstract transform(self, configProperties, dataset)
Takes a dataset as input and outputs a new, derived dataset.
- self: Self reference
- configProperties: Configuration properties map
- dataset: The input dataset for transformation
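As an illustration, a minimal PySpark transformer might drop incomplete rows and normalize a column type before the data reaches a feature pipeline factory. This is only a sketch: the module path sdk.dataset_transformer (chosen to mirror the sdk.data_loader and sdk.data_saver imports shown above) and the column name store are assumptions, not part of the documented SDK contract.

# PySpark (illustrative sketch; module path and column names are assumptions)
from sdk.dataset_transformer import DatasetTransformer
from pyspark.sql.functions import col

class MyDatasetTransformer(DatasetTransformer):
    """
    Example transformation: clean rows and normalize a column type
    before feature engineering.
    """

    def transform(self, config_properties, dataset):
        # drop rows with missing values
        transformed = dataset.dropna()

        # cast the hypothetical 'store' column to an integer type
        transformed = transformed.withColumn("store", col("store").cast("int"))

        # return the new, derived dataset
        return transformed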
Spark (Scala)
The following table describes the abstract methods of a Spark dataset transformer class:
transform(configProperties, dataset)
Takes a dataset as input and outputs a new, derived dataset.
- configProperties: Configuration properties map
- dataset: The input dataset for transformation
FeaturePipelineFactory featurepipelinefactory
The FeaturePipelineFactory class contains feature extraction algorithms and defines the stages of a Feature Pipeline from start to finish.
PySpark
The following table describes the class methods of a PySpark FeaturePipelineFactory:
abstract create_pipeline(self, configProperties)
Create and return a Spark Pipeline that contains a series of Spark Transformers.
- self: Self reference
- configProperties: Configuration properties map

abstract get_param_map(self, configProperties, sparkSession)
Retrieve and return param map from configuration properties.
- self: Self reference
- configProperties: Configuration properties
- sparkSession: Spark session
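For example, a factory might chain a few standard Spark ML transformers into a single Pipeline. The sketch below is hedged: the module path sdk.feature_pipeline_factory and the column names are assumptions made for illustration only.

# PySpark (illustrative sketch; module path and column names are assumptions)
from sdk.feature_pipeline_factory import FeaturePipelineFactory
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

class MyFeaturePipelineFactory(FeaturePipelineFactory):
    """
    Defines the feature-engineering stages as a Spark Pipeline.
    """

    def create_pipeline(self, config_properties):
        # index a hypothetical categorical column
        indexer = StringIndexer(inputCol="storeType", outputCol="storeTypeIndex", handleInvalid="keep")

        # assemble the engineered columns into a single feature vector
        assembler = VectorAssembler(inputCols=["storeTypeIndex", "store"], outputCol="features")

        # the runtime fits and applies the returned Pipeline
        return Pipeline(stages=[indexer, assembler])

    def get_param_map(self, config_properties, spark_session):
        # no tunable parameters in this sketch; read them from config_properties if needed
        return {}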
Spark (Scala)
The following table describes the class methods of a Spark FeaturePipelineFactory:
abstract createPipeline(configProperties)
Create and return a Pipeline that contains a series of Transformers.
- configProperties: Configuration properties map

abstract getParamMap(configProperties, sparkSession)
Retrieve and return param map from configuration properties.
- configProperties: Configuration properties
- sparkSession: Spark session
PipelineFactory pipelinefactory
The PipelineFactory class encapsulates methods and definitions for model training and scoring, where training logic and algorithms are defined in the form of a Spark Pipeline.
PySpark
The following table describes the class methods of a PySpark PipelineFactory:
abstract apply(self, configProperties)
Create and return a Spark Pipeline which contains the logic and algorithm for model training and scoring.
- self: Self reference
- configProperties: Configuration properties

abstract train(self, configProperties, dataframe)
Return a custom Pipeline which contains the logic and algorithm to train a model. This method is not required if a Spark Pipeline is used.
- self: Self reference
- configProperties: Configuration properties
- dataframe: Feature dataset for training input

abstract score(self, configProperties, dataframe, model)
Score using the trained model and return the results.
- self: Self reference
- configProperties: Configuration properties
- dataframe: Input dataset for scoring
- model: A trained model used for scoring

abstract get_param_map(self, configProperties, sparkSession)
Retrieve and return param map from configuration properties.
- self: Self reference
- configProperties: Configuration properties
- sparkSession: Spark session
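A minimal PySpark implementation might build training and scoring around a single Spark Pipeline, as in the sketch below. The module path sdk.pipeline_factory, the column names, and the choice of estimator (a gradient-boosted tree regressor) are assumptions for illustration, not the documented SDK contract.

# PySpark (illustrative sketch; module path and column names are assumptions)
from sdk.pipeline_factory import PipelineFactory
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor

class MyPipelineFactory(PipelineFactory):
    """
    Defines model training and scoring logic as a Spark Pipeline.
    """

    def apply(self, config_properties):
        # assemble hypothetical feature columns and fit a gradient-boosted tree regressor
        assembler = VectorAssembler(inputCols=["store", "storeTypeIndex"], outputCol="features")
        regressor = GBTRegressor(featuresCol="features", labelCol="sales", maxIter=20)
        return Pipeline(stages=[assembler, regressor])

    def score(self, config_properties, dataframe, model):
        # apply the trained PipelineModel to the scoring input and return the results
        return model.transform(dataframe)

    def get_param_map(self, config_properties, spark_session):
        # expose tunable values from the configuration file, if any
        return {}

Because apply returns a Spark Pipeline in this sketch, the optional train method is omitted.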
Spark (Scala)
The following table describes the class methods of a Spark PipelineFactory:
abstract apply(configProperties)
Create and return a Pipeline which contains the logic and algorithm for model training and scoring.
- configProperties: Configuration properties

abstract getParamMap(configProperties, sparkSession)
Retrieve and return param map from configuration properties.
- configProperties: Configuration properties
- sparkSession: Spark session
MLEvaluator mlevaluator
The MLEvaluator class provides methods for defining evaluation metrics and determining training and testing datasets.
PySpark
The following table describes the class methods of a PySpark MLEvaluator:
abstract split(self, configProperties, dataframe)
Splits the input dataset into training and testing subsets.
- self: Self reference
- configProperties: Configuration properties
- dataframe: Input dataset to be split

abstract evaluate(self, dataframe, model, configProperties)
Evaluates a trained model and returns the evaluation results.
- self: Self reference
- dataframe: A DataFrame consisting of training and testing data
- model: A trained model
- configProperties: Configuration properties
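As a sketch, an evaluator might hold out part of the data for testing and report a regression metric. The module path sdk.evaluator, the label column, and the shape of the returned metric list are assumptions made for illustration.

# PySpark (illustrative sketch; module path, column names, and metric format are assumptions)
from sdk.evaluator import MLEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

class MyEvaluator(MLEvaluator):
    """
    Splits data into training/testing subsets and computes evaluation metrics.
    """

    def split(self, config_properties, dataframe):
        # hold out 20% of the input data for testing
        train, test = dataframe.randomSplit([0.8, 0.2], seed=42)
        return train, test

    def evaluate(self, dataframe, model, config_properties):
        # score the held-out data with the trained model
        scored = model.transform(dataframe)

        # report RMSE against a hypothetical 'sales' label column
        evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="rmse")
        rmse = evaluator.evaluate(scored)

        # return metrics as a list of name/value entries (assumed format)
        return [{"name": "RMSE", "value": rmse, "valueType": "double"}]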
Spark (Scala)
The following table describes the class methods of a Spark MLEvaluator:
abstract split(configProperties, data)
Splits the input dataset into training and testing subsets.
- configProperties: Configuration properties
- data: Input dataset to be split

abstract evaluate(configProperties, model, data)
Evaluates a trained model and returns the evaluation results.
- configProperties: Configuration properties
- model: A trained model
- data: A DataFrame consisting of training and testing data