모델 작성 SDK

모델 작성 SDK를 사용하면 사용자 정의 기계 학습 레서피 및 기능 파이프라인을 개발할 수 있습니다 Adobe Experience Platform Data Science Workspace에서 다음을 수행하여 구현 가능한 템플릿을 제공합니다. PySpark 및 Spark (Scala).

이 문서에서는 모델 작성 SDK 내에 있는 다양한 클래스에 대한 정보를 제공합니다.

DataLoader

DataLoader 클래스는 원시 입력 데이터의 검색, 필터링 및 반환과 관련된 모든 것을 캡슐화합니다. 입력 데이터의 예로는 교육, 점수 책정 또는 기능 엔지니어링 등이 있습니다. 데이터 로더는 추상 클래스를 확장합니다. DataLoader abstract 메서드를 재정의해야 합니다. load.

PySpark

다음 표에서는 PySpark Data Loader 클래스의 추상 메서드를 설명합니다.

방법 및 설명 매개 변수

load(self, configProperties, spark)

플랫폼 데이터를 Fanda DataFrame으로 로드 및 반환

  • self: 자체 참조
  • configProperties: 구성 속성 맵
  • spark: 스파크 세션

스파크

다음 표에서는 Spark Data Loader 클래스:

방법 및 설명 매개 변수

load(configProperties, sparkSession)

플랫폼 데이터를 DataFrame으로 로드 및 반환

  • configProperties: 구성 속성 맵
  • sparkSession: 스파크 세션

에서 데이터 로드 Platform 데이터 세트

다음 예제는 Platform 데이터를 ID로 반환하고 데이터 세트 ID가 인 DataFrame을 반환합니다(datasetId)는 구성 파일에서 정의된 속성입니다.

PySpark

# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

스파크(스칼라)

// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

DataSaver 클래스는 점수 책정 또는 기능 엔지니어링 등의 출력 데이터 저장과 관련된 모든 것을 캡슐화합니다. 데이터 저장기는 추상 클래스를 확장합니다. DataSaver abstract 메서드를 재정의해야 합니다. save.

PySpark

다음 표에서는 PySpark 데이터 보호기 클래스:

방법 및 설명 매개 변수

save(self, configProperties, dataframe)

출력 데이터를 DataFrame으로 수신하여 Platform 데이터 세트에 저장합니다

  • self: 자체 참조
  • configProperties: 구성 속성 맵
  • dataframe: DataFrame 형식으로 저장할 데이터

스파크(스칼라)

다음 표에서는 Spark 데이터 보호기 클래스:

방법 및 설명 매개 변수

save(configProperties, dataFrame)

출력 데이터를 DataFrame으로 수신하여 Platform 데이터 세트에 저장합니다

  • configProperties: 구성 속성 맵
  • dataFrame: DataFrame 형식으로 저장할 데이터

에 데이터 저장 Platform 데이터 세트

데이터를 Platform 데이터 집합에서 속성을 제공하거나 구성 파일에서 정의해야 합니다.

  • 유효한 Platform 데이터를 저장할 데이터 세트 ID
  • 조직에 속하는 테넌트 ID

다음 예제에서는 데이터를 저장합니다(prediction) 위에 Platform 데이터 세트, 여기서 데이터 세트 ID(datasetId) 및 테넌트 ID(tenantId)는 구성 파일 내에 정의된 속성입니다.

PySpark

# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

스파크(스칼라)

// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */

  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

데이터 집합 변압기

DatasetTransformer 클래스는 데이터 집합의 구조를 수정하고 변환합니다. 다음 Sensei Machine Learning Runtime 은 이 구성 요소를 정의할 필요가 없으며, 요구 사항을 기반으로 구현됩니다.

피쳐 파이프라인과 관련하여 피쳐 파이프라인 팩토리와 함께 데이터 세트 트랜스포머를 사용하여 피쳐 엔지니어링에 대한 데이터를 준비할 수 있습니다.

PySpark

다음 표에서는 PySpark 데이터 세트 변압기 클래스의 클래스 메서드를 설명합니다.

방법 및 설명 매개 변수

abstract
transform(self, configProperties, dataset)

데이터 세트를 입력으로 취하여 새로운 파생된 데이터 집합을 출력합니다

  • self: 자체 참조
  • configProperties: 구성 속성 맵
  • dataset: 변형을 위한 입력 데이터 세트

스파크(스칼라)

다음 표에서는 Spark 데이터 세트 변압기 클래스:

방법 및 설명 매개 변수

transform(configProperties, dataset)

데이터 세트를 입력으로 취하여 새로운 파생된 데이터 집합을 출력합니다

  • configProperties: 구성 속성 맵
  • dataset: 변형을 위한 입력 데이터 세트

FeaturePipelineFactory

FeaturePipelineFactory 클래스에는 피쳐 추출 알고리즘이 포함되어 있으며 피쳐 파이프라인의 단계를 처음부터 끝까지 정의합니다.

PySpark

다음 표에서는 PySpark FeaturePipelineFactory의 클래스 메서드를 설명합니다.

방법 및 설명 매개 변수

abstract
create_pipeline(self, configProperties)

일련의 스파크 트랜스포머가 포함된 스파크 파이프라인을 생성하고 반환합니다

  • self: 자체 참조
  • configProperties: 구성 속성 맵

abstract
get_param_map(self, configProperties, sparkSession)

구성 속성에서 매개 변수 맵 검색 및 반환

  • self: 자체 참조
  • configProperties: 구성 속성
  • sparkSession: 스파크 세션

스파크(스칼라)

다음 표에서는 Spark FeaturePipelineFactory:

방법 및 설명 매개 변수

abstract
createPipeline(configProperties)

일련의 트랜스포머가 포함된 파이프라인을 생성하고 반환합니다

  • configProperties: 구성 속성 맵

abstract
getParamMap(configProperties, sparkSession)

구성 속성에서 매개 변수 맵 검색 및 반환

  • configProperties: 구성 속성
  • sparkSession: 스파크 세션

PipelineFactory

PipelineFactory 클래스는 교육 논리 및 알고리즘이 Spark 파이프라인.

PySpark

다음 표에서는 PySpark PipelineFactory의 클래스 메서드를 설명합니다.

방법 및 설명 매개 변수

abstract
apply(self, configProperties)

모델 교육 및 점수 책정 논리 및 알고리즘을 포함하는 스파크 파이프라인 만들기 및 반환

  • self: 자체 참조
  • configProperties: 구성 속성

abstract
train(self, configProperties, dataframe)

모델을 교육할 로직과 알고리즘이 포함된 사용자 지정 파이프라인을 반환합니다. 이 메서드는 스파크 파이프라인을 사용하는 경우에는 필요하지 않습니다

  • self: 자체 참조
  • configProperties: 구성 속성
  • dataframe: 교육 입력을 위한 기능 데이터 세트

abstract
score(self, configProperties, dataframe, model)

숙련된 모델을 사용하여 점수를 매기고 결과를 반환합니다

  • self: 자체 참조
  • configProperties: 구성 속성
  • dataframe: 점수를 위한 입력 데이터 세트
  • model: 점수를 매기는데 사용되는 훈련된 모델

abstract
get_param_map(self, configProperties, sparkSession)

구성 속성에서 매개 변수 맵 검색 및 반환

  • self: 자체 참조
  • configProperties: 구성 속성
  • sparkSession: 스파크 세션

스파크(스칼라)

다음 표에서는 Spark PipelineFactory:

방법 및 설명 매개 변수

abstract
apply(configProperties)

모델 교육 및 점수 책정 논리 및 알고리즘을 포함하는 파이프라인 만들기 및 반환

  • configProperties: 구성 속성

abstract
getParamMap(configProperties, sparkSession)

구성 속성에서 매개 변수 맵 검색 및 반환

  • configProperties: 구성 속성
  • sparkSession: 스파크 세션

MLEvaluator

MLEvaluator 클래스는 평가 지표를 정의하고 교육 및 테스트 데이터 세트를 결정하는 방법을 제공합니다.

PySpark

다음 표에서는 PySpark MLEvaluator의 클래스 메서드를 설명합니다.

방법 및 설명 매개 변수

abstract
split(self, configProperties, dataframe)

입력 데이터 세트를 교육 및 테스트 하위 집합으로 분할합니다.

  • self: 자체 참조
  • configProperties: 구성 속성
  • dataframe: 분할할 입력 데이터 세트

abstract
evaluate(self, dataframe, model, configProperties)

숙련된 모델을 평가하고 평가 결과를 반환합니다

  • self: 자체 참조
  • dataframe: 교육 및 테스트 데이터로 구성된 DataFrame
  • model: 훈련된 모델
  • configProperties: 구성 속성

스파크(스칼라)

다음 표에서는 Spark MLEvaluator:

방법 및 설명 매개 변수

abstract
split(configProperties, data)

입력 데이터 세트를 교육 및 테스트 하위 집합으로 분할합니다.

  • configProperties: 구성 속성
  • data: 분할할 입력 데이터 세트

abstract
evaluate(configProperties, model, data)

숙련된 모델을 평가하고 평가 결과를 반환합니다

  • configProperties: 구성 속성
  • model: 훈련된 모델
  • data: 교육 및 테스트 데이터로 구성된 DataFrame

이 페이지에서는