Model Authoring SDK

L’SDK per l’authoring dei modelli consente di sviluppare ricette di apprendimento automatico personalizzate e pipeline delle funzioni che possono essere utilizzate in Adobe Experience Platform Data Science Workspace, fornendo modelli implementabili in PySpark e Spark (Scala).

Questo documento fornisce informazioni sulle varie classi rilevate nell’SDK per l’authoring dei modelli.

DataLoader

La classe DataLoader racchiude qualsiasi elemento correlato al recupero, al filtraggio e alla restituzione di dati di input non elaborati. Esempi di dati di input includono quelli per la formazione, il punteggio o la progettazione di funzionalità. I caricatori di dati estendono la classe astratta DataLoader e devono sostituire il metodo astratto load.

PySpark

Nella tabella seguente sono descritti i metodi astratti di una classe PySpark Data Loader:

Metodo e descrizione Parametri

load(self, configProperties, spark)

Carica e restituisce i dati della piattaforma come dati Pandas DataFrame

  • self: Riferimento autonomo
  • configProperties: Mappa delle proprietà di configurazione
  • spark: Spark session

Spark

Nella tabella seguente sono descritti i metodi astratti di una classe Spark Data Loader:

Metodo e descrizione Parametri

load(configProperties, sparkSession)

Carica e restituisce i dati della piattaforma come DataFrame

  • configProperties: Mappa delle proprietà di configurazione
  • sparkSession: Spark session

Carica dati da un set di dati Platform

L'esempio seguente recupera i dati Platform per ID e restituisce un DataFrame, dove l'ID dataset (datasetId) è una proprietà definita nel file di configurazione.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

La classe DataSaver racchiude qualsiasi elemento correlato alla memorizzazione dei dati di output, inclusi quelli derivanti dal punteggio o dalla progettazione di funzionalità. I salvatori di dati estendono la classe astratta DataSaver e devono sostituire il metodo astratto save.

PySpark

Nella tabella seguente sono descritti i metodi astratti di una classe PySpark Data Saver:

Metodo e descrizione Parametri

save(self, configProperties, dataframe)

Ricevi i dati di output come DataFrame e li memorizza in un dataset della piattaforma

  • self: Riferimento autonomo
  • configProperties: Mappa delle proprietà di configurazione
  • dataframe: Dati da memorizzare sotto forma di DataFrame

Spark (Scala)

Nella tabella seguente sono descritti i metodi astratti di una classe Spark Data Saver:

Metodo e descrizione Parametri

save(configProperties, dataFrame)

Ricevi i dati di output come DataFrame e li memorizza in un dataset della piattaforma

  • configProperties: Mappa delle proprietà di configurazione
  • dataFrame: Dati da memorizzare sotto forma di DataFrame

Salva i dati in un set di dati Platform

Per memorizzare i dati in un dataset Platform, le proprietà devono essere fornite o definite nel file di configurazione:

  • Un ID di set di dati Platform valido a cui memorizzare i dati
  • L'ID tenant appartenente alla tua organizzazione

Gli esempi seguenti memorizzano i dati (prediction) in un dataset Platform, dove l'ID dataset (datasetId) e l'ID tenant (tenantId) sono proprietà definite all'interno del file di configurazione.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

La classe DatasetTransformer modifica e trasforma la struttura di un dataset. Il componente Sensei Machine Learning Runtime non richiede la definizione di questo componente ed è implementato in base ai requisiti dell'utente.

Per quanto riguarda una tubazione di feature, i trasformatori di set di dati possono essere utilizzati in modo collaborativo con una fabbrica di tubazioni di feature per preparare i dati per la progettazione di feature.

PySpark

Nella tabella seguente sono descritti i metodi di classe di una classe di trasformatore di set di dati PySpark:

Metodo e descrizione Parametri

abstract
transform(self, configProperties, dataset)

Assume un dataset come input e genera un nuovo dataset derivato

  • self: Riferimento autonomo
  • configProperties: Mappa delle proprietà di configurazione
  • dataset: Il set di dati di input per la trasformazione

Spark (Scala)

Nella tabella seguente sono descritti i metodi astratti di una classe di trasformatore di set di dati Spark:

Metodo e descrizione Parametri

transform(configProperties, dataset)

Assume un dataset come input e genera un nuovo dataset derivato

  • configProperties: Mappa delle proprietà di configurazione
  • dataset: Il set di dati di input per la trasformazione

FeaturePipelineFactory

La classe FeaturePipelineFactory contiene algoritmi di estrazione delle feature e definisce le fasi di una feature Pipeline dall'inizio alla fine.

PySpark

Nella tabella seguente sono descritti i metodi di classe di una FeaturePipelineFactory di PySpark:

Metodo e descrizione Parametri

abstract
create_pipeline(self, configProperties)

Creare e restituire una pipeline Spark che contiene una serie di trasformatori Spark

  • self: Riferimento autonomo
  • configProperties: Mappa delle proprietà di configurazione

abstract
get_param_map(self, configProperties, sparkSession)

Recuperare e restituire la mappa param dalle proprietà di configurazione

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione
  • sparkSession: Spark session

Spark (Scala)

Nella tabella seguente sono descritti i metodi di classe di una Spark FeaturePipelineFactory:

Metodo e descrizione Parametri

abstract
createPipeline(configProperties)

Creare e restituire una tubazione contenente una serie di trasformatori

  • configProperties: Mappa delle proprietà di configurazione

abstract
getParamMap(configProperties, sparkSession)

Recuperare e restituire la mappa param dalle proprietà di configurazione

  • configProperties: Proprietà di configurazione
  • sparkSession: Spark session

PipelineFactory

La classe PipelineFactory racchiude metodi e definizioni per la formazione e il punteggio dei modelli, in cui la logica e gli algoritmi di formazione sono definiti sotto forma di Spark Pipeline.

PySpark

Nella tabella seguente sono descritti i metodi di classe di una PipelineFactory di PySpark:

Metodo e descrizione Parametri

abstract
apply(self, configProperties)

Creare e restituire una pipeline Spark che contiene la logica e l'algoritmo per la formazione e il punteggio dei modelli

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione

abstract
train(self, configProperties, dataframe)

Restituisce una pipeline personalizzata che contiene la logica e l'algoritmo per l'addestramento di un modello. Questo metodo non è richiesto se viene utilizzata una pipeline Spark

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione
  • dataframe: Set di dati delle funzioni per l'input di formazione

abstract
score(self, configProperties, dataframe, model)

Punteggio utilizzando il modello preparato e restituire i risultati

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione
  • dataframe: Set di dati di input per il punteggio
  • model: Un modello qualificato utilizzato per il punteggio

abstract
get_param_map(self, configProperties, sparkSession)

Recuperare e restituire la mappa param dalle proprietà di configurazione

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione
  • sparkSession: Spark session

Spark (Scala)

Nella tabella seguente sono descritti i metodi di classe di una Spark PipelineFactory:

Metodo e descrizione Parametri

abstract
apply(configProperties)

Creare e restituire una pipeline che contiene la logica e l'algoritmo per la formazione e il punteggio dei modelli

  • configProperties: Proprietà di configurazione

abstract
getParamMap(configProperties, sparkSession)

Recuperare e restituire la mappa param dalle proprietà di configurazione

  • configProperties: Proprietà di configurazione
  • sparkSession: Spark session

MLEperatore

La classe MLEvalue fornisce metodi per definire le metriche di valutazione e determinare i set di dati di formazione e test.

PySpark

Nella tabella seguente sono descritti i metodi di classe di un oggetto PySpark MLEvalue:

Metodo e descrizione Parametri

abstract
split(self, configProperties, dataframe)

Suddivide il set di dati di input in sottoinsiemi di formazione e test

  • self: Riferimento autonomo
  • configProperties: Proprietà di configurazione
  • dataframe: Set di dati di input da dividere

abstract
evaluate(self, dataframe, model, configProperties)

Valuta un modello preparato e restituisce i risultati della valutazione

  • self: Riferimento autonomo
  • dataframe: Un DataFrame costituito da dati di formazione e test
  • model: Un modello preparato
  • configProperties: Proprietà di configurazione

Spark (Scala)

Nella tabella seguente sono descritti i metodi di classe di un Spark MLEvalue:

Metodo e descrizione Parametri

abstract
split(configProperties, data)

Suddivide il set di dati di input in sottoinsiemi di formazione e test

  • configProperties: Proprietà di configurazione
  • data: Set di dati di input da dividere

abstract
evaluate(configProperties, model, data)

Valuta un modello preparato e restituisce i risultati della valutazione

  • configProperties: Proprietà di configurazione
  • model: Un modello preparato
  • data: Un DataFrame costituito da dati di formazione e test

In questa pagina

Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free
Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free
Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now
Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now