Mit dem Model Authoring SDK können Sie benutzerdefinierte Rezepte für maschinelles Lernen und Feature Pipelines entwickeln, die in verwendet werden können Adobe Experience Platform Data Science Workspace, Bereitstellung implementierbarer Vorlagen in PySpark und Spark (Scala).
Dieses Dokument enthält Informationen zu den verschiedenen Klassen im Model Authoring SDK.
Die DataLoader-Klasse enthält alles, was zum Abrufen, Filtern und Zurückgeben von rohen Eingabedaten benötigt wird. Beispiele für Eingabedaten sind Daten für Training, Scoring oder Funktionsentwicklung. Datenlader erweitern die abstrakte Klasse DataLoader
und müssen die abstrakte Methode load
überschreiben.
PySpark
In der folgenden Tabelle werden die abstrakten Methoden einer PySpark-Datenlader-Klasse beschrieben:
Methode und Beschreibung | Parameter |
---|---|
Platform-Daten als Pandas-DataFrame laden und zurückgeben |
|
Spark
In der folgenden Tabelle werden die abstrakten Methoden eines Spark Data Loader-Klasse:
Methode und Beschreibung | Parameter |
---|---|
Platform-Daten als DataFrame laden und zurückgeben |
|
Im folgenden Beispiel werden Platform Daten nach ID und gibt einen DataFrame zurück, wobei die Datensatz-ID (datasetId
) ist eine definierte Eigenschaft in der Konfigurationsdatei.
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Spark (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
Die DataSaver-Klasse enthält alles, was mit dem Speichern von Ausgabedaten zu tun hat, einschließlich Daten für Scoring oder Funktionsentwicklung. Data Saver erweitern die abstrakte Klasse DataSaver
und müssen die abstrakte Methode save
überschreiben.
PySpark
In der folgenden Tabelle werden die abstrakten Methoden eines PySpark Data Saver-Klasse:
Methode und Beschreibung | Parameter |
---|---|
Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern |
|
Spark (Scala)
In der folgenden Tabelle werden die abstrakten Methoden eines Spark Data Saver-Klasse:
Methode und Beschreibung | Parameter |
---|---|
Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern |
|
So speichern Sie Daten in einem Platform -Datensatz, müssen die Eigenschaften entweder bereitgestellt oder in der Konfigurationsdatei definiert werden:
Die folgenden Beispiele speichern Daten (prediction
) auf einen Platform Datensatz, wobei die Datensatz-ID (datasetId
) und Mandanten-ID (tenantId
) sind definierte Eigenschaften in der Konfigurationsdatei.
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Spark (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
Die DatasetTransformer-Klasse ändert und transformiert die Struktur eines Datensatzes. Die Sensei Machine Learning Runtime erfordert nicht, dass diese Komponente definiert wird, und wird entsprechend Ihren Anforderungen implementiert.
Im Hinblick auf eine Funktions-Pipeline können DataSet Transformer gemeinsam mit einer Feature-Pipeline-Factory zur Vorbereitung von Daten für die Funktionsentwicklung genutzt werden.
PySpark
In der folgenden Tabelle werden die Klassenmethoden einer PySpark-DataSet Transformer-Klasse beschrieben:
Methode und Beschreibung | Parameter |
---|---|
abstract Nimmt einen Datensatz als Eingabe und gibt einen neuen abgeleiteten Datensatz aus |
|
Spark (Scala)
In der folgenden Tabelle werden die abstrakten Methoden eines Spark dataset transformer-Klasse:
Methode und Beschreibung | Parameter |
---|---|
Nimmt einen Datensatz als Eingabe und gibt einen neuen abgeleiteten Datensatz aus |
|
Die FeaturePipelineFactory-Klasse enthält Extraktionsalgorithmen für Funktionen und definiert die Phasen einer Funktions-Pipeline von Anfang bis Ende.
PySpark
In der folgenden Tabelle werden die Klassenmethoden einer PySpark-FeaturePipelineFactory beschrieben:
Methode und Beschreibung | Parameter |
---|---|
abstract Spark-Pipeline erstellen und zurückgeben, die eine Reihe von Spark-Transformern enthält |
|
abstract Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben |
|
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden eines Spark FeaturePipelineFactory:
Methode und Beschreibung | Parameter |
---|---|
abstract Pipeline mit einer Reihe von Umwandlern erstellen und zurückgeben |
|
abstract Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben |
|
Die PipelineFactory-Klasse enthält Methoden und Definitionen für Modellschulung und -bewertung, wobei Trainings-Logik und -Algorithmen in Form einer Spark Pipeline.
PySpark
Die folgende Tabelle beschreibt die Klassenmethoden einer PySpark-PipelineFactory:
Methode und Beschreibung | Parameter |
---|---|
abstract Spark-Pipeline erstellen und zurückgeben, die die Logik und den Algorithmus für das Trainieren und Bewerten von Modellen enthält |
|
abstract Benutzerdefinierte Pipeline zurückgeben, die die Logik und den Algorithmus zum Trainieren eines Modells enthält. Diese Methode ist nicht erforderlich, wenn eine Spark-Pipeline verwendet wird |
|
abstract Mit dem trainierten Modell bewerten und die Ergebnisse zurückgeben |
|
abstract Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben |
|
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden eines Spark PipelineFactory:
Methode und Beschreibung | Parameter |
---|---|
abstract Pipeline erstellen und zurückgeben, die die Logik und den Algorithmus für das Trainieren und Bewerten von Modellen enthält |
|
abstract Parameter-Mapping aus Konfigurationseigenschaften abrufen und zurückgeben |
|
Die MLEvaluator-Klasse stellt Methoden zum Definieren von Bewertungsmetriken und zum Festlegen von Trainings- und Testdatensätzen bereit.
PySpark
In der folgenden Tabelle werden die Klassenmethoden eines PySpark-MLEvaluator beschrieben:
Methode und Beschreibung | Parameter |
---|---|
abstract Teilt den Eingabedatensatz in Trainings- und Testuntergruppen |
|
abstract Wertet ein trainiertes Modell aus und gibt die Auswertungsergebnisse zurück |
|
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden eines Spark MLEvaluator:
Methode und Beschreibung | Parameter |
---|---|
abstract Teilt den Eingabedatensatz in Trainings- und Testuntergruppen |
|
abstract Wertet ein trainiertes Modell aus und gibt die Auswertungsergebnisse zurück |
|