Erstellen der Zielverbindung
Die Zielverbindung ist für die Verbindung mit dem Zieldateisystem verantwortlich. Dazu muss zunächst eine Basisverbindung zum Cloud-Speicherkonto (in diesem Beispiel die Data Landing Zone) und dann eine Zielverbindung zu einem bestimmten Dateipfad mit angegebenen Komprimierungs- und Formatoptionen erstellt werden.
Die verfügbaren Cloud-Speicher-Ziele werden jeweils durch eine Verbindungsspezifikations-ID identifiziert:
Cloud-Speichertyp | Verbindungsspezifikations-ID |
---|---|
Amazon S3 | 4FCE964D-3F37-408F-9778-E597338A21EE |
Azur Blob-Speicherung | 6d6b59bf-fb58-4107-9064-4d246c0e5bb2 |
Azure Data Lake | be2c3209-53bc-47e7-ab25-145db8b873e1 |
Data Landing Zone | 10440537-2a7b-4583-ac39-ed38d4b848e8 |
Google Cloud Storage | c5d93acb-ea8b-4b14-8f53-02138444ae99 |
SFTP | 36965A81-B1C6-401B-99F8-22508F1E6A26 |
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
"name": "Base Connection to DLZ created by",
"auth": None,
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
})
base_connection_id = base_connection_res["id"]
target_res = flow_conn.createTargetConnection(
data={
"name": "Data Landing Zone target connection",
"baseConnectionId": base_connection_id,
"params": {
"mode": "Server-to-server",
"compression": config.get("Cloud", "compression_type"),
"datasetFileType": config.get("Cloud", "data_format"),
"path": config.get("Cloud", "export_path")
},
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
}
)
target_connection_id = target_res["id"]
Datenfluss erstellen
Der letzte Schritt besteht darin, einen Datenfluss zwischen dem in der Quellverbindung angegebenen Datensatz und dem in der Zielverbindung angegebenen Zieldateipfad zu erstellen.
Jeder verfügbare Cloud-Speichertyp wird durch eine Flussspezifikations-ID identifiziert:
Cloud-Speichertyp | Flussspezifikations-ID |
---|---|
Amazon S3 | 269BA276-16FC-47DB-92B0-C1049A3C131F |
Azur Blob-Speicherung | 95bd8965-fc8a-4119-b9c3-944c2c2df6d2 |
Azure Data Lake | 17be2013-2549-41ce-96e7-a70363bec293 |
Data Landing Zone | cd2fc47e-e838-4f38-a581-8fff2f99b63a |
Google Cloud Storage | 585c15c4-6cbf-4126-8f87-e26bff78b657 |
SFTP | 354d6aad-4754-46e4-a576-1b384561c440 |
Der folgende Code erstellt einen Datenfluss mit einem Zeitplan, der weit in der Zukunft beginnen soll. Auf diese Weise können Sie Ad-hoc-Flüsse während der Modellentwicklung mit Triggern versehen. Sobald Sie über ein trainiertes Modell verfügen, können Sie den Zeitplan des Datenflusses aktualisieren, um den Funktionsdatensatz nach dem gewünschten Zeitplan freizugeben.
import time
on_schedule = False
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
Nachdem der Datenfluss erstellt wurde, können Sie jetzt einen Trigger für eine Ad-hoc-Flussausführung erstellen, um den Funktionsdatensatz bei Bedarf freizugeben:
from aepp import connector
connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res