El SDK de Creación de modelos le permite desarrollar fórmulas de aprendizaje automático personalizadas y canalizaciones de funciones que se pueden utilizar en Adobe Experience Platform Data Science Workspace, proporcionando plantillas implementables en PySpark y Spark (Scala).
Este documento proporciona información sobre las distintas clases que se encuentran en el SDK de creación de modelos.
La clase DataLoader encapsula cualquier cosa relacionada con la recuperación, el filtrado y la devolución de datos de entrada sin procesar. Algunos ejemplos de datos de entrada son los de formación, puntuación o ingeniería de características. Los cargadores de datos amplían la clase abstracta DataLoader
y deben anular el método abstracto load
.
PySpark
En la tabla siguiente se describen los métodos abstractos de una clase PySpark Data Loader:
Método y descripción | Parámetros |
---|---|
Carga y devolución de datos de Platform como DataFrame de Pandas |
|
Spark
En la tabla siguiente se describen los métodos abstractos de una clase Spark Data Loader:
Método y descripción | Parámetros |
---|---|
Carga y devolución de datos de Platform como DataFrame |
|
En el siguiente ejemplo se recuperan los datos Platform por ID y se devuelve un DataFrame, donde el ID del conjunto de datos (datasetId
) es una propiedad definida en el archivo de configuración.
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Spark (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
La clase DataSaver encapsula todo lo relacionado con el almacenamiento de datos de salida, incluidos los procedentes de la puntuación o la ingeniería de características. Los ahorradores de datos amplían la clase abstracta DataSaver
y deben anular el método abstracto save
.
PySpark
En la tabla siguiente se describen los métodos abstractos de una clase PySpark Data Saver:
Método y descripción | Parámetros |
---|---|
Recibir datos de salida como DataFrame y almacenarlos en un conjunto de datos de Platform |
|
Spark (Scala)
En la tabla siguiente se describen los métodos abstractos de una clase Spark Data Saver:
Método y descripción | Parámetros |
---|---|
Recibir datos de salida como DataFrame y almacenarlos en un conjunto de datos de Platform |
|
Para almacenar datos en un conjunto de datos Platform , las propiedades deben proporcionarse o definirse en el archivo de configuración:
Los siguientes ejemplos almacenan datos (prediction
) en un conjunto de datos Platform, donde el ID del conjunto de datos (datasetId
) y el ID del inquilino (tenantId
) son propiedades definidas dentro del archivo de configuración.
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Spark (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
La clase DatasetTransformer modifica y transforma la estructura de un conjunto de datos. El Sensei Machine Learning Runtime no requiere que se defina este componente y se implementa según sus necesidades.
En cuanto a una canalización de funciones, los transformadores de conjuntos de datos se pueden utilizar de forma conjunta con una fábrica de canalización de características para preparar los datos para la ingeniería de características.
PySpark
La siguiente tabla describe los métodos de clase de una clase de transformador de conjuntos de datos PySpark:
Método y descripción | Parámetros |
---|---|
abstract Toma un conjunto de datos como entrada y salida de un nuevo conjunto de datos derivado |
|
Spark (Scala)
La siguiente tabla describe los métodos abstractos de una clase de transformador de conjuntos de datos Spark:
Método y descripción | Parámetros |
---|---|
Toma un conjunto de datos como entrada y salida de un nuevo conjunto de datos derivado |
|
La clase FeaturePipelineFactory contiene algoritmos de extracción de características y define las etapas de una Canalización de funciones de principio a fin.
PySpark
En la tabla siguiente se describen los métodos de clase de PySpark FeaturePipelineFactory:
Método y descripción | Parámetros |
---|---|
abstract Crear y devolver una Canalización de Spark que contenga una serie de Transformadores de Spark |
|
abstract Recuperar y devolver el mapa de parámetros desde las propiedades de configuración |
|
Spark (Scala)
En la tabla siguiente se describen los métodos de clase de Spark FeaturePipelineFactory:
Método y descripción | Parámetros |
---|---|
abstract Crear y devolver una canalización que contenga una serie de transformadores |
|
abstract Recuperar y devolver el mapa de parámetros desde las propiedades de configuración |
|
La clase PipelineFactory encapsula métodos y definiciones para la formación y puntuación del modelo, donde la lógica y los algoritmos de capacitación se definen en forma de Spark Canalización.
PySpark
En la tabla siguiente se describen los métodos de clase de PySpark PipelineFactory:
Método y descripción | Parámetros |
---|---|
abstract Crear y devolver una canalización de Spark que contenga la lógica y el algoritmo para la formación y puntuación del modelo |
|
abstract Devolver una canalización personalizada que contiene la lógica y el algoritmo para entrenar un modelo. Este método no es obligatorio si se utiliza una Canalización de Spark |
|
abstract Puntuación mediante el modelo entrenado y devolución de los resultados |
|
abstract Recuperar y devolver el mapa de parámetros desde las propiedades de configuración |
|
Spark (Scala)
En la tabla siguiente se describen los métodos de clase de un Spark PipelineFactory:
Método y descripción | Parámetros |
---|---|
abstract Crear y devolver una canalización que contenga la lógica y el algoritmo para la formación y puntuación del modelo |
|
abstract Recuperar y devolver el mapa de parámetros desde las propiedades de configuración |
|
La clase MLEvaluator proporciona métodos para definir métricas de evaluación y determinar conjuntos de datos de prueba y capacitación.
PySpark
En la tabla siguiente se describen los métodos de clase de un valor MLEvaluator de PySpark:
Método y descripción | Parámetros |
---|---|
abstract Divide el conjunto de datos de entrada en subconjuntos de prueba y capacitación |
|
abstract Evalúa un modelo entrenado y devuelve los resultados de la evaluación |
|
Spark (Scala)
En la tabla siguiente se describen los métodos de clase de un Spark MLEvaluator:
Método y descripción | Parámetros |
---|---|
abstract Divide el conjunto de datos de entrada en subconjuntos de prueba y capacitación |
|
abstract Evalúa un modelo entrenado y devuelve los resultados de la evaluación |
|