Exportar dados para ambientes externos de ML
Este documento demonstra como compartilhar um conjunto de dados de treinamento preparado criado com o Data Distiller em um local de armazenamento na nuvem que seu ambiente de aprendizado de máquina pode ler para treinamento e pontuação em seu modelo. O exemplo aqui exporta o conjunto de dados de treinamento para a Data Landing Zone (DLZ). Você pode alterar o destino de armazenamento conforme necessário para trabalhar com seu ambiente de aprendizado de máquina.
O Serviço de Fluxo para Destinos é usado para concluir o pipeline de recursos, colocando um conjunto de dados de recursos computados em um local de armazenamento na nuvem apropriado.
Criar a conexão de origem create-source-connection
A conexão de origem é responsável por configurar a conexão com seu conjunto de dados do Adobe Experience Platform para que o fluxo resultante saiba exatamente onde procurar os dados e em que formato.
from aepp import flowservice
flow_conn = flowservice.FlowService()
training_dataset_id = <YOUR_TRAINING_DATASET_ID>
source_res = flow_conn.createSourceConnectionDataLake(
name=f"[CMLE] Featurized Dataset source connection created by {username}",
dataset_ids=[training_dataset_id],
format="parquet"
)
source_connection_id = source_res["id"]
Criar a conexão de destino create-target-connection
A conexão de destino é responsável pela conexão com o sistema de arquivos de destino. Isso requer primeiro a criação de uma conexão básica com a conta de armazenamento na nuvem (a Data Landing Zone neste exemplo) e, em seguida, uma conexão de destino com um caminho de arquivo específico com opções de compactação e formato especificadas.
Os destinos de armazenamento em nuvem disponíveis são identificados por uma ID de especificação de conexão:
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
"name": "Base Connection to DLZ created by",
"auth": None,
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
})
base_connection_id = base_connection_res["id"]
target_res = flow_conn.createTargetConnection(
data={
"name": "Data Landing Zone target connection",
"baseConnectionId": base_connection_id,
"params": {
"mode": "Server-to-server",
"compression": config.get("Cloud", "compression_type"),
"datasetFileType": config.get("Cloud", "data_format"),
"path": config.get("Cloud", "export_path")
},
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
}
)
target_connection_id = target_res["id"]
Criar o fluxo de dados create-data-flow
A etapa final é criar um fluxo de dados entre o conjunto de dados especificado na conexão de origem e o caminho do arquivo de destino especificado na conexão de destino.
Cada tipo de armazenamento na nuvem disponível é identificado por uma ID de especificação de fluxo:
O código a seguir cria um fluxo de dados com uma programação definida para iniciar no futuro. Isso permite acionar fluxos ad hoc durante o desenvolvimento do modelo. Depois de ter um modelo treinado, você pode atualizar o agendamento do fluxo de dados para compartilhar o conjunto de dados do recurso no agendamento desejado.
import time
on_schedule = False
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
Com o fluxo de dados criado, agora é possível acionar uma execução de fluxo ad hoc para compartilhar o conjunto de dados do recurso sob demanda:
from aepp import connector
connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res
Compartilhamento simplificado na Data Landing Zone
Para compartilhar um conjunto de dados com a Data Landing Zone mais facilmente, a biblioteca aepp
fornece uma função exportDatasetToDataLandingZone
que executa as etapas acima em uma única chamada de função:
from aepp import exportDatasetToDataLandingZone
export = exportDatasetToDataLandingZone.ExportDatasetToDataLandingZone()
dataflow_id = export.createDataFlowRunIfNotExists(
dataset_id = created_dataset_id,
data_format = data_format,
export_path= export_path,
compression_type = compression_type,
on_schedule = False,
config_path = config_path,
entity_name = "Flow for Featurized Dataset to DLZ"
)
Esse código cria a conexão de origem, a conexão de destino e o fluxo de dados com base nos parâmetros fornecidos, e executa uma execução ad hoc do fluxo de dados em uma única etapa.