Modell-Authoring-SDK

Das Modell-Authoring-SDK ermöglicht Ihnen die Entwicklung benutzerdefinierter maschineller Lernrezepte und Feature Pipelines, die in Adobe Experience Platform Data Science Workspace verwendet werden können, und stellt implementierbare Vorlagen in PySpark und Spark (Scala) bereit.

Dieses Dokument enthält Informationen zu den verschiedenen Klassen im Model Authoring SDK.

DataLoader

Die DataLoader-Klasse enthält alles, was zum Abrufen, Filtern und Zurückgeben von rohen Eingabedaten benötigt wird. Beispiele für Eingabedaten sind Daten für Training, Scoring oder Funktionsentwicklung. Datenlader erweitern die abstrakte Klasse DataLoader und müssen die abstrakte Methode load überschreiben.

PySpark

In der folgenden Tabelle werden die abstrakten Methoden einer PySpark-Datenlader-Klasse beschrieben:

Methode und Beschreibung Parameter

load(self, configProperties, spark)

Platform-Daten als Pandas-DataFrame laden und zurückgeben

  • self: Eigenverweis
  • configProperties: Zuordnung von Konfigurationseigenschaften
  • spark: Spark-Sitzung

Spark

Die folgende Tabelle beschreibt die abstrakten Methoden einer Spark Data Loader-Klasse:

Methode und Beschreibung Parameter

load(configProperties, sparkSession)

Platform-Daten als DataFrame laden und zurückgeben

  • configProperties: Zuordnung von Konfigurationseigenschaften
  • sparkSession: Spark-Sitzung

Daten aus einem Platform-Datensatz laden

Im folgenden Beispiel werden Platform-Daten nach ID abgerufen und ein DataFrame zurückgegeben, wobei die DataSet-ID (datasetId) eine definierte Eigenschaft in der Konfigurationsdatei ist.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

Die DataSaver-Klasse enthält alles, was mit dem Speichern von Ausgabedaten zu tun hat, einschließlich Daten für Scoring oder Funktionsentwicklung. Data Saver erweitern die abstrakte Klasse DataSaver und müssen die abstrakte Methode save überschreiben.

PySpark

Die folgende Tabelle beschreibt die abstrakten Methoden einer PySpark Data Saver-Klasse:

Methode und Beschreibung Parameter

save(self, configProperties, dataframe)

Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern

  • self: Eigenverweis
  • configProperties: Zuordnung von Konfigurationseigenschaften
  • dataframe: Daten, die in Form eines DataFrame gespeichert werden sollen

Spark (Scala)

Die folgende Tabelle beschreibt die abstrakten Methoden einer Spark Data Saver-Klasse:

Methode und Beschreibung Parameter

save(configProperties, dataFrame)

Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern

  • configProperties: Zuordnung von Konfigurationseigenschaften
  • dataFrame: Daten, die in Form eines DataFrame gespeichert werden sollen

Daten in einem Platform-Datensatz speichern

Um Daten in einem Platform-Datensatz zu speichern, müssen die Eigenschaften entweder bereitgestellt oder in der Konfigurationsdatei definiert werden:

  • Eine gültige Platform-Dataset-ID, in der Daten gespeichert werden
  • Die Mandanten-ID in Ihrer Organisation

Im folgenden Beispiel werden Daten (prediction) in einem Platform-Datensatz gespeichert, wobei die Dataset-ID (datasetId) und die Mandant-ID (tenantId) Eigenschaften in der Konfigurationsdatei sind.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

Die DatasetTransformer-Klasse ändert und transformiert die Struktur eines Datensatzes. Für Sensei Machine Learning Runtime muss diese Komponente nicht definiert sein und wird entsprechend Ihren Anforderungen implementiert.

Im Hinblick auf eine Funktions-Pipeline können DataSet Transformer gemeinsam mit einer Feature-Pipeline-Factory zur Vorbereitung von Daten für die Funktionsentwicklung genutzt werden.

PySpark

In der folgenden Tabelle werden die Klassenmethoden einer PySpark-DataSet Transformer-Klasse beschrieben:

Methode und Beschreibung Parameter

abstract
transform(self, configProperties, dataset)

Nimmt einen Datensatz als Eingabe und gibt einen neuen abgeleiteten Datensatz aus

  • self: Eigenverweis
  • configProperties: Zuordnung von Konfigurationseigenschaften
  • dataset: Der Eingabedatensatz für die Transformation

Spark (Scala)

Die folgende Tabelle beschreibt die abstrakten Methoden einer Spark DataSet-Transformer-Klasse:

Methode und Beschreibung Parameter

transform(configProperties, dataset)

Nimmt einen Datensatz als Eingabe und gibt einen neuen abgeleiteten Datensatz aus

  • configProperties: Zuordnung von Konfigurationseigenschaften
  • dataset: Der Eingabedatensatz für die Transformation

FeaturePipelineFactory

Die FeaturePipelineFactory-Klasse enthält Extraktionsalgorithmen für Funktionen und definiert die Phasen einer Funktions-Pipeline von Anfang bis Ende.

PySpark

In der folgenden Tabelle werden die Klassenmethoden einer PySpark-FeaturePipelineFactory beschrieben:

Methode und Beschreibung Parameter

abstract
create_pipeline(self, configProperties)

Spark-Pipeline erstellen und zurückgeben, die eine Reihe von Spark-Transformern enthält

  • self: Eigenverweis
  • configProperties: Zuordnung von Konfigurationseigenschaften

abstract
get_param_map(self, configProperties, sparkSession)

Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften
  • sparkSession: Spark-Sitzung

Spark (Scala)

Die folgende Tabelle beschreibt die Klassenmethoden einer Spark FeaturePipelineFactory:

Methode und Beschreibung Parameter

abstract
createPipeline(configProperties)

Pipeline mit einer Reihe von Umwandlern erstellen und zurückgeben

  • configProperties: Zuordnung von Konfigurationseigenschaften

abstract
getParamMap(configProperties, sparkSession)

Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben

  • configProperties: Konfigurationseigenschaften
  • sparkSession: Spark-Sitzung

PipelineFactory

Die PipelineFactory-Klasse enthält Methoden und Definitionen für Modellschulungen und -bewertungen, wobei Schulungslogik und -algorithmen in Form einer Spark-Pipeline definiert werden.

PySpark

Die folgende Tabelle beschreibt die Klassenmethoden einer PySpark-PipelineFactory:

Methode und Beschreibung Parameter

abstract
apply(self, configProperties)

Spark-Pipeline erstellen und zurückgeben, die die Logik und den Algorithmus für das Trainieren und Bewerten von Modellen enthält

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften

abstract
train(self, configProperties, dataframe)

Benutzerdefinierte Pipeline zurückgeben, die die Logik und den Algorithmus zum Trainieren eines Modells enthält. Diese Methode ist nicht erforderlich, wenn eine Spark-Pipeline verwendet wird

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften
  • dataframe: Funktionsdatensatz für Trainings-Eingabe

abstract
score(self, configProperties, dataframe, model)

Mit dem trainierten Modell bewerten und die Ergebnisse zurückgeben

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften
  • dataframe: Eingabedatensatz für Scoring
  • model: Ein trainiertes Modell, das zum Scoring verwendet wird

abstract
get_param_map(self, configProperties, sparkSession)

Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften
  • sparkSession: Spark-Sitzung

Spark (Scala)

Die folgende Tabelle beschreibt die Klassenmethoden einer Spark PipelineFactory:

Methode und Beschreibung Parameter

abstract
apply(configProperties)

Pipeline erstellen und zurückgeben, die die Logik und den Algorithmus für das Trainieren und Bewerten von Modellen enthält

  • configProperties: Konfigurationseigenschaften

abstract
getParamMap(configProperties, sparkSession)

Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben

  • configProperties: Konfigurationseigenschaften
  • sparkSession: Spark-Sitzung

MLEvaluator

Die MLEvaluator-Klasse stellt Methoden zum Definieren von Bewertungsmetriken und zum Festlegen von Trainings- und Testdatensätzen bereit.

PySpark

In der folgenden Tabelle werden die Klassenmethoden eines PySpark-MLEvaluator beschrieben:

Methode und Beschreibung Parameter

abstract
split(self, configProperties, dataframe)

Teilt den Eingabedatensatz in Trainings- und Testuntergruppen

  • self: Eigenverweis
  • configProperties: Konfigurationseigenschaften
  • dataframe: Zu teilender Eingabedatensatz

abstract
evaluate(self, dataframe, model, configProperties)

Wertet ein trainiertes Modell aus und gibt die Auswertungsergebnisse zurück

  • self: Eigenverweis
  • dataframe: Ein DataFrame, bestehend aus Trainings- und Testdaten
  • model: Ein trainiertes Modell
  • configProperties: Konfigurationseigenschaften

Spark (Scala)

Die folgende Tabelle beschreibt die Klassenmethoden eines Spark MLEvaluators:

Methode und Beschreibung Parameter

abstract
split(configProperties, data)

Teilt den Eingabedatensatz in Trainings- und Testuntergruppen

  • configProperties: Konfigurationseigenschaften
  • data: Zu teilender Eingabedatensatz

abstract
evaluate(configProperties, model, data)

Wertet ein trainiertes Modell aus und gibt die Auswertungsergebnisse zurück

  • configProperties: Konfigurationseigenschaften
  • model: Ein trainiertes Modell
  • data: Ein DataFrame, bestehend aus Trainings- und Testdaten

Auf dieser Seite

Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free
Adobe Summit Banner

A virtual event April 27-28.

Expand your skills and get inspired.

Register for free
Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now
Adobe Maker Awards Banner

Time to shine!

Apply now for the 2021 Adobe Experience Maker Awards.

Apply now