データフローの作成
最後の手順では、ソース接続で指定されたデータセットとターゲット接続で指定された宛先ファイルパスの間でデータフローを作成します。
使用可能な各クラウドストレージタイプは、フロー仕様 ID によって識別されます。
クラウドストレージタイプ | フロー仕様 ID |
---|---|
Amazon S3 | 269ba276-16fc-47db-92b0-c1049a3c131f |
Azure Blob ストレージ | 95bd8965-fc8a-4119-b9c3-944c2c2df6d2 |
Azure データレイク | 17be2013-2549-41ce-96e7-a70363bec293 |
Data Landing Zone | cd2fc47e-e838-4f38-a581-8fff2f99b63a |
Google Cloud Storage | 585c15c4-6cbf-4126-8f87-e26bff78b657 |
SFTP | 354d6aad-4754-46e4-a576-1b384561c440 |
次のコードでは、遠い将来に開始するようにスケジュールが設定されたデータフローを作成します。 これにより、モデル開発中にアドホックフローをトリガー設定できます。 トレーニング済みモデルが完成したら、データフローのスケジュールを更新して、目的のスケジュールで機能データセットを共有できます。
import time
on_schedule = False
if on_schedule:
schedule_params = {
"interval": 3,
"timeUnit": "hour",
"startTime": int(time.time())
}
else:
schedule_params = {
"interval": 1,
"timeUnit": "day",
"startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
}
flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
"name": "Flow for Feature Dataset to DLZ",
"flowSpec": {
"id": flow_spec_id,
"version": "1.0"
},
"sourceConnectionIds": [
source_connection_id
],
"targetConnectionIds": [
target_connection_id
],
"transformations": [],
"scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
obj = flow_obj,
flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]
データフローを作成したら、アドホックフロー実行をトリガーして、オンデマンドで機能データセットを共有できるようになりました。
from aepp import connector
connector = connector.AdobeRequest(
config_object=aepp.config.config_object,
header=aepp.config.header,
loggingEnabled=False,
logger=None,
)
endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"
payload = {
"activationInfo": {
"destinations": [
{
"flowId": dataflow_id,
"datasets": [
{"id": created_dataset_id}
]
}
]
}
}
connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res