機能パイプラインは、現在、API 経由でのみ使用できます。
Adobe Experience Platformでは、カスタム機能パイプラインを構築して作成し、Sensei Machine Learning Framework Runtime(以下「Runtime」)を通じて、大規模な機能エンジニアリングを実行できます。
このドキュメントでは、機能パイプラインに含まれる様々なクラスについて説明し、を使用してカスタム機能パイプラインを作成する手順を示すチュートリアルを提供します。 モデルオーサリング SDK PySpark 内。
機能パイプラインの実行時には、次のワークフローが実行されます。
任意の組織でレシピを実行するには、次が必要です。
上記のすべてのデータセットを、 Platform UI これを設定するには、Adobeが提供する ブートストラップスクリプト.
次の表に、機能パイプラインを構築するために拡張する必要がある主な抽象クラスを示します。
抽象クラス | 説明 |
---|---|
DataLoader | DataLoader クラスは、入力データを取得するための実装を提供します。 |
DatasetTransformer | DatasetTransformer クラスは、入力データセットを変換するための実装を提供します。DatasetTransformer クラスを指定する代わりに、FeaturePipelineFactory クラス内に機能エンジニアリングロジックを実装することもできます。 |
FeaturePipelineFactory | FeaturePipelineFactory クラスは、一連の Spark Transformer から成り、機能エンジニアリングを実施する Spark パイプラインを構築します。FeaturePipelineFactory クラスを指定する代わりに、DatasetTransformer クラス内に機能エンジニアリングロジックを実装することもできます。 |
DataSaver | DataSaver クラスは、機能データセットのストレージのロジックを提供します。 |
機能パイプラインジョブが開始されると、Runtime は、まず DataLoader を実行して入力データを DataFrame として読み込み、次に DatasetTransformer、FeaturePipelineFactory、またはその両方を実行して DataFrame を変更します。 最後に、生成された機能データセットが DataSaver を通して保存されます。
次のフローチャートに、Runtime の実行順序を示します。
以下の節では、機能パイプラインに必要なクラスの説明とその実装例を示します。
設定 JSON ファイルはキーと値のペアで構成され、後から実行時に定義する変数を指定することを目的としています。これらのキーと値のペアでは、入力データセットの場所、出力データセット ID、テナント ID、列ヘッダーなどのプロパティを定義できます。
次の例は、設定ファイル内にあるキーと値のペアを示しています。
設定 JSON の例
[
{
"name": "fp",
"parameters": [
{
"key": "dataset_id",
"value": "000"
},
{
"key": "featureDatasetId",
"value": "111"
},
{
"key": "tenantId",
"value": "_tenantid"
}
]
}
]
設定 JSON にアクセスするには、任意のクラスメソッドで config_properties
をパラメーターとして定義します。以下に例を示します。
PySpark
dataset_id = str(config_properties.get(dataset_id))
詳しくは、 pipeline.json Data Science Workspace によって提供されるファイルを参照してください。
DataLoader では、入力データの取得とフィルタリングをおこないます。DataLoader の実装では、抽象クラス DataLoader
を拡張し、抽象メソッド load
をオーバーライドする必要があります。
次の例では、 Platform データセットを ID でデータセット化し、DataFrame として返します。このデータセット ID(dataset_id
) は、設定ファイル内で定義されたプロパティです。
PySpark の例
# PySpark
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
import logging
class MyDataLoader(DataLoader):
def load_dataset(config_properties, spark, tenant_id, dataset_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(dataset_id))
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# Get the distinct values of the dataframe
pd = pd.distinct()
# Flatten the data
if tenant_id in pd.columns:
pd = pd.select(col(tenant_id + ".*"))
return pd
DatasetTransformer は、入力 DataFrame を変換するロジックを提供し、新しい派生 DataFrame を返します。このクラスは、FeaturePipelineFactory と協働するように、または唯一の機能エンジニアリングコンポーネントとして動作するように実装できます。また、このクラスを実装しないことも可能です。
次の例では、 DatasetTransformer クラスが拡張されています。
PySpark の例
# PySpark
from sdk.dataset_transformer import DatasetTransformer
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date, lit, lag, udf, date_format, lower, col, split, explode
from pyspark.sql import Window
from .helper import setupLogger
class MyDatasetTransformer(DatasetTransformer):
logger = setupLogger(__name__)
def transform(self, config_properties, dataset):
tenant_id = str(config_properties.get("tenantId"))
# Flatten the data
if tenant_id in dataset.columns:
self.logger.info("Flatten the data before transformation")
dataset = dataset.select(col(tenant_id + ".*"))
dataset.show()
# Convert isHoliday boolean value to Int
# Rename the column to holiday and drop isHoliday
pd = dataset.withColumn("holiday", col("isHoliday").cast(IntegerType())).drop("isHoliday")
pd.show()
# Get the week and year from date
pd = pd.withColumn("week", date_format(to_date("date", "MM/dd/yy"), "w").cast(IntegerType()))
pd = pd.withColumn("year", date_format(to_date("date", "MM/dd/yy"), "Y").cast(IntegerType()))
# Convert the date to TimestampType
pd = pd.withColumn("date", to_date(unix_timestamp(pd["date"], "MM/dd/yy").cast("timestamp")))
# Convert categorical data
indexer = StringIndexer(inputCol="storeType", outputCol="storeTypeIndex")
pd = indexer.fit(pd).transform(pd)
# Get the WeeklySalesAhead and WeeklySalesLag column values
window = Window.orderBy("date").partitionBy("store")
pd = pd.withColumn("weeklySalesLag", lag("weeklySales", 1).over(window)).na.drop(subset=["weeklySalesLag"])
pd = pd.withColumn("weeklySalesAhead", lag("weeklySales", -1).over(window)).na.drop(subset=["weeklySalesAhead"])
pd = pd.withColumn("weeklySalesScaled", lag("weeklySalesAhead", -1).over(window)).na.drop(subset=["weeklySalesScaled"])
pd = pd.withColumn("weeklySalesDiff", (pd['weeklySales'] - pd['weeklySalesLag'])/pd['weeklySalesLag'])
pd = pd.na.drop()
self.logger.debug("Transformed dataset count is %s " % pd.count())
# return transformed dataframe
return pd
FeaturePipelineFactory を使用すると、機能エンジニアリングロジックを実装できます。そのためには、Spark パイプラインを通じて、一連の Spark Transformer を定義および連結します。このクラスは、DatasetTransformer と協働するように、または唯一の機能エンジニアリングコンポーネントとして動作するように実装できます。また、このクラスを実装しないことも可能です。
次の例では、FeaturePipelineFactory クラスを拡張しています。
PySpark の例
# PySpark
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
import numpy as np
from sdk.pipeline_factory import PipelineFactory
class MyFeaturePipelineFactory(FeaturePipelineFactory):
def apply(self, config_properties):
if config_properties is None:
raise ValueError("config_properties parameter is null")
tenant_id = str(config_properties.get("tenantId"))
input_features = str(config_properties.get("ACP_DSW_INPUT_FEATURES"))
if input_features is None:
raise ValueError("input_features parameter is null")
if input_features.startswith(tenant_id):
input_features = input_features.replace(tenant_id + ".", "")
learning_rate = float(config_properties.get("learning_rate"))
n_estimators = int(config_properties.get("n_estimators"))
max_depth = int(config_properties.get("max_depth"))
feature_list = list(input_features.split(","))
feature_list.remove("date")
feature_list.remove("storeType")
cols = np.array(feature_list)
# Gradient-boosted tree estimator
gbt = GBTRegressor(featuresCol='features', labelCol='weeklySalesAhead', predictionCol='prediction',
maxDepth=max_depth, maxBins=n_estimators, stepSize=learning_rate)
# Assemble the fields to a vector
assembler = VectorAssembler(inputCols=cols, outputCol="features")
# Construct the pipeline
pipeline = Pipeline(stages=[assembler, gbt])
return pipeline
def train(self, config_properties, dataframe):
pass
def score(self, config_properties, dataframe, model):
pass
def getParamMap(self, config_properties, sparkSession):
return None
DataSaver は、結果の機能データセットをストレージの場所に保存します。 DataSaver の実装では、抽象クラス DataSaver
を拡張し、抽象メソッド save
をオーバーライドする必要があります。
次の例では、データを Platform データセットを ID で指定し、ここで、データセット ID(featureDatasetId
) およびテナント ID (tenantId
) は、設定で定義されたプロパティです。
PySpark の例
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
class MyDataSaver(DataSaver):
def save(self, configProperties, data_feature):
# Spark context
sparkContext = data_features._sc
# preliminary checks
if configProperties is None:
raise ValueError("configProperties parameter is null")
if data_features is None:
raise ValueError("data_features parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
# prepare variables
timestamp = "2019-01-01 00:00:00"
output_dataset_id = str(
configProperties.get("featureDatasetId"))
tenant_id = str(
configProperties.get("tenantId"))
service_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['output_dataset_id', 'tenant_id', 'service_token', 'user_token', 'org_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# create and prepare DataFrame with valid columns
output_df = data_features.withColumn("date", col("date").cast(StringType()))
output_df = output_df.withColumn(tenant_id, struct(col("date"), col("store"), col("features")))
output_df = output_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
output_df = output_df.withColumn("_id", lit("empty"))
output_df = output_df.withColumn("eventType", lit("empty"))
# store data into dataset
output_df.select(tenant_id, "_id", "eventType", "timestamp") \
.write.format("com.adobe.platform.dataset") \
.option('orgId', org_id) \
.option('serviceToken', service_token) \
.option('userToken', user_token) \
.option('serviceApiKey', api_key) \
.save(output_dataset_id)
機能パイプラインのクラスを定義して実装したら、YAML アプリケーションのファイルでクラスの名前を指定する必要があります。
次の例では、実装済みのクラス名を指定します。
PySpark の例
#Name of the class which contains implementation to get the input data.
feature.dataLoader: InputDataLoaderForFeaturePipeline
#Name of the class which contains implementation to get the transformed data.
feature.dataset.transformer: MyDatasetTransformer
#Name of the class which contains implementation to save the transformed data.
feature.dataSaver: DatasetSaverForTransformedData
#Name of the class which contains implementation to get the training data
training.dataLoader: TrainingDataLoader
#Name of the class which contains pipeline. It should implement PipelineFactory.scala
pipeline.class: TrainPipeline
#Name of the class which contains implementation for evaluation metrics.
evaluator: Evaluator
evaluateModel: True
#Name of the class which contains implementation to get the scoring data.
scoring.dataLoader: ScoringDataLoader
#Name of the class which contains implementation to save the scoring data.
scoring.dataSaver: MyDatasetSaver
機能パイプラインを作成したら、Docker イメージを作成して、 Sensei Machine Learning API 機能パイプラインエンドポイントを呼び出すには、Docker 画像 URL が必要です。
Docker URL がない場合は、 ソースファイルのレシピへのパッケージ化 Docker ホスト URL の作成手順を示すチュートリアルです。
また、次のPostmanコレクションを使用して、機能パイプライン API ワークフローの完了に役立てることもできます。
https://www.postman.com/collections/c5fc0d1d5805a5ddd41a
Docker イメージの場所を取得したら、次の操作を実行できます。 フィーチャパイプラインエンジンを作成する の使用 Sensei Machine Learning API を使用するには、次の操作に対してPOSTを実行します。 /engines
. 機能パイプラインエンジンが正常に作成されると、エンジンの一意の識別子 (id
) をクリックします。 続行する前に、この値を必ず保存してください。
新しく作成したを使用 engineID
、 MLIstance の作成 に対してPOSTリクエストを行う /mlInstance
endpoint. 正常な応答は、新たに作成された MLInstance の一意の識別子 (id
) が次の API 呼び出しで使用されます。
次に、 実験の作成. 実験を作成するには、MLIstance の一意の識別子 (id
) をクリックし、 /experiment
endpoint. 成功応答は、新たに作成された実験の詳細(一意の識別子を含む)を含むペイロードを返します (id
) が次の API 呼び出しで使用されます。
実験を作成した後、実験のモードをに変更する必要があります。 featurePipeline
. モードを変更するには、次の手順で追加のPOSTを行います。 experiments/{EXPERIMENT_ID}/runs
を EXPERIMENT_ID
そして死体の中に { "mode":"featurePipeline"}
をクリックして、機能パイプラインの Experiment Run を指定します。
完了したら、に対してGETリクエストを実行します。 /experiments/{EXPERIMENT_ID}
から 実験ステータスの取得 と、Experiment のステータスが更新されて完了するまで待ちます。
次に、 トレーニング実行タスクの指定. POSTする experiments/{EXPERIMENT_ID}/runs
本文で、モードを train
およびトレーニングパラメーターを含むタスクの配列を送信します。 成功応答は、要求された実験の詳細を含むペイロードを返します。
完了したら、に対してGETリクエストを実行します。 /experiments/{EXPERIMENT_ID}
から 実験ステータスの取得 と、Experiment のステータスが更新されて完了するまで待ちます。
この手順を完了するには、1 つ以上の成功したトレーニング実行を実験に関連付ける必要があります。
トレーニングを正常に実行した後、次の操作を行う必要があります。 スコアリング実行タスクの指定. POSTする experiments/{EXPERIMENT_ID}/runs
そして体に mode
属性を「score」に設定します。 これにより、スコアリング Experiment Run が開始されます。
完了したら、に対してGETリクエストを実行します。 /experiments/{EXPERIMENT_ID}
から 実験ステータスの取得 と、Experiment のステータスが更新されて完了するまで待ちます。
スコアリングが完了したら、機能パイプラインが動作可能になります。
このドキュメントでは、Model Authoring SDK を使用して機能パイプラインをオーサリングし、Docker 画像を作成し、Docker 画像 URL を使用して、 Sensei Machine Learning API これで、データセットの変換を続行し、 Sensei Machine Learning API.