模型製作SDK
模型創作 SDK 使你能夠開發可在數據科學工作環境中使用的Adobe Experience Platform自定義機器學習配方和功能管道,並在和 Spark (Scala)中PySpark提供可實施的範本。
此檔提供有關模型編寫 SDK 中找到的各種類的資訊。
DataLoader dataloader
類封裝與檢索、篩選和返回原始輸入數據相關的任何內容。 輸入數據的範例包括用於培訓、評分或特徵工程的數據。 資料載入器會擴充抽象類別DataLoader
,而且必須覆寫抽象方法load
。
PySpark
下表說明PySpark Data Loader類別的抽象方法:
load(self, configProperties, spark)
以Pandas DataFrame載入並傳回Platform資料
self
:自我參照configProperties
:設定屬性對應spark
: Spark工作階段
Spark
下表說明Spark資料載入器類別的抽象方法:
load(configProperties, sparkSession)
以DataFrame形式載入及傳回Platform資料
configProperties
:設定屬性對應sparkSession
: Spark工作階段
從Platform資料集載入資料 load-data-from-a-platform-dataset
下列範例會依ID擷取Platform資料並傳回DataFrame,其中資料集ID (datasetId
)是組態檔中定義的屬性。
PySpark
# PySpark
from sdk.data_loader import DataLoader
class MyDataLoader(DataLoader):
"""
Implementation of DataLoader which loads a DataFrame and prepares data
"""
def load_dataset(config_properties, spark, task_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
# prepare variables
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(task_id))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# load dataset through Spark session
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# return as DataFrame
return pd
Spark (Scala)
// Spark
package com.adobe.platform.ml
import java.time.LocalDateTime
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column
/**
* Implementation of DataLoader which loads a DataFrame and prepares data
*/
class MyDataLoader extends DataLoader {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
require(configProperties != null)
require(sparkSession != null)
// Read the configs
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
.option(QSOption.datasetId, dataSetId)
.load()
df.show()
df
}
}
DataSaver datasaver
DataSaver類別會封裝與儲存輸出資料相關的任何專案,包括來自評分或功能工程的專案。 資料儲存器會擴充抽象類別DataSaver
,而且必須覆寫抽象方法save
。
PySpark
下表說明PySpark Data Saver類別的抽象方法:
save(self, configProperties, dataframe)
以數據幀的形式接收輸出數據並將其存儲在Platform 資料集中
self
:自我參照configProperties
:設定屬性對應dataframe
:要以DataFrame形式儲存的資料
Spark (Scala)
下表說明Spark Data Saver類別的抽象方法:
save(configProperties, dataFrame)
以DataFrame形式接收輸出資料,並將其儲存在Platform資料集中
configProperties
:設定屬性對應dataFrame
:要以DataFrame形式儲存的資料
將資料儲存至Platform資料集 save-data-to-a-platform-dataset
為了將數據商店到 Platform 資料集上,必須在配置檔中提供或定義屬性:
- 要儲存資料的有效Platform資料集識別碼
- 屬於您組織的租使用者ID
下列範例會將資料(prediction
)儲存至Platform資料集,其中資料集ID (datasetId
)和租使用者ID (tenantId
)是在組態檔中定義的屬性。
PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *
class MyDataSaver(DataSaver):
"""
Implementation of DataSaver which stores a DataFrame to a Platform dataset
"""
def save(self, config_properties, prediction):
# Spark context
sparkContext = prediction._sc
# preliminary checks
if config_properties is None:
raise ValueError("config_properties parameter is null")
if prediction is None:
raise ValueError("prediction parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
# prepare variables
scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
tenant_id = str(config_properties.get("tenant_id"))
timestamp = "2019-01-01 00:00:00"
service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
scored_df = prediction.withColumn("date", col("date").cast(StringType()))
scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty")
# store data into dataset
query_options = get_query_options(sparkContext)
scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.datasetId(), scored_dataset_id) \
.save()
Spark (Scala)
// Spark
package com.adobe.platform.ml
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
/**
* Implementation of DataSaver which stores a DataFrame to a Platform dataset
*/
class ScoringDataSaver extends DataSaver {
final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
final val PLATFORM_SDK_PQS_BATCH: String = "batch"
/**
* Method that saves the scoring data into a dataframe
* @param configProperties - Configuration Properties map
* @param dataFrame - Dataframe with the scoring results
*/
override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit = {
require(configProperties != null)
require(dataFrame != null)
val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
val sparkSession = dataFrame.sparkSession
val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val tenantId:String = configProperties.get("tenantId").getOrElse("")
val timestamp:String = "2019-01-01 00:00:00"
val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
import sparkSession.implicits._
var df = dataFrame.withColumn("date", $"date".cast("String"))
var scored_df = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
scored_df = scored_df.withColumn("_id", lit("empty"))
scored_df = scored_df.withColumn("eventType", lit("empty"))
scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.serviceToken, serviceToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.save()
}
}
Datasettransformer datasettransformer
DatasetTransformer類別會修改及轉換資料集的結構。 Sensei Machine Learning Runtime不需要定義此元件,而是根據您的需求實作。
關於功能管道,資料集轉換器可與功能管道工廠搭配使用,為功能工程準備資料。
PySpark
下表說明PySpark資料集轉換器類別的類別方法:
抽象transform(self, configProperties, dataset)
以資料集作為輸入,並輸出新的衍生資料集
self
:自我參照configProperties
:配置屬性映射dataset
:轉換的輸入資料集
Spark (Scala)
下表描述了資料集轉換器類的 Spark 抽象方法:
transform(configProperties, dataset)
以資料集作為輸入,並輸出新的衍生資料集
configProperties
:設定屬性對應dataset
:轉換的輸入資料集
功能管線工廠 featurepipelinefactory
FeaturePipelineFactory類別包含特徵擷取演演算法,並定義特徵管線從開始到結束的階段。
PySpark
下表說明PySpark FeaturePipelineFactory的類別方法:
抽象create_pipeline(self, configProperties)
建立並傳回包含一系列Spark轉換器的Spark配管
self
:自我參照configProperties
:設定屬性對應
抽象get_param_map(self, configProperties, sparkSession)
從設定屬性擷取並傳回引數對應
self
:自我參照configProperties
:設定屬性sparkSession
: Spark工作階段
Spark (Scala)
下表說明Spark FeaturePipelineFactory的類別方法:
抽象createPipeline(configProperties)
建立並返回包含一系列轉換器的管道
configProperties
:配置屬性映射
抽象getParamMap(configProperties, sparkSession)
從配置屬性檢索並返回參數映射
configProperties
:設定屬性sparkSession
: Spark工作階段
Pipelinefactory pipelinefactory
PipelineFactory類別封裝模型訓練和評分的方法和定義,其中訓練邏輯和演演算法是以Spark管道的形式定義。
PySpark
下表說明PySpark PipelineFactory的類別方法:
抽象apply(self, configProperties)
建立並傳回Spark管道,其中包含模型訓練和評分的邏輯和演演算法
self
:自我參照configProperties
:設定屬性
抽象train(self, configProperties, dataframe)
傳回包含用來訓練模型的邏輯和演演算法的自訂Pipeline。 如果使用Spark配管,則不需要此方法
self
:自我參照configProperties
:設定屬性dataframe
:用於訓練輸入的功能資料集
抽象score(self, configProperties, dataframe, model)
使用經過訓練的模型分數並傳回結果
self
:自我參照configProperties
:設定屬性dataframe
:輸入資料集以進行評分model
:用於評分的訓練模型
抽象get_param_map(self, configProperties, sparkSession)
從設定屬性擷取並傳回引數對應
self
:自我參照configProperties
:設定屬性sparkSession
:火花會話
Spark (Scala)
下表描述了 PipelineFactory 的 Spark 類方法:
抽象apply(configProperties)
建立並傳回包含模型訓練和評分的邏輯和演演算法的管道
configProperties
:設定屬性
抽象getParamMap(configProperties, sparkSession)
從設定屬性擷取並傳回引數對應
configProperties
:設定屬性sparkSession
: Spark工作階段
MLEvaluator mlevaluator
MLEvaluator類別提供定義評估度量,以及決定訓練和測試資料集的方法。
PySpark
下表說明PySpark MLEvaluator的類別方法:
抽象split(self, configProperties, dataframe)
將輸入資料集分割成訓練和測試子集
self
:自我參照configProperties
:設定屬性dataframe
:要分割的輸入資料集
抽象evaluate(self, dataframe, model, configProperties)
評估經過訓練的模型並傳回評估結果
self
:自我參照dataframe
:包含訓練和測試資料的DataFramemodel
:訓練有素的模型configProperties
:設定屬性
Spark (Scala)
下表說明Spark MLEvaluator的類別方法:
抽象split(configProperties, data)
將輸入資料集分割成訓練和測試子集
configProperties
:設定屬性data
:要分割的輸入資料集
抽象evaluate(configProperties, model, data)
評估經過訓練的模型並傳回評估結果
configProperties
:設定屬性model
:訓練有素的模型data
:包含訓練和測試資料的DataFrame