SDK per authoring modelli
L'SDK per l'authoring dei modelli consente di sviluppare ricette di apprendimento automatico personalizzate e pipeline di funzioni che possono essere utilizzate in Adobe Experience Platform Data Science Workspace, fornendo modelli implementabili in PySpark e Spark (Scala).
Questo documento fornisce informazioni sulle varie classi presenti nell’SDK di authoring dei modelli.
DataLoader dataloader
La classe DataLoader racchiude qualsiasi elemento correlato al recupero, al filtraggio e alla restituzione di dati di input non elaborati. Esempi di dati di input includono quelli per l’apprendimento, il punteggio o la progettazione di funzionalità. I caricatori di dati estendono la classe astratta DataLoader
e devono eseguire l'override del metodo astratto load
.
PySpark
Nella tabella seguente vengono descritti i metodi astratti di una classe PySpark Data Loader:
load(self, configProperties, spark)
Caricare e restituire dati Platform come DataFrame Pandas
self
: autoreferenzaconfigProperties
: mappa delle proprietà di configurazionespark
: sessione Spark
Scintilla
Nella tabella seguente vengono descritti i metodi astratti di una classe del caricatore dati Spark:
load(configProperties, sparkSession)
Caricare e restituire dati Platform come DataFrame
configProperties
: mappa delle proprietà di configurazionesparkSession
: sessione Spark
Carica dati da un set di dati Platform load-data-from-a-platform-dataset
L'esempio seguente recupera i dati Platform per ID e restituisce un DataFrame, dove l'ID del set di dati (datasetId
) è una proprietà definita nel file di configurazione.
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Scintilla (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
DataSaver datasaver
La classe DataSaver incapsula qualsiasi elemento correlato all'archiviazione dei dati di output, inclusi quelli derivanti dall'assegnazione di punteggi o dalla progettazione di funzionalità. I Data Savers estendono la classe astratta DataSaver
e devono sostituire il metodo astratto save
.
PySpark
Nella tabella seguente vengono descritti i metodi astratti di una classe Data Saver PySpark:
save(self, configProperties, dataframe)
Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform
self
: autoreferenzaconfigProperties
: mappa delle proprietà di configurazionedataframe
: dati da archiviare sotto forma di DataFrame
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi astratti di una classe Data Saver Spark:
save(configProperties, dataFrame)
Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform
configProperties
: mappa delle proprietà di configurazionedataFrame
: dati da archiviare sotto forma di DataFrame
Salva dati in un set di dati Platform save-data-to-a-platform-dataset
Per archiviare i dati in un set di dati Platform, le proprietà devono essere specificate o definite nel file di configurazione:
- ID del set di dati Platform valido in cui verranno archiviati i dati
- ID tenant appartenente alla tua organizzazione
Negli esempi seguenti i dati (prediction
) vengono archiviati in un set di dati Platform, dove l'ID del set di dati (datasetId
) e l'ID tenant (tenantId
) sono proprietà definite nel file di configurazione.
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Scintilla (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
DatasetTransformer datasettransformer
La classe DatasetTransformer modifica e trasforma la struttura di un set di dati. Sensei Machine Learning Runtime non richiede la definizione di questo componente e viene implementato in base alle tue esigenze.
Per quanto riguarda una pipeline di funzioni, i trasformatori di set di dati possono essere utilizzati in collaborazione con una feature pipeline factory per preparare i dati per la progettazione di funzioni.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di una classe trasformatore di set di dati PySpark:
riassuntotransform(self, configProperties, dataset)
Accetta un set di dati come input e restituisce un nuovo set di dati derivato
self
: autoreferenzaconfigProperties
: mappa delle proprietà di configurazionedataset
: set di dati di input per la trasformazione
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi astratti di una classe di trasformazione del set di dati Spark:
transform(configProperties, dataset)
Accetta un set di dati come input e restituisce un nuovo set di dati derivato
configProperties
: mappa delle proprietà di configurazionedataset
: set di dati di input per la trasformazione
FeaturePipelineFactory featurepipelinefactory
La classe FeaturePipelineFactory contiene gli algoritmi di estrazione delle feature e definisce gli stadi di una tubazione di feature dall'inizio alla fine.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di una FeaturePipelineFactory di PySpark:
riassuntocreate_pipeline(self, configProperties)
Creare e restituire una pipeline Spark contenente una serie di trasformatori Spark
self
: autoreferenzaconfigProperties
: mappa delle proprietà di configurazione
riassuntoget_param_map(self, configProperties, sparkSession)
Recupera e restituisce la mappa del parametro dalle proprietà di configurazione
self
: autoreferenzaconfigProperties
: proprietà di configurazionesparkSession
: sessione Spark
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di FeaturePipelineFactory Spark:
riassuntocreatePipeline(configProperties)
Creare e restituire una pipeline contenente una serie di trasformatori
configProperties
: mappa delle proprietà di configurazione
riassuntogetParamMap(configProperties, sparkSession)
Recupera e restituisce la mappa del parametro dalle proprietà di configurazione
configProperties
: proprietà di configurazionesparkSession
: sessione Spark
PipelineFactory pipelinefactory
La classe PipelineFactory incapsula i metodi e le definizioni per l'apprendimento e il punteggio del modello, in cui la logica di addestramento e gli algoritmi sono definiti sotto forma di pipeline Spark.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di PySpark PipelineFactory:
riassuntoapply(self, configProperties)
Creare e restituire una pipeline Spark contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello
self
: autoreferenzaconfigProperties
: proprietà di configurazione
riassuntotrain(self, configProperties, dataframe)
Restituisci una pipeline personalizzata contenente la logica e l’algoritmo per addestrare un modello. Questo metodo non è necessario se si utilizza una pipeline Spark
self
: autoreferenzaconfigProperties
: proprietà di configurazionedataframe
: set di dati delle funzioni per l’input dell’apprendimento
riassuntoscore(self, configProperties, dataframe, model)
Punteggio utilizzando il modello addestrato e restituire i risultati
self
: autoreferenzaconfigProperties
: proprietà di configurazionedataframe
: set di dati di input per il punteggiomodel
: modello addestrato utilizzato per il punteggio
riassuntoget_param_map(self, configProperties, sparkSession)
Recupera e restituisce la mappa del parametro dalle proprietà di configurazione
self
: autoreferenzaconfigProperties
: proprietà di configurazionesparkSession
: sessione Spark
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di una pipelineFactory Spark:
riassuntoapply(configProperties)
Creare e restituire una pipeline contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello
configProperties
: proprietà di configurazione
riassuntogetParamMap(configProperties, sparkSession)
Recupera e restituisce la mappa del parametro dalle proprietà di configurazione
configProperties
: proprietà di configurazionesparkSession
: sessione Spark
MLEvaluator mlevaluator
La classe MLEvaluator fornisce metodi per definire le metriche di valutazione e determinare i set di dati di formazione e test.
PySpark
Nella tabella seguente vengono descritti i metodi di classe di un oggetto PySpark MLEvaluator:
riassuntosplit(self, configProperties, dataframe)
Suddivide il set di dati di input in sottoinsiemi di addestramento e test
self
: autoreferenzaconfigProperties
: proprietà di configurazionedataframe
: set di dati di input da dividere
riassuntoevaluate(self, dataframe, model, configProperties)
Valuta un modello addestrato e restituisce i risultati della valutazione
self
: autoreferenzadataframe
: DataFrame costituito da dati di addestramento e testmodel
: modello addestratoconfigProperties
: proprietà di configurazione
Scintilla (Scala)
Nella tabella seguente vengono descritti i metodi di classe di un MLEvaluator Spark:
riassuntosplit(configProperties, data)
Suddivide il set di dati di input in sottoinsiemi di addestramento e test
configProperties
: proprietà di configurazionedata
: set di dati di input da dividere
riassuntoevaluate(configProperties, model, data)
Valuta un modello addestrato e restituisce i risultati della valutazione
configProperties
: proprietà di configurazionemodel
: modello addestratodata
: DataFrame costituito da dati di addestramento e test