SDK de criação de modelo
O SDK de Criação de Modelo permite desenvolver Fórmulas de aprendizado de máquina personalizadas e Pipelines de Recursos que podem ser usados no Data Science Workspace Adobe Experience Platform, fornecendo modelos implementáveis em PySpark e Spark (Scala).
Este documento fornece informações sobre as várias classes encontradas no SDK de criação de modelo.
DataLoader dataloader
A classe DataLoader encapsula tudo relacionado à recuperação, filtragem e retorno de dados brutos de entrada. Exemplos de dados de entrada incluem aqueles para treinamento, pontuação ou engenharia de recursos. Os carregadores de dados estendem a classe abstrata DataLoader
e devem substituir o método abstrato load
.
PySpark
A tabela a seguir descreve os métodos abstratos de uma classe PySpark Data Loader:
load(self, configProperties, spark)
Carregar e retornar dados da Platform como um DataFrame Pandas
self
: autorreferênciaconfigProperties
: mapa de propriedades de configuraçãospark
: Sessão do Spark
Spark
A tabela a seguir descreve os métodos abstratos de uma classe de Carregador de Dados Spark:
load(configProperties, sparkSession)
Carregar e retornar dados Platform como um DataFrame
configProperties
: Mapa de propriedades da configuraçãosparkSession
: sessão Spark
Carregar dados de um Platform conjunto de dados load-data-from-a-platform-dataset
O exemplo a seguir recupera dados de Platform por ID e retorna um DataFrame, no qual a ID do conjunto de dados (datasetId
) é uma propriedade definida no arquivo de configuração.
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Spark (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
DataSaver datasaver
A classe DataSaver encapsula tudo relacionado ao armazenamento de dados de saída, incluindo aqueles da pontuação ou engenharia de recursos. Os salvadores de dados estendem a classe abstrata DataSaver
e devem substituir o método abstrato save
.
PySpark
A tabela a seguir descreve os métodos abstratos de uma classe de Economizador de Dados PySpark:
save(self, configProperties, dataframe)
Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma
self
: auto referenciaçãoconfigProperties
: Mapa de propriedades da configuraçãodataframe
: dados a serem armazenados no formato de um DataFrame
Spark (Scala)
A tabela a seguir descreve os métodos abstratos de uma classe de Economizador de Dados Spark:
save(configProperties, dataFrame)
Recebe dados de saída como um DataFrame e os armazena em um conjunto de dados da plataforma
configProperties
: mapa de propriedades de configuraçãodataFrame
: dados a serem armazenados no formato de um DataFrame
Salvar dados em um conjunto de dados Platform save-data-to-a-platform-dataset
Para armazenar dados em um conjunto de dados Platform, as propriedades devem ser fornecidas ou definidas no arquivo de configuração:
- Uma ID de conjunto de dados Platform válida para a qual os dados serão armazenados
- A ID do locatário pertencente à sua organização
Os exemplos a seguir armazenam dados (prediction
) em um conjunto de dados Platform, em que a ID do conjunto de dados (datasetId
) e a ID do locatário (tenantId
) são propriedades definidas no arquivo de configuração.
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Spark (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
DatasetTransformer datasettransformer
A classe DatasetTransformer modifica e transforma a estrutura de um conjunto de dados. O Sensei Machine Learning Runtime não requer que esse componente seja definido e é implementado com base em seus requisitos.
No que diz respeito a um pipeline de recursos, os transformadores de conjunto de dados podem ser usados em conjunto com uma fábrica de pipelines de recursos para preparar dados para a engenharia de recursos.
PySpark
A tabela a seguir descreve os métodos de classe de uma classe de transformador do conjunto de dados do PySpark:
abstracttransform(self, configProperties, dataset)
Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado
self
: autorreferênciaconfigProperties
: mapa de propriedades de configuraçãodataset
: o conjunto de dados de entrada para transformação
Spark (Scala)
A tabela a seguir descreve os métodos abstratos de uma classe de transformador do conjunto de dados Spark:
transform(configProperties, dataset)
Usa um conjunto de dados como entrada e saída para um novo conjunto de dados derivado
configProperties
: mapa de propriedades de configuraçãodataset
: o conjunto de dados de entrada para transformação
FeaturePipelineFactory featurepipelinefactory
A classe FeaturePipelineFactory contém algoritmos de extração de recursos e define os estágios de um Pipeline de recursos do início ao fim.
PySpark
A tabela a seguir descreve os métodos de classe de um FeaturePipelineFactory PySpark:
abstractcreate_pipeline(self, configProperties)
Criar e retornar um Spark Pipeline que contenha uma série de Spark Transformers
self
: autorreferênciaconfigProperties
: mapa de propriedades de configuração
abstrairget_param_map(self, configProperties, sparkSession)
Recuperar e retornar o mapa do parâmetro a partir das propriedades de configuração
self
: auto referenciaçãoconfigProperties
: propriedades de configuraçãosparkSession
: Sessão do Spark
Spark (Scala)
A tabela a seguir descreve os métodos de classe de uma FeaturePipelineFactory Spark:
abstractcreatePipeline(configProperties)
Criar e retornar um pipeline que contenha uma série de transformadores
configProperties
: mapa de propriedades de configuração
abstractgetParamMap(configProperties, sparkSession)
Recuperar e retornar o mapa de parâmetros das propriedades de configuração
configProperties
: propriedades de configuraçãosparkSession
: Sessão do Spark
PipelineFactory pipelinefactory
A classe PipelineFactory encapsula métodos e definições para modelos de treinamento e pontuação, onde treinamento lógica e algoritmos são definidos na forma de um Spark Pipeline.
PySpark
A tabela a seguir descreve os métodos de classe de um PySpark PipelineFactory:
abstractapply(self, configProperties)
Criar e retornar um Spark Pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo
self
: autorreferênciaconfigProperties
: propriedades de configuração
abstracttrain(self, configProperties, dataframe)
Retorne um Pipeline personalizado que contenha a lógica e o algoritmo para treinar um modelo. Este método não é necessário se um pipeline Spark for usado
self
: autorreferênciaconfigProperties
: propriedades de configuraçãodataframe
: conjunto de dados de recursos para entrada de treinamento
abstractscore(self, configProperties, dataframe, model)
Pontuar usando o modelo treinado e retornar os resultados
self
: autorreferênciaconfigProperties
: propriedades de configuraçãodataframe
: Conjunto de dados de entrada para pontuaçãomodel
: um modelo treinado usado para pontuação
abstractget_param_map(self, configProperties, sparkSession)
Recuperar e retornar o mapa de parâmetros das propriedades de configuração
self
: autorreferênciaconfigProperties
: propriedades de configuraçãosparkSession
: Sessão do Spark
Spark (Scala)
A tabela a seguir descreve os métodos de classe de um PipelineFactory Spark:
abstractapply(configProperties)
Criar e retornar um pipeline que contenha a lógica e o algoritmo para treinamento e pontuação do modelo
configProperties
: propriedades de configuração
abstractgetParamMap(configProperties, sparkSession)
Recuperar e retornar o mapa de parâmetros das propriedades de configuração
configProperties
: propriedades de configuraçãosparkSession
: Sessão do Spark
MLEvaluator mlevaluator
A classe MLEvaluator fornece métodos para definir métricas de avaliação e determinar conjuntos de dados de treinamento e teste.
PySpark
A tabela a seguir descreve os métodos de classe de um MLEvaluator PySpark:
abstractsplit(self, configProperties, dataframe)
Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste
self
: auto referenciaçãoconfigProperties
: propriedades de configuraçãodataframe
: Conjunto de dados de entrada a ser dividido
abstractevaluate(self, dataframe, model, configProperties)
Avalia um modelo treinado e retorna os resultados da avaliação
self
: autorreferênciadataframe
: um DataFrame que consiste em dados de treinamento e testemodel
: um modelo treinadoconfigProperties
: propriedades de configuração
Spark (Scala)
A tabela a seguir descreve os métodos de classe de um MLEvaluator Spark:
abstractsplit(configProperties, data)
Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste
configProperties
: propriedades de configuraçãodata
: a entrada conjunto de dados ser dividida
abstrairevaluate(configProperties, model, data)
Avalia um modelo treinado e retorna os resultados da avaliação
configProperties
: propriedades de configuraçãomodel
: um modelo treinadodata
: um DataFrame que consiste em dados de treinamento e teste