Create the data flow

The final step is to create a data flow between the dataset specified in the source connection and the destination file path specified in the target connection.

Each available cloud storage type is identified by a flow spec ID:

Cloud Storage Type     Flow Spec ID
Amazon S3              269ba276-16fc-47db-92b0-c1049a3c131f
Azure Blob Storage     95bd8965-fc8a-4119-b9c3-944c2c2df6d2
Azure Data Lake        17be2013-2549-41ce-96e7-a70363bec293
Data Landing Zone      cd2fc47e-e838-4f38-a581-8fff2f99b63a
Google Cloud Storage   585c15c4-6cbf-4126-8f87-e26bff78b657
SFTP                   354d6aad-4754-46e4-a576-1b384561c440
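
If you want to reference these destinations by name elsewhere in the notebook, it can help to keep the IDs in a small lookup. This is purely a convenience sketch; the values are the same IDs listed in the table above:

# Convenience lookup of the flow spec IDs from the table above (illustrative only)
FLOW_SPEC_IDS = {
    "Amazon S3": "269ba276-16fc-47db-92b0-c1049a3c131f",
    "Azure Blob Storage": "95bd8965-fc8a-4119-b9c3-944c2c2df6d2",
    "Azure Data Lake": "17be2013-2549-41ce-96e7-a70363bec293",
    "Data Landing Zone": "cd2fc47e-e838-4f38-a581-8fff2f99b63a",
    "Google Cloud Storage": "585c15c4-6cbf-4126-8f87-e26bff78b657",
    "SFTP": "354d6aad-4754-46e4-a576-1b384561c440",
}

flow_spec_id = FLOW_SPEC_IDS["Data Landing Zone"]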

The following code creates a data flow with a schedule set to start far in the future. This lets you trigger ad hoc flow runs during model development. Once you have a trained model, you can update the data flow's schedule to share the feature dataset on the desired cadence.

import time

on_schedule = False
if on_schedule:
    schedule_params = {
        "interval": 3,
        "timeUnit": "hour",
        "startTime": int(time.time())
    }
else:
    schedule_params = {
        "interval": 1,
        "timeUnit": "day",
        "startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
    }

flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
    "name": "Flow for Feature Dataset to DLZ",
    "flowSpec": {
        "id": flow_spec_id,
        "version": "1.0"
    },
    "sourceConnectionIds": [
        source_connection_id
    ],
    "targetConnectionIds": [
        target_connection_id
    ],
    "transformations": [],
    "scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
    obj = flow_obj,
    flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
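
After the model is trained, you can move the flow onto its real schedule. The following is a minimal sketch, assuming your aepp FlowService instance (flow_conn) exposes getFlow and updateFlow and that the Flow Service API accepts JSON Patch operations against scheduleParams; verify the method signatures in your aepp version and the Flow Service API reference before relying on it:

import time

# Hypothetical sketch: switch the dataflow to run every 3 hours starting now.
# getFlow/updateFlow are assumed to wrap GET and PATCH on the Flow Service API.
# PATCH requests need the flow's current etag, which may be nested under items[0]
# depending on the response shape.
current_flow = flow_conn.getFlow(dataflow_id)
etag = current_flow.get("etag") or current_flow["items"][0]["etag"]

patch_ops = [
    {"op": "replace", "path": "/scheduleParams/startTime", "value": int(time.time())},
    {"op": "replace", "path": "/scheduleParams/interval", "value": 3},
    {"op": "replace", "path": "/scheduleParams/timeUnit", "value": "hour"},
]

updated_flow = flow_conn.updateFlow(
    flowId=dataflow_id,
    etag=etag,
    updateObj=patch_ops
)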

With the data flow created, you can now trigger an ad hoc flow run to share the feature dataset on demand:

import aepp
from aepp import connector

# Low-level request helper that reuses the existing aepp configuration and authentication headers
connector = connector.AdobeRequest(
  config_object=aepp.config.config_object,
  header=aepp.config.header,
  loggingEnabled=False,
  logger=None,
)

# Ad hoc activation endpoint used to trigger an on-demand run of the dataflow
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"

payload = {
    "activationInfo": {
        "destinations": [
            {
                "flowId": dataflow_id,
                "datasets": [
                    {"id": created_dataset_id}
                ]
            }
        ]
    }
}

# Accept header required by the ad hoc run API
connector.header.update({"Accept": "application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res
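
The response body describes the triggered run. To confirm the activation completed, you can check the flow runs for this dataflow. A minimal sketch, reusing flow_conn and dataflow_id from earlier and assuming that getRuns in your aepp version accepts a Flow Service property filter (prop) on flowId:

# Illustrative status check; the exact location of the run status in the response
# (for example under metrics/statusSummary) can vary, so inspect the raw items.
runs = flow_conn.getRuns(prop=f"flowId=={dataflow_id}")
for run in runs:
    print(run.get("id"), run.get("metrics", {}).get("statusSummary", {}))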