SDK de criação de modelo

NOTE
O Data Science Workspace não está mais disponível para compra.
Esta documentação destina-se aos clientes existentes com direitos anteriores ao Data Science Workspace.

O SDK de Criação de Modelo permite desenvolver Fórmulas de aprendizado de máquina personalizadas e Pipelines de Recursos que podem ser usados no Data Science Workspace Adobe Experience Platform, fornecendo modelos implementáveis em PySpark e Spark (Scala).

Este documento fornece informações sobre as várias classes encontradas no SDK de criação de modelo.

DataLoader dataloader

A classe DataLoader encapsula tudo relacionado à recuperação, filtragem e retorno de dados brutos de entrada. Exemplos de dados de entrada incluem aqueles para treinamento, pontuação ou engenharia de recursos. Os carregadores de dados estendem a classe abstrata DataLoader e devem substituir o método abstrato load.

PySpark

A tabela a seguir descreve os métodos abstratos de uma classe PySpark Data Loader:

Método e descrição
Parâmetros

load(self, configProperties, spark)

Carregar e retornar dados da Platform como um DataFrame Pandas

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração
  • spark: Sessão do Spark

Spark

A tabela a seguir descreve os métodos abstratos de uma classe de Carregador de Dados Spark:

Método e descrição
Parâmetros

load(configProperties, sparkSession)

Carregar e retornar dados Platform como um DataFrame

  • configProperties: Mapa de propriedades da configuração
  • sparkSession: sessão Spark

Carregar dados de um Platform conjunto de dados load-data-from-a-platform-dataset

O exemplo a seguir recupera dados de Platform por ID e retorna um DataFrame, no qual a ID do conjunto de dados (datasetId) é uma propriedade definida no arquivo de configuração.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver datasaver

A classe DataSaver encapsula tudo relacionado ao armazenamento de dados de saída, incluindo aqueles da pontuação ou engenharia de recursos. Os salvadores de dados estendem a classe abstrata DataSaver e devem substituir o método abstrato save.

PySpark

A tabela a seguir descreve os métodos abstratos de uma classe de Economizador de Dados PySpark:

Método e descrição
Parâmetros

save(self, configProperties, dataframe)

Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma

  • self: auto referenciação
  • configProperties: Mapa de propriedades da configuração
  • dataframe: dados a serem armazenados no formato de um DataFrame

Spark (Scala)

A tabela a seguir descreve os métodos abstratos de uma classe de Economizador de Dados Spark:

Método e descrição
Parâmetros

save(configProperties, dataFrame)

Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma

  • configProperties: mapa de propriedades de configuração
  • dataFrame: dados a serem armazenados no formato de um DataFrame

Salvar dados em um conjunto de dados Platform save-data-to-a-platform-dataset

Para armazenar dados em um conjunto de dados Platform, as propriedades devem ser fornecidas ou definidas no arquivo de configuração:

  • Uma ID de conjunto de dados Platform válida para a qual os dados serão armazenados
  • A ID do locatário pertencente à sua organização

Os exemplos a seguir armazenam dados (prediction) em um conjunto de dados Platform, em que a ID do conjunto de dados (datasetId) e a ID do locatário (tenantId) são propriedades definidas no arquivo de configuração.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */

  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer datasettransformer

A classe DatasetTransformer modifica e transforma a estrutura de um conjunto de dados. O Sensei Machine Learning Runtime não requer que esse componente seja definido e é implementado com base em seus requisitos.

No que diz respeito a um pipeline de recursos, os transformadores de conjunto de dados podem ser usados em conjunto com uma fábrica de pipelines de recursos para preparar dados para a engenharia de recursos.

PySpark

A tabela a seguir descreve os métodos de classe de uma classe de transformador do conjunto de dados do PySpark:

Método e descrição
Parâmetros

abstract
transform(self, configProperties, dataset)

Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração
  • dataset: o conjunto de dados de entrada para transformação

Spark (Scala)

A tabela a seguir descreve os métodos abstratos de uma classe de transformador do conjunto de dados Spark:

Método e descrição
Parâmetros

transform(configProperties, dataset)

Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado

  • configProperties: mapa de propriedades de configuração
  • dataset: o conjunto de dados de entrada para transformação

FeaturePipelineFactory featurepipelinefactory

A classe FeaturePipelineFactory contém algoritmos de extração de recursos e define os estágios de um Pipeline de recursos do início ao fim.

PySpark

A tabela a seguir descreve os métodos de classe de um FeaturePipelineFactory PySpark:

Método e descrição
Parâmetros

abstract
create_pipeline(self, configProperties)

Criar e retornar um Spark Pipeline que contenha uma série de Spark Transformers

  • self: autorreferência
  • configProperties: mapa de propriedades de configuração

abstrair
get_param_map(self, configProperties, sparkSession)

Recuperar e retornar o mapa do parâmetro a partir das propriedades de configuração

  • self: auto referenciação
  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

Spark (Scala)

A tabela a seguir descreve os métodos de classe de uma FeaturePipelineFactory Spark:

Método e descrição
Parâmetros

abstract
createPipeline(configProperties)

Criar e retornar um pipeline que contenha uma série de transformadores

  • configProperties: mapa de propriedades de configuração

abstract
getParamMap(configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

PipelineFactory pipelinefactory

A classe PipelineFactory encapsula métodos e definições para modelos de treinamento e pontuação, onde treinamento lógica e algoritmos são definidos na forma de um Spark Pipeline.

PySpark

A tabela a seguir descreve os métodos de classe de um PySpark PipelineFactory:

Método e descrição
Parâmetros

abstract
apply(self, configProperties)

Criar e retornar um Spark Pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo

  • self: autorreferência
  • configProperties: propriedades de configuração

abstract
train(self, configProperties, dataframe)

Retorne um Pipeline personalizado que contenha a lógica e o algoritmo para treinar um modelo. Este método não é necessário se um pipeline Spark for usado

  • self: autorreferência
  • configProperties: propriedades de configuração
  • dataframe: conjunto de dados de recursos para entrada de treinamento

abstract
score(self, configProperties, dataframe, model)

Pontuar usando o modelo treinado e retornar os resultados

  • self: autorreferência
  • configProperties: propriedades de configuração
  • dataframe: Conjunto de dados de entrada para pontuação
  • model: um modelo treinado usado para pontuação

abstract
get_param_map(self, configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • self: autorreferência
  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

Spark (Scala)

A tabela a seguir descreve os métodos de classe de um PipelineFactory Spark:

Método e descrição
Parâmetros

abstract
apply(configProperties)

Criar e retornar um pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo

  • configProperties: propriedades de configuração

abstract
getParamMap(configProperties, sparkSession)

Recuperar e retornar o mapa de parâmetros das propriedades de configuração

  • configProperties: propriedades de configuração
  • sparkSession: Sessão do Spark

MLEvaluator mlevaluator

A classe MLEvaluator fornece métodos para definir métricas de avaliação e determinar conjuntos de dados de treinamento e teste.

PySpark

A tabela a seguir descreve os métodos de classe de um MLEvaluator PySpark:

Método e descrição
Parâmetros

abstract
split(self, configProperties, dataframe)

Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste

  • self: auto referenciação
  • configProperties: propriedades de configuração
  • dataframe: Conjunto de dados de entrada a ser dividido

abstract
evaluate(self, dataframe, model, configProperties)

Avalia um modelo treinado e retorna os resultados da avaliação

  • self: autorreferência
  • dataframe: um DataFrame que consiste em dados de treinamento e teste
  • model: um modelo treinado
  • configProperties: propriedades de configuração

Spark (Scala)

A tabela a seguir descreve os métodos de classe de um MLEvaluator Spark:

Método e descrição
Parâmetros

abstract
split(configProperties, data)

Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste

  • configProperties: propriedades de configuração
  • data: a entrada conjunto de dados ser dividida

abstrair
evaluate(configProperties, model, data)

Avalia um modelo treinado e retorna os resultados da avaliação

  • configProperties: propriedades de configuração
  • model: um modelo treinado
  • data: um DataFrame que consiste em dados de treinamento e teste
recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9