L’SDK per l’authoring dei modelli consente di sviluppare ricette di apprendimento automatico personalizzate e pipeline di funzioni che possono essere utilizzate in Adobe Experience Platform Data Science Workspace, che fornisce modelli implementabili in PySpark e Spark (Scala).
Questo documento fornisce informazioni sulle varie classi presenti nell’SDK di authoring dei modelli.
La classe DataLoader racchiude qualsiasi elemento correlato al recupero, al filtraggio e alla restituzione di dati di input non elaborati. Esempi di dati di input includono quelli per l’apprendimento, il punteggio o la progettazione di funzionalità. I caricatori di dati estendono la classe astratta DataLoader
e devono eseguire l'override del metodo astratto load
.
PySpark
Nella tabella seguente vengono descritti i metodi astratti di una classe PySpark Data Loader:
Metodo e descrizione | Parametri |
---|---|
Caricare e restituire dati Platform come DataFrame Pandas |
|
Scintilla
Nella tabella seguente vengono descritti i metodi astratti di un Spark Classe Data Loader:
Metodo e descrizione | Parametri |
---|---|
Caricare e restituire dati Platform come DataFrame |
|
L’esempio seguente recupera Platform e restituisce un DataFrame, dove l’ID del set di dati (datasetId
) è una proprietà definita nel file di configurazione.
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Scintilla (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
La classe DataSaver incapsula qualsiasi elemento correlato all'archiviazione dei dati di output, inclusi quelli derivanti dall'assegnazione di punteggi o dalla progettazione di funzionalità. I Data Savers estendono la classe astratta DataSaver
e devono eseguire l'override del metodo astratto save
.
PySpark
Nella tabella seguente vengono descritti i metodi astratti di un PySpark Classe Data Saver:
Metodo e descrizione | Parametri |
---|---|
Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform |
|
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi astratti di un Spark Classe Data Saver:
Metodo e descrizione | Parametri |
---|---|
Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform |
|
Per memorizzare i dati su una Platform set di dati, le proprietà devono essere fornite o definite nel file di configurazione:
Negli esempi seguenti vengono memorizzati i dati (prediction
) su una Platform set di dati, in cui l’ID del set di dati (datasetId
) e ID tenant (tenantId
) sono proprietà definite nel file di configurazione.
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Scintilla (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
La classe DatasetTransformer modifica e trasforma la struttura di un set di dati. Il Sensei Machine Learning Runtime non richiede la definizione di questo componente, che viene implementato in base alle tue esigenze.
Per quanto riguarda una pipeline di funzioni, i trasformatori di set di dati possono essere utilizzati in collaborazione con una feature pipeline factory per preparare i dati per la progettazione di funzioni.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di una classe trasformatore di set di dati PySpark:
Metodo e descrizione | Parametri |
---|---|
riassunto Accetta un set di dati come input e restituisce un nuovo set di dati derivato |
|
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi astratti di un Spark classe trasformatore set di dati:
Metodo e descrizione | Parametri |
---|---|
Accetta un set di dati come input e restituisce un nuovo set di dati derivato |
|
La classe FeaturePipelineFactory contiene gli algoritmi di estrazione delle feature e definisce gli stadi di una tubazione di feature dall'inizio alla fine.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di una FeaturePipelineFactory di PySpark:
Metodo e descrizione | Parametri |
---|---|
riassunto Creare e restituire una pipeline Spark contenente una serie di trasformatori Spark |
|
riassunto Recupera e restituisce la mappa del parametro dalle proprietà di configurazione |
|
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di un Spark FeaturePipelineFactory:
Metodo e descrizione | Parametri |
---|---|
riassunto Creare e restituire una pipeline contenente una serie di trasformatori |
|
riassunto Recupera e restituisce la mappa del parametro dalle proprietà di configurazione |
|
La classe PipelineFactory racchiude metodi e definizioni per l'apprendimento e il punteggio dei modelli, in cui la logica di apprendimento e gli algoritmi sono definiti sotto forma di Spark Pipeline.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di PySpark PipelineFactory:
Metodo e descrizione | Parametri |
---|---|
riassunto Creare e restituire una pipeline Spark contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello |
|
riassunto Restituisci una pipeline personalizzata contenente la logica e l’algoritmo per addestrare un modello. Questo metodo non è necessario se si utilizza una pipeline Spark |
|
riassunto Punteggio utilizzando il modello addestrato e restituire i risultati |
|
riassunto Recupera e restituisce la mappa del parametro dalle proprietà di configurazione |
|
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di un Spark PipelineFactory:
Metodo e descrizione | Parametri |
---|---|
riassunto Creare e restituire una pipeline contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello |
|
riassunto Recupera e restituisce la mappa del parametro dalle proprietà di configurazione |
|
La classe MLEvaluator fornisce metodi per definire le metriche di valutazione e determinare i set di dati di formazione e test.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di un oggetto PySpark MLEvaluator:
Metodo e descrizione | Parametri |
---|---|
riassunto Suddivide il set di dati di input in sottoinsiemi di addestramento e test |
|
riassunto Valuta un modello addestrato e restituisce i risultati della valutazione |
|
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di un Spark Valore MLE:
Metodo e descrizione | Parametri |
---|---|
riassunto Suddivide il set di dati di input in sottoinsiemi di addestramento e test |
|
riassunto Valuta un modello addestrato e restituisce i risultati della valutazione |
|