Exportación de datos a entornos XML externos
Este documento muestra cómo compartir un conjunto de datos de formación preparado creado con Data Distiller en una ubicación de almacenamiento en la nube que su entorno de ML puede leer para entrenar y puntuar el modelo. El ejemplo aquí exporta el conjunto de datos de aprendizaje a la zona de aterrizaje de datos (DLZ). Puede cambiar el destino de almacenamiento según sea necesario para trabajar con su entorno de aprendizaje automático.
Flow Service for Destinations se usa para completar la canalización de características al aterrizar un conjunto de datos de características calculadas en una ubicación de almacenamiento de nube adecuada.
Crear la conexión de origen create-source-connection
La conexión de origen es responsable de configurar la conexión con el conjunto de datos de Adobe Experience Platform para que el flujo resultante sepa exactamente dónde buscar los datos y en qué formato.
from aepp import flowservice
flow_conn = flowservice.FlowService()
training_dataset_id = <YOUR_TRAINING_DATASET_ID>
source_res = flow_conn.createSourceConnectionDataLake(
name=f"[CMLE] Featurized Dataset source connection created by {username}",
dataset_ids=[training_dataset_id],
format="parquet"
)
source_connection_id = source_res["id"]
Creación de la conexión de destino create-target-connection
La conexión de destino es responsable de conectarse al sistema de archivos de destino. Esto requiere primero crear una conexión base a la cuenta de almacenamiento en la nube (la zona de aterrizaje de datos en este ejemplo) y luego una conexión de destino a una ruta de archivo específica con opciones de compresión y formato especificadas.
Cada uno de los destinos de almacenamiento en la nube disponibles se identifica mediante un ID de especificación de conexión:
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
"name": "Base Connection to DLZ created by",
"auth": None,
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
})
base_connection_id = base_connection_res["id"]
target_res = flow_conn.createTargetConnection(
data={
"name": "Data Landing Zone target connection",
"baseConnectionId": base_connection_id,
"params": {
"mode": "Server-to-server",
"compression": config.get("Cloud", "compression_type"),
"datasetFileType": config.get("Cloud", "data_format"),
"path": config.get("Cloud", "export_path")
},
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
}
)
target_connection_id = target_res["id"]
Creación del flujo de datos create-data-flow
El paso final es crear un flujo de datos entre el conjunto de datos especificado en la conexión de origen y la ruta de acceso del archivo de destino especificada en la conexión de destino.
Cada tipo de almacenamiento en la nube disponible se identifica con un ID de especificación de flujo:
El siguiente código crea un flujo de datos con una programación configurada para iniciarse en un futuro lejano. Esto le permite almacenar en déclencheur flujos ad hoc durante el desarrollo del modelo. Una vez que tenga un modelo entrenado, puede actualizar la programación del flujo de datos para compartir el conjunto de datos de funciones en la programación deseada.
import time
on_schedule = False
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
Con el flujo de datos creado, ahora puede almacenar en déclencheur una ejecución de flujo ad hoc para compartir el conjunto de datos de funciones bajo demanda:
from aepp import connector
connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res
Uso compartido optimizado en la zona de aterrizaje de datos
Para compartir un conjunto de datos en la zona de aterrizaje de datos más fácilmente, la biblioteca aepp
proporciona una función exportDatasetToDataLandingZone
que ejecuta los pasos anteriores en una sola llamada de función:
from aepp import exportDatasetToDataLandingZone
export = exportDatasetToDataLandingZone.ExportDatasetToDataLandingZone()
dataflow_id = export.createDataFlowRunIfNotExists(
dataset_id = created_dataset_id,
data_format = data_format,
export_path= export_path,
compression_type = compression_type,
on_schedule = False,
config_path = config_path,
entity_name = "Flow for Featurized Dataset to DLZ"
)
Este código crea la conexión de origen, la conexión de destino y el flujo de datos en función de los parámetros proporcionados, y ejecuta una ejecución ad hoc del flujo de datos en un solo paso.