Atualmente, os Pipelines de recursos só estão disponíveis por meio da API.
O Adobe Experience Platform permite criar e criar pipelines de recursos personalizados para executar engenharia de recursos em escala por meio do Tempo de execução da estrutura de aprendizado de máquina do Sensei (a seguir denominado "Tempo de execução").
Este documento descreve as várias classes encontradas em um pipeline de recursos e fornece um tutorial passo a passo para a criação de um pipeline de recursos personalizados usando o SDK de criação de modelo no PySpark.
O fluxo de trabalho a seguir ocorre quando um pipeline de recursos é executado:
Para executar uma fórmula em qualquer organização, é necessário:
Todos os conjuntos de dados acima precisam ser carregados no Platform IU. Para configurar, use o plug-in fornecido pelo Adobe script de inicialização.
A tabela a seguir descreve as principais classes abstratas que você deve estender para criar um pipeline de recurso:
Classe abstrata | Descrição |
---|---|
DataLoader | Uma classe DataLoader fornece implementação para a recuperação de dados de entrada. |
DatasetTransformer | Uma classe DatasetTransformer fornece implementações para transformar o conjunto de dados de entrada. Você pode optar por não fornecer uma classe DatasetTransformer e implementar sua lógica de engenharia de recursos na classe FeaturePipelineFactory. |
FeaturePipelineFactory | Uma classe FeaturePipelineFactory cria um Spark Pipeline que consiste em uma série de Spark Transformers para executar a engenharia de recursos. Você pode optar por não fornecer uma classe FeaturePipelineFactory e implementar sua lógica de engenharia de recursos na classe DatasetTransformer. |
DataSaver | Uma classe DataSaver fornece a lógica para o armazenamento de um conjunto de dados de recurso. |
Quando um trabalho do Pipeline de recurso é iniciado, o Tempo de execução primeiro executa o DataLoader para carregar dados de entrada como um DataFrame e, em seguida, modifica o DataFrame executando o DatasetTransformer, o FeaturePipelineFactory ou ambos. Por fim, o conjunto de dados de recurso resultante é armazenado por meio do DataSaver.
O fluxograma a seguir mostra a ordem de execução do Tempo de execução:
As seções a seguir fornecem detalhes e exemplos sobre a implementação das classes necessárias para um Pipeline de recurso.
O arquivo JSON de configuração consiste em pares de valores chave e tem como objetivo especificar quaisquer variáveis a serem definidas posteriormente durante o tempo de execução. Esses pares de valor-chave podem definir propriedades, como localização do conjunto de dados de entrada, ID do conjunto de dados de saída, ID do locatário, cabeçalhos de coluna e assim por diante.
O exemplo a seguir demonstra pares de valores chave encontrados em um arquivo de configuração:
Exemplo de configuração JSON
[
{
"name": "fp",
"parameters": [
{
"key": "dataset_id",
"value": "000"
},
{
"key": "featureDatasetId",
"value": "111"
},
{
"key": "tenantId",
"value": "_tenantid"
}
]
}
]
Você pode acessar a configuração JSON por meio de qualquer método de classe que defina config_properties
como parâmetro. Por exemplo:
PySpark
dataset_id = str(config_properties.get(dataset_id))
Consulte a pipeline.json arquivo fornecido pelo Data Science Workspace para obter um exemplo de configuração mais detalhado.
O DataLoader é responsável pela recuperação e filtragem dos dados de entrada. Sua implementação do DataLoader deve estender a classe abstrata DataLoader
e substituir o método abstrato load
.
O exemplo a seguir recupera um Platform por ID e o retorna como um DataFrame, onde a ID do conjunto de dados (dataset_id
) é uma propriedade definida no arquivo de configuração.
Exemplo do PySpark
# PySpark
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
import logging
class MyDataLoader(DataLoader):
def load_dataset(config_properties, spark, tenant_id, dataset_id):
PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
PLATFORM_SDK_PQS_INTERACTIVE = "interactive"
service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
dataset_id = str(config_properties.get(dataset_id))
for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
query_options = get_query_options(spark.sparkContext)
pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
.option(query_options.userToken(), user_token) \
.option(query_options.serviceToken(), service_token) \
.option(query_options.imsOrg(), org_id) \
.option(query_options.apiKey(), api_key) \
.option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
.option(query_options.datasetId(), dataset_id) \
.load()
pd.show()
# Get the distinct values of the dataframe
pd = pd.distinct()
# Flatten the data
if tenant_id in pd.columns:
pd = pd.select(col(tenant_id + ".*"))
return pd
Um DatasetTransformer fornece a lógica para transformar um DataFrame de entrada e retorna um novo DataFrame derivado. Esta classe pode ser implementada para trabalhar de forma cooperativa com um FeaturePipelineFactory, funcionar como o único componente de engenharia de recursos ou você pode optar por não implementar esta classe.
O exemplo a seguir estende a classe DatasetTransformer:
Exemplo do PySpark
# PySpark
from sdk.dataset_transformer import DatasetTransformer
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date, lit, lag, udf, date_format, lower, col, split, explode
from pyspark.sql import Window
from .helper import setupLogger
class MyDatasetTransformer(DatasetTransformer):
logger = setupLogger(__name__)
def transform(self, config_properties, dataset):
tenant_id = str(config_properties.get("tenantId"))
# Flatten the data
if tenant_id in dataset.columns:
self.logger.info("Flatten the data before transformation")
dataset = dataset.select(col(tenant_id + ".*"))
dataset.show()
# Convert isHoliday boolean value to Int
# Rename the column to holiday and drop isHoliday
pd = dataset.withColumn("holiday", col("isHoliday").cast(IntegerType())).drop("isHoliday")
pd.show()
# Get the week and year from date
pd = pd.withColumn("week", date_format(to_date("date", "MM/dd/yy"), "w").cast(IntegerType()))
pd = pd.withColumn("year", date_format(to_date("date", "MM/dd/yy"), "Y").cast(IntegerType()))
# Convert the date to TimestampType
pd = pd.withColumn("date", to_date(unix_timestamp(pd["date"], "MM/dd/yy").cast("timestamp")))
# Convert categorical data
indexer = StringIndexer(inputCol="storeType", outputCol="storeTypeIndex")
pd = indexer.fit(pd).transform(pd)
# Get the WeeklySalesAhead and WeeklySalesLag column values
window = Window.orderBy("date").partitionBy("store")
pd = pd.withColumn("weeklySalesLag", lag("weeklySales", 1).over(window)).na.drop(subset=["weeklySalesLag"])
pd = pd.withColumn("weeklySalesAhead", lag("weeklySales", -1).over(window)).na.drop(subset=["weeklySalesAhead"])
pd = pd.withColumn("weeklySalesScaled", lag("weeklySalesAhead", -1).over(window)).na.drop(subset=["weeklySalesScaled"])
pd = pd.withColumn("weeklySalesDiff", (pd['weeklySales'] - pd['weeklySalesLag'])/pd['weeklySalesLag'])
pd = pd.na.drop()
self.logger.debug("Transformed dataset count is %s " % pd.count())
# return transformed dataframe
return pd
Um FeaturePipelineFactory permite implementar sua lógica de engenharia de recursos definindo e encadeando uma série de Transformadores Spark por meio de um Pipeline Spark. Esta classe pode ser implementada para trabalhar de forma cooperativa com um DatasetTransformer, funcionar como o único componente de engenharia de recursos ou você pode optar por não implementar esta classe.
O exemplo a seguir estende a classe FeaturePipelineFactory:
Exemplo do PySpark
# PySpark
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
import numpy as np
from sdk.pipeline_factory import PipelineFactory
class MyFeaturePipelineFactory(FeaturePipelineFactory):
def apply(self, config_properties):
if config_properties is None:
raise ValueError("config_properties parameter is null")
tenant_id = str(config_properties.get("tenantId"))
input_features = str(config_properties.get("ACP_DSW_INPUT_FEATURES"))
if input_features is None:
raise ValueError("input_features parameter is null")
if input_features.startswith(tenant_id):
input_features = input_features.replace(tenant_id + ".", "")
learning_rate = float(config_properties.get("learning_rate"))
n_estimators = int(config_properties.get("n_estimators"))
max_depth = int(config_properties.get("max_depth"))
feature_list = list(input_features.split(","))
feature_list.remove("date")
feature_list.remove("storeType")
cols = np.array(feature_list)
# Gradient-boosted tree estimator
gbt = GBTRegressor(featuresCol='features', labelCol='weeklySalesAhead', predictionCol='prediction',
maxDepth=max_depth, maxBins=n_estimators, stepSize=learning_rate)
# Assemble the fields to a vector
assembler = VectorAssembler(inputCols=cols, outputCol="features")
# Construct the pipeline
pipeline = Pipeline(stages=[assembler, gbt])
return pipeline
def train(self, config_properties, dataframe):
pass
def score(self, config_properties, dataframe, model):
pass
def getParamMap(self, config_properties, sparkSession):
return None
O DataSaver é responsável por armazenar os conjuntos de dados de recursos resultantes em um local de armazenamento. Sua implementação do DataSaver deve estender a classe abstrata DataSaver
e substituir o método abstrato save
.
O exemplo a seguir estende a classe DataSaver, que armazena dados em uma Platform conjunto de dados por ID, onde a ID do conjunto de dados (featureDatasetId
) e ID do locatário (tenantId
) são propriedades definidas na configuração.
Exemplo do PySpark
# PySpark
from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
class MyDataSaver(DataSaver):
def save(self, configProperties, data_feature):
# Spark context
sparkContext = data_features._sc
# preliminary checks
if configProperties is None:
raise ValueError("configProperties parameter is null")
if data_features is None:
raise ValueError("data_features parameter is null")
if sparkContext is None:
raise ValueError("sparkContext parameter is null")
# prepare variables
timestamp = "2019-01-01 00:00:00"
output_dataset_id = str(
configProperties.get("featureDatasetId"))
tenant_id = str(
configProperties.get("tenantId"))
service_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
user_token = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
org_id = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
api_key = str(
sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))
# validate variables
for arg in ['output_dataset_id', 'tenant_id', 'service_token', 'user_token', 'org_id', 'api_key']:
if eval(arg) == 'None':
raise ValueError("%s is empty" % arg)
# create and prepare DataFrame with valid columns
output_df = data_features.withColumn("date", col("date").cast(StringType()))
output_df = output_df.withColumn(tenant_id, struct(col("date"), col("store"), col("features")))
output_df = output_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
output_df = output_df.withColumn("_id", lit("empty"))
output_df = output_df.withColumn("eventType", lit("empty"))
# store data into dataset
output_df.select(tenant_id, "_id", "eventType", "timestamp") \
.write.format("com.adobe.platform.dataset") \
.option('orgId', org_id) \
.option('serviceToken', service_token) \
.option('userToken', user_token) \
.option('serviceApiKey', api_key) \
.save(output_dataset_id)
Agora que suas classes de pipeline de recurso estão definidas e implementadas, você deve especificar os nomes de suas classes no arquivo YAML da aplicação.
Os exemplos a seguir especificam nomes de classe implementados:
Exemplo do PySpark
#Name of the class which contains implementation to get the input data.
feature.dataLoader: InputDataLoaderForFeaturePipeline
#Name of the class which contains implementation to get the transformed data.
feature.dataset.transformer: MyDatasetTransformer
#Name of the class which contains implementation to save the transformed data.
feature.dataSaver: DatasetSaverForTransformedData
#Name of the class which contains implementation to get the training data
training.dataLoader: TrainingDataLoader
#Name of the class which contains pipeline. It should implement PipelineFactory.scala
pipeline.class: TrainPipeline
#Name of the class which contains implementation for evaluation metrics.
evaluator: Evaluator
evaluateModel: True
#Name of the class which contains implementation to get the scoring data.
scoring.dataLoader: ScoringDataLoader
#Name of the class which contains implementation to save the scoring data.
scoring.dataSaver: MyDatasetSaver
Agora que você criou o pipeline de recursos, é necessário criar uma imagem do Docker para fazer uma chamada para os pontos de extremidade do pipeline de recursos no Sensei Machine Learning API. Você precisa de um URL de imagem do Docker para fazer uma chamada para os pontos de extremidade do pipeline de recursos.
Se você não tiver um URL Docker, visite o Compactar arquivos de origem em uma fórmula tutorial para obter uma apresentação passo a passo sobre a criação de um URL de host do Docker.
Como opção, você também pode usar a seguinte coleção do Postman para ajudar a concluir o fluxo de trabalho da API do pipeline de recursos:
https://www.postman.com/collections/c5fc0d1d5805a5ddd41a
Depois de ter a localização da imagem do Docker, você poderá criar um mecanismo de pipeline de recursos usando o Sensei Machine Learning API executando um POST para /engines
. A criação bem-sucedida de um mecanismo de pipeline de recurso fornece um identificador exclusivo de Mecanismo (id
). Salve este valor antes de continuar.
Usar o recém-criado engineID
, é necessário criar uma MLIstance fazendo uma solicitação POST para o /mlInstance
terminal. Uma resposta bem-sucedida retorna uma carga contendo os detalhes da MLInstance recém-criada, incluindo seu identificador exclusivo (id
) usada na próxima chamada de API.
Em seguida, é necessário criar um experimento. Para criar um experimento, você precisa ter seu identificador exclusivo de MLIstance (id
) e faça uma solicitação POST para o /experiment
terminal. Uma resposta bem-sucedida retorna uma carga contendo os detalhes do Experimento recém-criado, incluindo seu identificador exclusivo (id
) usada na próxima chamada de API.
Depois de criar um experimento, é necessário alterar o modo do experimento para featurePipeline
. Para alterar o modo, faça um POST adicional em experiments/{EXPERIMENT_ID}/runs
com o seu EXPERIMENT_ID
e no corpo enviar { "mode":"featurePipeline"}
para especificar uma execução de experimento do pipeline de recursos.
Após a conclusão, faça uma solicitação ao GET para /experiments/{EXPERIMENT_ID}
para recuperar o status do experimento e aguarde o status do Experimento ser atualizado para concluído.
Em seguida, é necessário especificar a tarefa de execução de treinamento. Faça um POST para experiments/{EXPERIMENT_ID}/runs
e no corpo defina o modo como train
e enviar uma série de tarefas que contêm seus parâmetros de treinamento. Uma resposta bem-sucedida retorna uma carga contendo os detalhes do experimento solicitado.
Após a conclusão, faça uma solicitação ao GET para /experiments/{EXPERIMENT_ID}
para recuperar o status do experimento e aguarde o status do Experimento ser atualizado para concluído.
Para concluir esta etapa, você precisa ter pelo menos uma execução de treinamento bem-sucedida associada ao seu Experimento.
Após uma execução de treinamento bem-sucedida, é necessário especificar a tarefa de execução de pontuação. Faça um POST para experiments/{EXPERIMENT_ID}/runs
e no corpo definir o mode
atributo para "score". Isso inicia a execução do Experimento de pontuação.
Após a conclusão, faça uma solicitação ao GET para /experiments/{EXPERIMENT_ID}
para recuperar o status do experimento e aguarde o status do Experimento ser atualizado para concluído.
Depois que a pontuação for concluída, o pipeline de recursos deverá estar operacional.
Ao ler este documento, você criou um pipeline de recursos usando o SDK de criação de modelo, criou uma imagem do Docker e usou o URL da imagem do Docker para criar um modelo de pipeline de recursos usando o Sensei Machine Learning API. Agora você está pronto para continuar transformando conjuntos de dados e extraindo recursos de dados em escala usando o Sensei Machine Learning API.