Erstellen der Zielverbindung

Die Zielverbindung ist für die Verbindung mit dem Zieldateisystem verantwortlich. Dazu muss zunächst eine Basisverbindung zum Cloud-Speicherkonto (in diesem Beispiel die Data Landing Zone) und dann eine Zielverbindung zu einem bestimmten Dateipfad mit angegebenen Komprimierungs- und Formatoptionen erstellt werden.

Die verfügbaren Cloud-Speicher-Ziele werden jeweils durch eine Verbindungsspezifikations-ID identifiziert:

Cloud-SpeichertypVerbindungsspezifikations-ID
Amazon S34FCE964D-3F37-408F-9778-E597338A21EE
Azur Blob-Speicherung6d6b59bf-fb58-4107-9064-4d246c0e5bb2
Azure Data Lakebe2c3209-53bc-47e7-ab25-145db8b873e1
Data Landing Zone10440537-2a7b-4583-ac39-ed38d4b848e8
Google Cloud Storagec5d93acb-ea8b-4b14-8f53-02138444ae99
SFTP36965A81-B1C6-401B-99F8-22508F1E6A26
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
    "name": "Base Connection to DLZ created by",
    "auth": None,
    "connectionSpec": {
        "id": connection_spec_id,
        "version": "1.0"
    }
})
base_connection_id = base_connection_res["id"]

target_res = flow_conn.createTargetConnection(
    data={
        "name": "Data Landing Zone target connection",
        "baseConnectionId": base_connection_id,
        "params": {
            "mode": "Server-to-server",
            "compression": config.get("Cloud", "compression_type"),
            "datasetFileType": config.get("Cloud", "data_format"),
            "path": config.get("Cloud", "export_path")
        },
        "connectionSpec": {
            "id": connection_spec_id,
            "version": "1.0"
        }
    }
)
target_connection_id = target_res["id"]

Datenfluss erstellen

Der letzte Schritt besteht darin, einen Datenfluss zwischen dem in der Quellverbindung angegebenen Datensatz und dem in der Zielverbindung angegebenen Zieldateipfad zu erstellen.

Jeder verfügbare Cloud-Speichertyp wird durch eine Flussspezifikations-ID identifiziert:

Cloud-SpeichertypFlussspezifikations-ID
Amazon S3269BA276-16FC-47DB-92B0-C1049A3C131F
Azur Blob-Speicherung95bd8965-fc8a-4119-b9c3-944c2c2df6d2
Azure Data Lake17be2013-2549-41ce-96e7-a70363bec293
Data Landing Zonecd2fc47e-e838-4f38-a581-8fff2f99b63a
Google Cloud Storage585c15c4-6cbf-4126-8f87-e26bff78b657
SFTP354d6aad-4754-46e4-a576-1b384561c440

Der folgende Code erstellt einen Datenfluss mit einem Zeitplan, der weit in der Zukunft beginnen soll. Auf diese Weise können Sie Ad-hoc-Flüsse während der Modellentwicklung mit Triggern versehen. Sobald Sie über ein trainiertes Modell verfügen, können Sie den Zeitplan des Datenflusses aktualisieren, um den Funktionsdatensatz nach dem gewünschten Zeitplan freizugeben.

import time

on_schedule = False
if on_schedule:
    schedule_params = {
        "interval": 3,
        "timeUnit": "hour",
        "startTime": int(time.time())
    }
else:
    schedule_params = {
        "interval": 1,
        "timeUnit": "day",
        "startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
    }

flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
    "name": "Flow for Feature Dataset to DLZ",
    "flowSpec": {
        "id": flow_spec_id,
        "version": "1.0"
    },
    "sourceConnectionIds": [
        source_connection_id
    ],
    "targetConnectionIds": [
        target_connection_id
    ],
    "transformations": [],
    "scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
    obj = flow_obj,
    flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]

Nachdem der Datenfluss erstellt wurde, können Sie jetzt einen Trigger für eine Ad-hoc-Flussausführung erstellen, um den Funktionsdatensatz bei Bedarf freizugeben:

from aepp import connector

connector = connector.AdobeRequest(
  config_object=aepp.config.config_object,
  header=aepp.config.header,
  loggingEnabled=False,
  logger=None,
)

endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"

payload = {
    "activationInfo": {
        "destinations": [
            {
                "flowId": dataflow_id,
                "datasets": [
                    {"id": created_dataset_id}
                ]
            }
        ]
    }
}

connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res