SDK per authoring modelli

L’SDK per l’authoring dei modelli consente di sviluppare ricette di apprendimento automatico personalizzate e pipeline di funzioni che possono essere utilizzate in Adobe Experience Platform Data Science Workspace, che fornisce modelli implementabili in PySpark e Spark (Scala).

Questo documento fornisce informazioni sulle varie classi presenti nell’SDK di authoring dei modelli.

DataLoader dataloader

La classe DataLoader racchiude qualsiasi elemento correlato al recupero, al filtraggio e alla restituzione di dati di input non elaborati. Esempi di dati di input includono quelli per l’apprendimento, il punteggio o la progettazione di funzionalità. I caricatori di dati estendono la classe astratta DataLoader e devono eseguire l'override del metodo astratto load.

PySpark

Nella tabella seguente vengono descritti i metodi astratti di una classe PySpark Data Loader:

Metodo e descrizione
Parametri

load(self, configProperties, spark)

Caricare e restituire dati Platform come DataFrame Pandas

  • self: autoreferenza
  • configProperties: mappa delle proprietà di configurazione
  • spark: sessione Spark

Scintilla

Nella tabella seguente vengono descritti i metodi astratti di un Spark Classe Data Loader:

Metodo e descrizione
Parametri

load(configProperties, sparkSession)

Caricare e restituire dati Platform come DataFrame

  • configProperties: mappa delle proprietà di configurazione
  • sparkSession: sessione Spark

Caricare dati da un Platform set di dati load-data-from-a-platform-dataset

L’esempio seguente recupera Platform e restituisce un DataFrame, dove l’ID del set di dati (datasetId) è una proprietà definita nel file di configurazione.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Scintilla (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver datasaver

La classe DataSaver incapsula qualsiasi elemento correlato all'archiviazione dei dati di output, inclusi quelli derivanti dall'assegnazione di punteggi o dalla progettazione di funzionalità. I Data Savers estendono la classe astratta DataSaver e devono eseguire l'override del metodo astratto save.

PySpark

Nella tabella seguente vengono descritti i metodi astratti di un PySpark Classe Data Saver:

Metodo e descrizione
Parametri

save(self, configProperties, dataframe)

Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform

  • self: autoreferenza
  • configProperties: mappa delle proprietà di configurazione
  • dataframe: dati da archiviare sotto forma di DataFrame

Scintilla (Scala)

Nella tabella seguente vengono descritti i metodi astratti di un Spark Classe Data Saver:

Metodo e descrizione
Parametri

save(configProperties, dataFrame)

Ricevi i dati di output come DataFrame e li memorizza in un set di dati Platform

  • configProperties: mappa delle proprietà di configurazione
  • dataFrame: dati da archiviare sotto forma di DataFrame

Salva dati in un Platform set di dati save-data-to-a-platform-dataset

Per memorizzare i dati su una Platform set di dati, le proprietà devono essere fornite o definite nel file di configurazione:

  • Un valore valido Platform ID del set di dati in cui verranno memorizzati i dati
  • ID tenant appartenente alla tua organizzazione

Negli esempi seguenti vengono memorizzati i dati (prediction) su una Platform set di dati, in cui l’ID del set di dati (datasetId) e ID tenant (tenantId) sono proprietà definite nel file di configurazione.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Scintilla (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */

  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer datasettransformer

La classe DatasetTransformer modifica e trasforma la struttura di un set di dati. Il Sensei Machine Learning Runtime non richiede la definizione di questo componente, che viene implementato in base alle tue esigenze.

Per quanto riguarda una pipeline di funzioni, i trasformatori di set di dati possono essere utilizzati in collaborazione con una feature pipeline factory per preparare i dati per la progettazione di funzioni.

PySpark

Nella tabella seguente vengono descritti i metodi di classe di una classe trasformatore di set di dati PySpark:

Metodo e descrizione
Parametri

riassunto
transform(self, configProperties, dataset)

Accetta un set di dati come input e restituisce un nuovo set di dati derivato

  • self: autoreferenza
  • configProperties: mappa delle proprietà di configurazione
  • dataset: set di dati di input per la trasformazione

Scintilla (Scala)

Nella tabella seguente vengono descritti i metodi astratti di un Spark classe trasformatore set di dati:

Metodo e descrizione
Parametri

transform(configProperties, dataset)

Accetta un set di dati come input e restituisce un nuovo set di dati derivato

  • configProperties: mappa delle proprietà di configurazione
  • dataset: set di dati di input per la trasformazione

FeaturePipelineFactory featurepipelinefactory

La classe FeaturePipelineFactory contiene gli algoritmi di estrazione delle feature e definisce gli stadi di una tubazione di feature dall'inizio alla fine.

PySpark

Nella tabella seguente vengono descritti i metodi di classe di una FeaturePipelineFactory di PySpark:

Metodo e descrizione
Parametri

riassunto
create_pipeline(self, configProperties)

Creare e restituire una pipeline Spark contenente una serie di trasformatori Spark

  • self: autoreferenza
  • configProperties: mappa delle proprietà di configurazione

riassunto
get_param_map(self, configProperties, sparkSession)

Recupera e restituisce la mappa del parametro dalle proprietà di configurazione

  • self: autoreferenza
  • configProperties: proprietà di configurazione
  • sparkSession: sessione Spark

Scintilla (Scala)

Nella tabella seguente vengono descritti i metodi di classe di un Spark FeaturePipelineFactory:

Metodo e descrizione
Parametri

riassunto
createPipeline(configProperties)

Creare e restituire una pipeline contenente una serie di trasformatori

  • configProperties: mappa delle proprietà di configurazione

riassunto
getParamMap(configProperties, sparkSession)

Recupera e restituisce la mappa del parametro dalle proprietà di configurazione

  • configProperties: proprietà di configurazione
  • sparkSession: sessione Spark

PipelineFactory pipelinefactory

La classe PipelineFactory racchiude metodi e definizioni per l'apprendimento e il punteggio dei modelli, in cui la logica di apprendimento e gli algoritmi sono definiti sotto forma di Spark Pipeline.

PySpark

Nella tabella seguente vengono descritti i metodi di classe di PySpark PipelineFactory:

Metodo e descrizione
Parametri

riassunto
apply(self, configProperties)

Creare e restituire una pipeline Spark contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello

  • self: autoreferenza
  • configProperties: proprietà di configurazione

riassunto
train(self, configProperties, dataframe)

Restituisci una pipeline personalizzata contenente la logica e l’algoritmo per addestrare un modello. Questo metodo non è necessario se si utilizza una pipeline Spark

  • self: autoreferenza
  • configProperties: proprietà di configurazione
  • dataframe: set di dati delle funzioni per l’input dell’apprendimento

riassunto
score(self, configProperties, dataframe, model)

Punteggio utilizzando il modello addestrato e restituire i risultati

  • self: autoreferenza
  • configProperties: proprietà di configurazione
  • dataframe: set di dati di input per il punteggio
  • model: modello addestrato utilizzato per il punteggio

riassunto
get_param_map(self, configProperties, sparkSession)

Recupera e restituisce la mappa del parametro dalle proprietà di configurazione

  • self: autoreferenza
  • configProperties: proprietà di configurazione
  • sparkSession: sessione Spark

Scintilla (Scala)

Nella tabella seguente vengono descritti i metodi di classe di un Spark PipelineFactory:

Metodo e descrizione
Parametri

riassunto
apply(configProperties)

Creare e restituire una pipeline contenente la logica e l’algoritmo per l’apprendimento e il punteggio del modello

  • configProperties: proprietà di configurazione

riassunto
getParamMap(configProperties, sparkSession)

Recupera e restituisce la mappa del parametro dalle proprietà di configurazione

  • configProperties: proprietà di configurazione
  • sparkSession: sessione Spark

MLEvaluator mlevaluator

La classe MLEvaluator fornisce metodi per definire le metriche di valutazione e determinare i set di dati di formazione e test.

PySpark

Nella tabella seguente vengono descritti i metodi di classe di un oggetto PySpark MLEvaluator:

Metodo e descrizione
Parametri

riassunto
split(self, configProperties, dataframe)

Suddivide il set di dati di input in sottoinsiemi di addestramento e test

  • self: autoreferenza
  • configProperties: proprietà di configurazione
  • dataframe: set di dati di input da dividere

riassunto
evaluate(self, dataframe, model, configProperties)

Valuta un modello addestrato e restituisce i risultati della valutazione

  • self: autoreferenza
  • dataframe: DataFrame costituito da dati di addestramento e test
  • model: modello addestrato
  • configProperties: proprietà di configurazione

Scintilla (Scala)

Nella tabella seguente vengono descritti i metodi di classe di un Spark Valore MLE:

Metodo e descrizione
Parametri

riassunto
split(configProperties, data)

Suddivide il set di dati di input in sottoinsiemi di addestramento e test

  • configProperties: proprietà di configurazione
  • data: set di dati di input da dividere

riassunto
evaluate(configProperties, model, data)

Valuta un modello addestrato e restituisce i risultati della valutazione

  • configProperties: proprietà di configurazione
  • model: modello addestrato
  • data: DataFrame costituito da dati di addestramento e test
recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9