Export data to external ML environments
This document demonstrates how to export a prepared training dataset created with Data Distiller to a cloud storage location that your machine learning environment can read for training and scoring your model. The example exports the training dataset to the Data Landing Zone (DLZ); you can change the storage destination as needed to match your machine learning environment.
The Flow Service for Destinations is used to complete the feature pipeline by landing a dataset of computed features into an appropriate cloud storage location.
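The snippets that follow assume the aepp library has already been configured for your organization, and that a username label and a config_path variable are available. A minimal sketch of that setup, assuming a local JSON credentials file (the file name and username below are placeholders):
import aepp

# Load API credentials for your organization from a local config file (placeholder path)
config_path = "myconfig_file.json"
aepp.importConfigFile(config_path)

# Label used in the names of the connections and flows created below
username = "<YOUR_USERNAME>"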
Create the source connection
The source connection is responsible for configuring the connection to your Adobe Experience Platform dataset so that the resulting flow knows exactly where to look for the data and in which format.
from aepp import flowservice
flow_conn = flowservice.FlowService()
training_dataset_id = "<YOUR_TRAINING_DATASET_ID>"
# Create a source connection that reads the training dataset from the data lake in Parquet format
source_res = flow_conn.createSourceConnectionDataLake(
name=f"[CMLE] Featurized Dataset source connection created by {username}",
dataset_ids=[training_dataset_id],
format="parquet"
)
source_connection_id = source_res["id"]
Create the target connection
The target connection is responsible for connecting to the destination filesystem. This requires first creating a base connection to the cloud storage account (the Data Landing Zone in this example), then a target connection to a specific file path with the desired compression and format options.
Each available cloud storage destination is identified by a connection spec ID; the Data Landing Zone uses the connection spec ID shown in the code below.
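If you are exporting to a different destination, you can list the available connection specs to find the ID for your cloud storage provider. A minimal sketch, assuming your version of aepp exposes a getConnectionSpecs helper on FlowService:
# List connection specs and map destination names to their IDs (assumed helper)
specs = flow_conn.getConnectionSpecs()
{spec["name"]: spec["id"] for spec in specs}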
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
# Create a base connection to the Data Landing Zone; DLZ access requires no authentication parameters
base_connection_res = flow_conn.createConnection(data={
    "name": f"Base Connection to DLZ created by {username}",
"auth": None,
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
})
base_connection_id = base_connection_res["id"]
# Create a target connection that points to a folder path in the Data Landing Zone
target_res = flow_conn.createTargetConnection(
data={
"name": "Data Landing Zone target connection",
"baseConnectionId": base_connection_id,
"params": {
"mode": "Server-to-server",
"compression": config.get("Cloud", "compression_type"),
"datasetFileType": config.get("Cloud", "data_format"),
"path": config.get("Cloud", "export_path")
},
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
}
)
target_connection_id = target_res["id"]
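The compression, format, and path values above are read from a configparser-style config object that is assumed to have been loaded earlier. A minimal sketch of the [Cloud] section those config.get calls expect (the values are illustrative; use the compression and file formats supported by your destination):
from configparser import ConfigParser

# Hypothetical [Cloud] section backing the config.get(...) calls above
config = ConfigParser()
config["Cloud"] = {
    "compression_type": "gzip",      # compression applied to the exported files
    "data_format": "parquet",        # file format of the exported dataset
    "export_path": "cmle/egress"     # folder path inside the Data Landing Zone container
}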
Create the data flow
The final step is to create a data flow between the dataset specified in the source connection and the destination file path specified in the target connection.
Each available cloud storage type is likewise identified by a flow spec ID; this example uses the flow spec ID for the Data Landing Zone.
The following code creates a data flow with a schedule set to start far in the future. This allows you to trigger ad hoc flow runs during model development. Once you have a trained model, you can update the data flow's schedule to share the feature dataset on the desired cadence.
import time
on_schedule = False  # keep False during model development so exports are triggered ad hoc
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
# Flow spec ID for the Data Landing Zone destination
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
With the data flow created, you can now trigger an ad hoc flow run to share the feature dataset on demand:
import aepp
from aepp import connector

connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res
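Optionally, you can poll the runs of the data flow to confirm that the ad hoc export completed. A minimal sketch, assuming your version of aepp exposes a getRuns helper that accepts a property filter:
# List recent runs for this data flow; inspect their status and metrics in the response
runs = flow_conn.getRuns(prop=f"flowId=={dataflow_id}")
runs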
Streamlined sharing to Data Landing Zone
To share a dataset to the Data Landing Zone more easily, the aepp library provides the exportDatasetToDataLandingZone module, which executes the steps above in a single function call:
from aepp import exportDatasetToDataLandingZone
export = exportDatasetToDataLandingZone.ExportDatasetToDataLandingZone()
dataflow_id = export.createDataFlowRunIfNotExists(
    dataset_id=created_dataset_id,
    data_format=data_format,
    export_path=export_path,
    compression_type=compression_type,
    on_schedule=False,
    config_path=config_path,
    entity_name="Flow for Featurized Dataset to DLZ"
)
This code creates the source connection, the target connection, and the data flow based on the provided parameters, and executes an ad hoc run of the data flow in a single step.
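Once your model is trained and you want the feature dataset delivered on a recurring basis, the same helper can be called with on_schedule=True so the data flow runs on its schedule rather than relying on ad hoc runs. A sketch reusing the parameters above; the exact scheduling behavior depends on the helper's implementation:
dataflow_id = export.createDataFlowRunIfNotExists(
    dataset_id=created_dataset_id,
    data_format=data_format,
    export_path=export_path,
    compression_type=compression_type,
    on_schedule=True,  # enable the recurring schedule instead of ad hoc runs
    config_path=config_path,
    entity_name="Flow for Featurized Dataset to DLZ"
)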