SDK de criação de modelo

O SDK de criação de modelo permite desenvolver receitas e pipelines de recursos de aprendizado de máquina personalizados que podem ser usados no Adobe Experience Platform Data Science Workspace, fornecendo modelos implementáveis no PySpark e Spark (Scala).

Este documento fornece informações sobre as várias classes encontradas no SDK de criação de modelo.

DataLoader

A classe DataLoader encapsula tudo relacionado à recuperação, filtragem e retorno de dados brutos de entrada. Exemplos de dados de entrada incluem aqueles para treinamento, pontuação ou engenharia de recursos. Carregadores de dados estendem a classe abstrata DataLoader e devem substituir o método abstrato load.

PySpark

A tabela a seguir descreve os métodos abstratos de uma classe PySpark Data Loader:

Método e descrição Parâmetros

load(self, configProperties, spark)

Carregar e retornar dados da Platform como um DataFrame Pandas

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração
  • spark: Sessão do Spark

Spark

A tabela a seguir descreve os métodos abstratos de um Spark Classe do Carregador de Dados:

Método e descrição Parâmetros

load(configProperties, sparkSession)

Carregar e retornar dados da Platform como um DataFrame

  • configProperties: mapa de propriedades de configuração
  • sparkSession: Sessão do Spark

Carregar dados de um Platform conjunto de dados

O exemplo a seguir recupera Platform dados por ID e retorna um DataFrame, onde a ID do conjunto de dados (datasetId) é uma propriedade definida no arquivo de configuração.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

A classe DataSaver encapsula tudo relacionado ao armazenamento de dados de saída, incluindo aqueles da pontuação ou engenharia de recursos. Os economizadores de dados estendem a classe abstrata DataSaver e devem substituir o método abstrato save.

PySpark

A tabela a seguir descreve os métodos abstratos de um PySpark Classe de Economia de Dados:

Método e descrição Parâmetros

save(self, configProperties, dataframe)

Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração
  • dataframe: dados a serem armazenados no formato de um DataFrame

Spark (Scala)

A tabela a seguir descreve os métodos abstratos de um Spark Classe de Economia de Dados:

Método e descrição Parâmetros

save(configProperties, dataFrame)

Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma

  • configProperties: mapa de propriedades de configuração
  • dataFrame: dados a serem armazenados no formato de um DataFrame

Salvar dados em um Platform conjunto de dados

Para armazenar dados em um Platform conjunto de dados, as propriedades devem ser fornecidas ou definidas no arquivo de configuração:

  • Um válido Platform ID do conjunto de dados no qual os dados serão armazenados
  • A ID do locatário pertencente à sua organização

Os exemplos a seguir armazenam dados (prediction) em um Platform conjunto de dados, em que a ID do conjunto de dados (datasetId) e ID do locatário (tenantId) são propriedades definidas no arquivo de configuração.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */

  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

A classe DatasetTransformer modifica e transforma a estrutura de um conjunto de dados. A variável Sensei Machine Learning Runtime não requer que esse componente seja definido e é implementado com base em suas necessidades.

No que diz respeito a um pipeline de recursos, os transformadores de conjunto de dados podem ser usados em conjunto com uma fábrica de pipelines de recursos para preparar dados para a engenharia de recursos.

PySpark

A tabela a seguir descreve os métodos de classe de uma classe de transformador do conjunto de dados do PySpark:

Método e descrição Parâmetros

abstract
transform(self, configProperties, dataset)

Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração
  • dataset: o conjunto de dados de entrada para transformação

Spark (Scala)

A tabela a seguir descreve os métodos abstratos de um Spark classe do transformador do conjunto de dados:

Método e descrição Parâmetros

transform(configProperties, dataset)

Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado

  • configProperties: mapa de propriedades de configuração
  • dataset: o conjunto de dados de entrada para transformação

FeaturePipelineFactory

A classe FeaturePipelineFactory contém algoritmos de extração de recursos e define os estágios de um Pipeline de recursos do início ao fim.

PySpark

A tabela a seguir descreve os métodos de classe de um FeaturePipelineFactory PySpark:

Método e descrição Parâmetros

abstract
create_pipeline(self, configProperties)

Criar e retornar um Spark Pipeline que contenha uma série de Spark Transformers

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração

abstract
get_param_map(self, configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • self: autorreferência
  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

Spark (Scala)

A tabela a seguir descreve os métodos de classe de um Spark FeaturePipelineFactory:

Método e descrição Parâmetros

abstract
createPipeline(configProperties)

Criar e retornar um pipeline que contenha uma série de transformadores

  • configProperties: mapa de propriedades de configuração

abstract
getParamMap(configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

PipelineFactory

A classe PipelineFactory encapsula métodos e definições para treinamento e pontuação de modelos, em que a lógica e os algoritmos de treinamento são definidos no formato de um Spark Pipeline.

PySpark

A tabela a seguir descreve os métodos de classe de um PipelineFactory PySpark:

Método e descrição Parâmetros

abstract
apply(self, configProperties)

Criar e retornar um Spark Pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo

  • self: autorreferência
  • configProperties: propriedades de configuração

abstract
train(self, configProperties, dataframe)

Retorne um Pipeline personalizado que contenha a lógica e o algoritmo para treinar um modelo. Este método não é necessário se um pipeline Spark for usado

  • self: autorreferência
  • configProperties: propriedades de configuração
  • dataframe: conjunto de dados de recurso para entrada de treinamento

abstract
score(self, configProperties, dataframe, model)

Pontuar usando o modelo treinado e retornar os resultados

  • self: autorreferência
  • configProperties: propriedades de configuração
  • dataframe: Conjunto de dados de entrada para pontuação
  • model: um modelo treinado usado para pontuação

abstract
get_param_map(self, configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • self: autorreferência
  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

Spark (Scala)

A tabela a seguir descreve os métodos de classe de um Spark PipelineFactory:

Método e descrição Parâmetros

abstract
apply(configProperties)

Criar e retornar um pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo

  • configProperties: propriedades de configuração

abstract
getParamMap(configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

MLEvaluator

A classe MLEvaluator fornece métodos para definir métricas de avaliação e determinar conjuntos de dados de treinamento e teste.

PySpark

A tabela a seguir descreve os métodos de classe de um MLEvaluator PySpark:

Método e descrição Parâmetros

abstract
split(self, configProperties, dataframe)

Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste

  • self: autorreferência
  • configProperties: propriedades de configuração
  • dataframe: Conjunto de dados de entrada a ser dividido

abstract
evaluate(self, dataframe, model, configProperties)

Avalia um modelo treinado e retorna os resultados da avaliação

  • self: autorreferência
  • dataframe: um DataFrame que consiste em dados de treinamento e teste
  • model: um modelo treinado
  • configProperties: propriedades de configuração

Spark (Scala)

A tabela a seguir descreve os métodos de classe de um Spark Avaliador MLE:

Método e descrição Parâmetros

abstract
split(configProperties, data)

Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste

  • configProperties: propriedades de configuração
  • data: Conjunto de dados de entrada a ser dividido

abstract
evaluate(configProperties, model, data)

Avalia um modelo treinado e retorna os resultados da avaliação

  • configProperties: propriedades de configuração
  • model: um modelo treinado
  • data: um DataFrame que consiste em dados de treinamento e teste

Nesta página