SDK för modellredigering

Med SDK för modellredigering kan du utveckla anpassade maskininlärningsrecept och funktionsförlopp som kan användas i Adobe Experience Platform Data Science Workspace, som innehåller implementerbara mallar i PySpark och Spark (Scala).

Det här dokumentet innehåller information om de olika klasser som finns i SDK för modellredigering.

DataLoader dataloader

Klassen DataLoader kapslar in allt som rör hämtning, filtrering och returnering av rådata. Exempel på indata är sådana som används för utbildning, poängsättning eller funktionsteknik. Datainläsare utökar den abstrakta klassen DataLoader och måste åsidosätta den abstrakta metoden load.

PySpark

I följande tabell beskrivs de abstrakta metoderna för en PySpark Data Loader-klass:

Metod och beskrivning
Parametrar

load(self, configProperties, spark)

Läsa in och returnera plattformsdata som en Pandas DataFrame

  • self: Självreferens
  • configProperties: Konfilegenskapsmappning
  • spark: Spark-session

Spark

I följande tabell beskrivs de abstrakta metoderna för en Spark Data Loader-klass:

Metod och beskrivning
Parametrar

load(configProperties, sparkSession)

Läs in och returnera plattformsdata som en DataFrame

  • configProperties: Konfilegenskapsmappning
  • sparkSession: Spark-session

Läs in data från en Platform-datauppsättning load-data-from-a-platform-dataset

Följande exempel hämtar Platform-data efter ID och returnerar en DataFrame, där datauppsättnings-ID (datasetId) är en definierad egenskap i konfigurationsfilen.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver datasaver

Klassen DataSaver kapslar in allt som rör lagring av utdata, inklusive data från poängsättning eller funktionskonstruktion. Datasparare utökar den abstrakta klassen DataSaver och måste åsidosätta den abstrakta metoden save.

PySpark

I följande tabell beskrivs de abstrakta metoderna för en PySpark Data Saver-klass:

Metod och beskrivning
Parametrar

save(self, configProperties, dataframe)

Ta emot utdata som en DataFrame och lagra dem i en plattformsdatauppsättning

  • self: Självreferens
  • configProperties: Konfilegenskapsmappning
  • dataframe: Data som ska lagras i form av en DataFrame

Spark (Scala)

I följande tabell beskrivs de abstrakta metoderna för en Spark Data Saver-klass:

Metod och beskrivning
Parametrar

save(configProperties, dataFrame)

Ta emot utdata som en DataFrame och lagra dem i en plattformsdatauppsättning

  • configProperties: Konfilegenskapsmappning
  • dataFrame: Data som ska lagras i form av en DataFrame

Spara data i en Platform-datauppsättning save-data-to-a-platform-dataset

För att kunna lagra data i en Platform-datauppsättning måste egenskaperna antingen anges eller definieras i konfigurationsfilen:

  • Ett giltigt Platform-datauppsättnings-ID som data ska lagras på
  • Klient-ID som tillhör din organisation

I följande exempel lagras data (prediction) i en Platform-datauppsättning, där datauppsättnings-ID (datasetId) och klient-ID (tenantId) definieras i konfigurationsfilen.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */

  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer datasettransformer

Klassen DatasetTransformer ändrar och omformar strukturen i en datauppsättning. Sensei Machine Learning Runtime kräver inte att den här komponenten definieras och implementeras baserat på dina krav.

När det gäller en funktionspipeline kan datauppsättningsomvandlare användas tillsammans med en rörledningsfabrik för att förbereda data för funktionskonstruktion.

PySpark

I följande tabell beskrivs klassmetoderna för en PySpark-datamängdstransformeringsklass:

Metod och beskrivning
Parametrar

abstrakt
transform(self, configProperties, dataset)

Tar en datauppsättning som indata och skapar en ny härledd datauppsättning

  • self: Självreferens
  • configProperties: Konfilegenskapsmappning
  • dataset: Indatauppsättningen för omvandling

Spark (Scala)

I följande tabell beskrivs de abstrakta metoderna för en Spark-datamängdstransformatorklass:

Metod och beskrivning
Parametrar

transform(configProperties, dataset)

Tar en datauppsättning som indata och skapar en ny härledd datauppsättning

  • configProperties: Konfilegenskapsmappning
  • dataset: Indatauppsättningen för omvandling

FeaturePipelineFactory featurepipelinefactory

Klassen FeaturePipelineFactory innehåller funktionsextraheringsalgoritmer och definierar faserna i en funktionspipeline från början till slut.

PySpark

I följande tabell beskrivs klassmetoderna för en PySpark FeaturePipelineFactory:

Metod och beskrivning
Parametrar

abstrakt
create_pipeline(self, configProperties)

Skapa och returnera en Spark Pipeline som innehåller en serie Spark-omformare

  • self: Självreferens
  • configProperties: Konfilegenskapsmappning

abstrakt
get_param_map(self, configProperties, sparkSession)

Hämta och returnera parametermappning från konfigurationsegenskaper

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper
  • sparkSession: Spark-session

Spark (Scala)

I följande tabell beskrivs klassmetoderna för en Spark FeaturePipelineFactory:

Metod och beskrivning
Parametrar

abstrakt
createPipeline(configProperties)

Skapa och returnera en pipeline som innehåller en serie transformerare

  • configProperties: Konfilegenskapsmappning

abstrakt
getParamMap(configProperties, sparkSession)

Hämta och returnera parametermappning från konfigurationsegenskaper

  • configProperties: Konfigurationsegenskaper
  • sparkSession: Spark-session

PipelineFactory pipelinefactory

Klassen PipelineFactory kapslar metoder och definitioner för modellutbildning och poängsättning, där utbildningslogik och algoritmer definieras i form av en Spark-pipeline.

PySpark

I följande tabell beskrivs klassmetoderna för en PySpark PipelineFactory:

Metod och beskrivning
Parametrar

abstrakt
apply(self, configProperties)

Skapa och returnera en Spark Pipeline som innehåller logik och algoritm för modellutbildning och poängsättning

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper

abstrakt
train(self, configProperties, dataframe)

Returnera en anpassad pipeline som innehåller logik och algoritm för utbildning av en modell. Den här metoden krävs inte om en Spark Pipeline används

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper
  • dataframe: Funktionsuppsättning för utbildningsmaterial

abstrakt
score(self, configProperties, dataframe, model)

Poäng med hjälp av den tränade modellen och returnera resultatet

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper
  • dataframe: Indatauppsättning för poängsättning
  • model: En tränad modell som används för poängsättning

abstrakt
get_param_map(self, configProperties, sparkSession)

Hämta och returnera parametermappning från konfigurationsegenskaper

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper
  • sparkSession: Spark-session

Spark (Scala)

I följande tabell beskrivs klassmetoderna för en Spark PipelineFactory:

Metod och beskrivning
Parametrar

abstrakt
apply(configProperties)

Skapa och returnera en pipeline som innehåller logik och algoritm för modellutbildning och poängsättning

  • configProperties: Konfigurationsegenskaper

abstrakt
getParamMap(configProperties, sparkSession)

Hämta och returnera parametermappning från konfigurationsegenskaper

  • configProperties: Konfigurationsegenskaper
  • sparkSession: Spark-session

MLEvaluator mlevaluator

Klassen MLEvaluator innehåller metoder för att definiera mätvärden för utvärdering och för att fastställa data för utbildning och testning.

PySpark

I följande tabell beskrivs klassmetoderna för en PySpark MLEvaluator:

Metod och beskrivning
Parametrar

abstrakt
split(self, configProperties, dataframe)

Delar in datauppsättningen i underuppsättningar för utbildning och testning

  • self: Självreferens
  • configProperties: Konfigurationsegenskaper
  • dataframe: Indatauppsättning som ska delas

abstrakt
evaluate(self, dataframe, model, configProperties)

Utvärderar en utbildad modell och returnerar utvärderingsresultaten

  • self: Självreferens
  • dataframe: En DataFrame som består av utbildnings- och testdata
  • model: En tränad modell
  • configProperties: Konfigurationsegenskaper

Spark (Scala)

I följande tabell beskrivs klassmetoderna för en Spark MLEvaluator:

Metod och beskrivning
Parametrar

abstrakt
split(configProperties, data)

Delar in datauppsättningen i underuppsättningar för utbildning och testning

  • configProperties: Konfigurationsegenskaper
  • data: Indatauppsättning som ska delas

abstrakt
evaluate(configProperties, model, data)

Utvärderar en utbildad modell och returnerar utvärderingsresultaten

  • configProperties: Konfigurationsegenskaper
  • model: En tränad modell
  • data: En DataFrame som består av utbildnings- och testdata
recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9