Le pipeline di funzioni sono attualmente disponibili solo tramite API.
Adobe Experience Platform consente di creare e creare pipeline di funzioni personalizzate per eseguire l’ingegneria delle funzioni su larga scala tramite il runtime Sensei Machine Learning Framework (in seguito denominato "Runtime").
Questo documento descrive le varie classi presenti in una pipeline di funzioni e fornisce un’esercitazione dettagliata per la creazione di una pipeline di funzioni personalizzata tramite l’ SDK per l’authoring dei modelli in PySpark.
Il flusso di lavoro seguente viene eseguito quando viene eseguita una pipeline di funzioni:
Per eseguire una ricetta in qualsiasi organizzazione, è necessario quanto segue:
Tutti i set di dati di cui sopra devono essere caricati nell’ Platform interfaccia utente. Per configurarlo, utilizza lo script di avvio bootstrap fornito dall'Adobe.
La tabella seguente descrive le principali classi astratte che è necessario estendere per creare una pipeline di funzioni:
Classe astratta | Descrizione |
---|---|
DataLoader | Una classe DataLoader fornisce l'implementazione per il recupero dei dati di input. |
DatasetTransformer | Una classe DatasetTransformer fornisce implementazioni per trasformare il set di dati di input. È possibile scegliere di non fornire una classe DatasetTransformer e implementare la logica di ingegneria delle funzioni all'interno della classe FeaturePipelineFactory. |
FeaturePipelineFactory | Una classe FeaturePipelineFactory crea una pipeline Spark costituita da una serie di trasformatori Spark per eseguire l'ingegneria delle funzioni. È possibile scegliere di non fornire una classe FeaturePipelineFactory e implementare la logica di ingegneria delle funzioni all'interno della classe DatasetTransformer. |
DataSaver | Una classe DataSaver fornisce la logica per l'archiviazione di un set di dati di una funzione. |
Quando viene avviato un processo di pipeline di funzionalità, Runtime esegue prima il DataLoader per caricare i dati di input come DataFrame e quindi modifica il DataFrame eseguendo DatasetTransformer, FeaturePipelineFactory o entrambi. Infine, il set di dati della funzione risultante viene memorizzato tramite DataSaver.
Il seguente diagramma di flusso mostra l’ordine di esecuzione di Runtime:
Le sezioni seguenti forniscono dettagli ed esempi sull’implementazione delle classi richieste per una pipeline di funzionalità.
Il file JSON di configurazione è costituito da coppie chiave-valore e ha lo scopo di specificare qualsiasi variabile da definire successivamente durante il runtime. Queste coppie chiave-valore possono definire proprietà quali la posizione del set di dati di input, l’ID del set di dati di output, l’ID tenant, le intestazioni di colonna e così via.
L'esempio seguente illustra le coppie chiave-valore presenti all'interno di un file di configurazione:
Esempio JSON di configurazione
[
{
"name": "fp",
"parameters": [
{
"key": "dataset_id",
"value": "000"
},
{
"key": "featureDatasetId",
"value": "111"
},
{
"key": "tenantId",
"value": "_tenantid"
}
]
}
]
Puoi accedere alla configurazione JSON tramite qualsiasi metodo di classe che definisce config_properties
come parametro. Esempio:
PySpark
dataset_id = str(config_properties.get(dataset_id))
Per un esempio di configurazione più approfondito, consulta il file pipeline.json fornito da Data Science Workspace .
DataLoader è responsabile del recupero e del filtraggio dei dati di input. L'implementazione di DataLoader deve estendere la classe astratta DataLoader
e sovrascrivere il metodo astratto load
.
L'esempio seguente recupera un set di dati Platform per ID e lo restituisce come DataFrame, dove l'ID del set di dati (dataset_id
) è una proprietà definita nel file di configurazione.
Esempio di PySpark
# PySpark
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
import logging
class MyDataLoader(DataLoader):
def load_dataset(config_properties, spark, tenant_id, dataset_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(dataset_id))
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# Get the distinct values of the dataframe
pd = pd.distinct()
# Flatten the data
if tenant_id in pd.columns:
pd = pd.select(col(tenant_id + ".*"))
return pd
Un DatasetTransformer fornisce la logica necessaria per trasformare un DataFrame di input e restituisce un nuovo DataFrame derivato. Questa classe può essere implementata in modo cooperativo con FeaturePipelineFactory, funzionare come l'unico componente di ingegneria delle funzioni, oppure è possibile scegliere di non implementare questa classe.
L'esempio seguente estende la classe DatasetTransformer:
Esempio di PySpark
# PySpark
from sdk.dataset_transformer import DatasetTransformer
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date, lit, lag, udf, date_format, lower, col, split, explode
from pyspark.sql import Window
from .helper import setupLogger
class MyDatasetTransformer(DatasetTransformer):
logger = setupLogger(__name__)
def transform(self, config_properties, dataset):
tenant_id = str(config_properties.get("tenantId"))
# Flatten the data
if tenant_id in dataset.columns:
self.logger.info("Flatten the data before transformation")
dataset = dataset.select(col(tenant_id + ".*"))
dataset.show()
# Convert isHoliday boolean value to Int
# Rename the column to holiday and drop isHoliday
pd = dataset.withColumn("holiday", col("isHoliday").cast(IntegerType())).drop("isHoliday")
pd.show()
# Get the week and year from date
pd = pd.withColumn("week", date_format(to_date("date", "MM/dd/yy"), "w").cast(IntegerType()))
pd = pd.withColumn("year", date_format(to_date("date", "MM/dd/yy"), "Y").cast(IntegerType()))
# Convert the date to TimestampType
pd = pd.withColumn("date", to_date(unix_timestamp(pd["date"], "MM/dd/yy").cast("timestamp")))
# Convert categorical data
indexer = StringIndexer(inputCol="storeType", outputCol="storeTypeIndex")
pd = indexer.fit(pd).transform(pd)
# Get the WeeklySalesAhead and WeeklySalesLag column values
window = Window.orderBy("date").partitionBy("store")
pd = pd.withColumn("weeklySalesLag", lag("weeklySales", 1).over(window)).na.drop(subset=["weeklySalesLag"])
pd = pd.withColumn("weeklySalesAhead", lag("weeklySales", -1).over(window)).na.drop(subset=["weeklySalesAhead"])
pd = pd.withColumn("weeklySalesScaled", lag("weeklySalesAhead", -1).over(window)).na.drop(subset=["weeklySalesScaled"])
pd = pd.withColumn("weeklySalesDiff", (pd['weeklySales'] - pd['weeklySalesLag'])/pd['weeklySalesLag'])
pd = pd.na.drop()
self.logger.debug("Transformed dataset count is %s " % pd.count())
# return transformed dataframe
return pd
Una FeaturePipelineFactory consente di implementare la logica di ingegneria delle funzioni definendo e concatenando una serie di trasformatori di scintille attraverso una pipeline di scintilla. Questa classe può essere implementata per lavorare in collaborazione con un DatasetTransformer, funzionare come unico componente di ingegneria delle funzioni, oppure è possibile scegliere di non implementare questa classe.
L’esempio seguente estende la classe FeaturePipelineFactory :
Esempio di PySpark
# PySpark
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
import numpy as np
from sdk.pipeline_factory import PipelineFactory
class MyFeaturePipelineFactory(FeaturePipelineFactory):
def apply(self, config_properties):
if config_properties is None:
raise ValueError("config_properties parameter is null")
tenant_id = str(config_properties.get("tenantId"))
input_features = str(config_properties.get("ACP_DSW_INPUT_FEATURES"))
if input_features is None:
raise ValueError("input_features parameter is null")
if input_features.startswith(tenant_id):
input_features = input_features.replace(tenant_id + ".", "")
learning_rate = float(config_properties.get("learning_rate"))
n_estimators = int(config_properties.get("n_estimators"))
max_depth = int(config_properties.get("max_depth"))
feature_list = list(input_features.split(","))
feature_list.remove("date")
feature_list.remove("storeType")
cols = np.array(feature_list)
# Gradient-boosted tree estimator
gbt = GBTRegressor(featuresCol='features', labelCol='weeklySalesAhead', predictionCol='prediction',
maxDepth=max_depth, maxBins=n_estimators, stepSize=learning_rate)
# Assemble the fields to a vector
assembler = VectorAssembler(inputCols=cols, outputCol="features")
# Construct the pipeline
pipeline = Pipeline(stages=[assembler, gbt])
return pipeline
def train(self, config_properties, dataframe):
pass
def score(self, config_properties, dataframe, model):
pass
def getParamMap(self, config_properties, sparkSession):
return None
DataSaver è responsabile della memorizzazione dei set di dati delle funzioni risultanti in una posizione di archiviazione. L'implementazione di DataSaver deve estendere la classe astratta DataSaver
e sovrascrivere il metodo astratto save
.
L'esempio seguente estende la classe DataSaver che memorizza i dati in un set di dati Platform per ID, dove l'ID set di dati (featureDatasetId
) e l'ID tenant (tenantId
) sono proprietà definite nella configurazione.
Esempio di PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
class MyDataSaver(DataSaver):
def save(self, configProperties, data_feature):
# Spark context
sparkContext = data_features._sc
# preliminary checks
if configProperties is None:
raise ValueError("configProperties parameter is null")
if data_features is None:
raise ValueError("data_features parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
# prepare variables
timestamp = "2019-01-01 00:00:00"
output_dataset_id = str(
configProperties.get("featureDatasetId"))
tenant_id = str(
configProperties.get("tenantId"))
service_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['output_dataset_id', 'tenant_id', 'service_token', 'user_token', 'org_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# create and prepare DataFrame with valid columns
output_df = data_features.withColumn("date", col("date").cast(StringType()))
output_df = output_df.withColumn(tenant_id, struct(col("date"), col("store"), col("features")))
output_df = output_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
output_df = output_df.withColumn("_id", lit("empty"))
output_df = output_df.withColumn("eventType", lit("empty"))
# store data into dataset
output_df.select(tenant_id, "_id", "eventType", "timestamp") \
.write.format("com.adobe.platform.dataset") \
.option('orgId', org_id) \
.option('serviceToken', service_token) \
.option('userToken', user_token) \
.option('serviceApiKey', api_key) \
.save(output_dataset_id)
Una volta definite e implementate le classi della pipeline di caratteristiche, è necessario specificare i nomi delle classi nel file YAML dell’applicazione.
Gli esempi seguenti specificano i nomi di classe implementati:
Esempio di PySpark
#Name of the class which contains implementation to get the input data.
feature.dataLoader: InputDataLoaderForFeaturePipeline
#Name of the class which contains implementation to get the transformed data.
feature.dataset.transformer: MyDatasetTransformer
#Name of the class which contains implementation to save the transformed data.
feature.dataSaver: DatasetSaverForTransformedData
#Name of the class which contains implementation to get the training data
training.dataLoader: TrainingDataLoader
#Name of the class which contains pipeline. It should implement PipelineFactory.scala
pipeline.class: TrainPipeline
#Name of the class which contains implementation for evaluation metrics.
evaluator: Evaluator
evaluateModel: True
#Name of the class which contains implementation to get the scoring data.
scoring.dataLoader: ScoringDataLoader
#Name of the class which contains implementation to save the scoring data.
scoring.dataSaver: MyDatasetSaver
Dopo aver creato la pipeline di funzioni, devi creare un’immagine Docker per effettuare una chiamata agli endpoint della pipeline di funzioni nell’ API Sensei Machine Learning . È necessario un URL immagine Docker per effettuare una chiamata agli endpoint della pipeline di funzionalità.
Se non disponi di un URL Docker, visita il tutorial Crea un pacchetto di file sorgente in una ricetta per una guida dettagliata alla creazione di un URL host Docker.
Facoltativamente, puoi anche utilizzare la seguente raccolta Postman per aiutare a completare il flusso di lavoro API della pipeline di funzioni:
https://www.postman.com/collections/c5fc0d1d5805a5ddd41a
Una volta raggiunta la posizione dell'immagine Docker, è possibile creare un motore di pipeline delle funzioni utilizzando l'API Sensei Machine Learning eseguendo un POST su /engines
. La creazione corretta di un motore di pipeline delle funzioni fornisce un identificatore univoco del motore (id
). Assicurati di salvare questo valore prima di continuare.
Utilizzando il engineID
appena creato, è necessario creare un MLIposition effettuando una richiesta POST all'endpoint /mlInstance
. Una risposta corretta restituisce un payload contenente i dettagli dell’istanza MLI appena creata, incluso l’identificatore univoco (id
) utilizzato nella successiva chiamata API.
Successivamente, devi creare un esperimento. Per creare un esperimento è necessario disporre dell'identificatore univoco MLIposition (id
) e inviare una richiesta POST all'endpoint /experiment
. Una risposta corretta restituisce un payload contenente i dettagli dell’esperimento appena creato, incluso l’identificatore univoco (id
) utilizzato nella chiamata API successiva.
Dopo aver creato un esperimento, devi modificare la modalità dell’esperimento su featurePipeline
. Per modificare la modalità , imposta un POST aggiuntivo su experiments/{EXPERIMENT_ID}/runs
con il EXPERIMENT_ID
e nel corpo invia { "mode":"featurePipeline"}
per specificare un’esecuzione di esperimento della pipeline di funzionalità.
Al termine, effettua una richiesta di GET a /experiments/{EXPERIMENT_ID}
a recupera lo stato dell'esperimento e attendi che lo stato dell'esperimento venga aggiornato per essere completato.
Successivamente, è necessario specificare l'attività di esecuzione della formazione. Imposta un POST su experiments/{EXPERIMENT_ID}/runs
e nel corpo imposta la modalità su train
e invia un array di attività che contengono i parametri di formazione. Una risposta corretta restituisce un payload contenente i dettagli dell’esperimento richiesto.
Al termine, effettua una richiesta di GET a /experiments/{EXPERIMENT_ID}
a recupera lo stato dell'esperimento e attendi che lo stato dell'esperimento venga aggiornato per essere completato.
Per completare questo passaggio è necessario che almeno un'esecuzione di formazione di successo sia associata al tuo esperimento.
Dopo un'esecuzione di formazione riuscita, è necessario specificare l'attività di esecuzione del punteggio. Imposta un POST su experiments/{EXPERIMENT_ID}/runs
e nel corpo imposta l'attributo mode
su "score". Viene avviato l’esecuzione dell’esperimento di punteggio.
Al termine, effettua una richiesta di GET a /experiments/{EXPERIMENT_ID}
a recupera lo stato dell'esperimento e attendi che lo stato dell'esperimento venga aggiornato per essere completato.
Al termine del punteggio, la pipeline delle funzioni deve essere operativa.
Leggendo questo documento, hai creato una pipeline di feature utilizzando l’SDK per l’authoring del modello, hai creato un’immagine Docker e hai utilizzato l’URL dell’immagine Docker per creare un modello di pipeline di feature utilizzando l’ API Sensei Machine Learning . Ora puoi continuare a trasformare i set di dati ed estrarre le funzioni di dati in scala utilizzando Sensei Machine Learning API.