Création de la connexion cible
La connexion cible est responsable de la connexion au système de fichiers de destination. Cela nécessite d’abord la création d’une connexion de base au compte de stockage dans le cloud (la zone d’entrée des données dans cet exemple), puis une connexion cible à un chemin d’accès de fichier spécifique avec des options de compression et de format spécifiées.
Les destinations de stockage dans le cloud disponibles sont chacune identifiées par un identifiant de spécification de connexion :
Type de stockage dans le cloud | Identifiant spécifique de la connexion |
---|---|
Amazon S3 | 4fce964d-3f37-408f-9778-e597338a21ee |
Stockage Azure Blob | 6d6b59bf-fb58-4107-9064-4d246c0e5bb2 |
Azure Data Lake | be2c3209-53bc-47e7-ab25-145db8b873e1 |
Zone d’atterrissage des données | 10440537-2a7b-4583-ac39-ed38d4b848e8 |
Google Cloud Storage | c5d93acb-ea8b-4b14-8f53-02138444ae99 |
SFTP | 36965a81-b1c6-401b-99f8-22508f1e6a26 |
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
"name": "Base Connection to DLZ created by",
"auth": None,
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
})
base_connection_id = base_connection_res["id"]
target_res = flow_conn.createTargetConnection(
data={
"name": "Data Landing Zone target connection",
"baseConnectionId": base_connection_id,
"params": {
"mode": "Server-to-server",
"compression": config.get("Cloud", "compression_type"),
"datasetFileType": config.get("Cloud", "data_format"),
"path": config.get("Cloud", "export_path")
},
"connectionSpec": {
"id": connection_spec_id,
"version": "1.0"
}
}
)
target_connection_id = target_res["id"]
Création du flux de données
La dernière étape consiste à créer un flux de données entre le jeu de données spécifié dans la connexion source et le chemin d’accès au fichier de destination spécifié dans la connexion cible.
Chaque type de stockage dans le cloud disponible est identifié par un identifiant de spécification de flux :
Type de stockage dans le cloud | Identifiant de spécification de flux |
---|---|
Amazon S3 | 269ba276-16fc-47db-92b0-c1049a3c131f |
Stockage Azure Blob | 95bd8965-fc8a-4119-b9c3-944c2c2df6d2 |
Azure Data Lake | 17be2013-2549-41ce-96e7-a70363bec293 |
Zone d’atterrissage des données | cd2fc47e-e838-4f38-a581-8fff2f99b63a |
Google Cloud Storage | 585c15c4-6cbf-4126-8f87-e26bff78b657 |
SFTP | 354d6aad-4754-46e4-a576-1b384561c440 |
Le code suivant crée un flux de données avec une planification qui doit démarrer dans un avenir lointain. Cela vous permet de déclencher des flux ad hoc lors du développement du modèle. Une fois que vous disposez d’un modèle formé, vous pouvez mettre à jour le planning du flux de données pour partager le jeu de données de fonctionnalités selon la planification souhaitée.
import time
on_schedule = False
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
Une fois le flux de données créé, vous pouvez désormais déclencher une exécution de flux ad hoc pour partager le jeu de données de fonctionnalités à la demande :
from aepp import connector
connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res