Exportera data till externa ML-miljöer

Senaste uppdatering: 2023-11-02
  • Ämnen:
  • Queries
    Visa mer om det här ämnet
  • Skapat för:
  • User
    Developer

I det här dokumentet visas hur du delar en förberedd utbildningsdatauppsättning som skapats med Data Distiller till en molnlagringsplats som din ML-miljö kan läsa för utbildning och bedömning av din modell. Exemplet här exporterar utbildningsdata till DLZ (Data Landing Zone). Du kan ändra lagringsmålet efter behov för att arbeta med maskininlärningsmiljön.

The Flödestjänst för destinationer används för att slutföra funktionspipelinen genom att landa en datauppsättning med beräknade funktioner på en lämplig molnlagringsplats.

Skapa källanslutningen

Källanslutningen ansvarar för att konfigurera anslutningen till Adobe Experience Platform datauppsättning så att det resulterande flödet vet exakt var data ska sökas och i vilket format.

from aepp import flowservice
flow_conn = flowservice.FlowService()

training_dataset_id = <YOUR_TRAINING_DATASET_ID>

source_res = flow_conn.createSourceConnectionDataLake(
    name=f"[CMLE] Featurized Dataset source connection created by {username}",
    dataset_ids=[training_dataset_id],
    format="parquet"
)
source_connection_id = source_res["id"]

Skapa målanslutningen

Målanslutningen ansvarar för anslutningen till målfilsystemet. Detta kräver att du först skapar en basanslutning till molnlagringskontot (Data Landing Zone i det här exemplet) och sedan en målanslutning till en specifik filsökväg med angivna komprimerings- och formatalternativ.

De tillgängliga molnlagringsmålen identifieras av ett anslutningsspec-ID:

Lagringstyp i molnet Anslutningens spec-ID
Amazon S3 4fce964d-3f37-408f-9778-e597338a21ee
Azure Blob Storage 6d6b59bf-fb58-4107-9064-4d246c0e5bb2
Azure Data Lake be2c3209-53bc-47e7-ab25-145db8b873e1
Datallandningszon 10440537-2a7b-4583-ac39-ed38d4b848e8
Google Cloud-lagring c5d93acb-ea8b-4b14-8f53-02138444ae99
SFTP 36965a81-b1c6-401b-99f8-22508f1e6a26
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
    "name": "Base Connection to DLZ created by",
    "auth": None,
    "connectionSpec": {
        "id": connection_spec_id,
        "version": "1.0"
    }
})
base_connection_id = base_connection_res["id"]

target_res = flow_conn.createTargetConnection(
    data={
        "name": "Data Landing Zone target connection",
        "baseConnectionId": base_connection_id,
        "params": {
            "mode": "Server-to-server",
            "compression": config.get("Cloud", "compression_type"),
            "datasetFileType": config.get("Cloud", "data_format"),
            "path": config.get("Cloud", "export_path")
        },
        "connectionSpec": {
            "id": connection_spec_id,
            "version": "1.0"
        }
    }
)
target_connection_id = target_res["id"]

Skapa dataflödet

Det sista steget är att skapa ett dataflöde mellan datauppsättningen som anges i källanslutningen och målfilens sökväg som anges i målanslutningen.

Varje tillgänglig molnlagringstyp identifieras av ett flödespec-ID:

Lagringstyp i molnet Flödesspekt-ID
Amazon S3 269ba276-16fc-47db-92b0-c1049a3c131f
Azure Blob Storage 95bd8965-fc8a-4119-b9c3-944c2c2df6d2
Azure Data Lake 17be2013-2549-41ce-96e7-a70363bec293
Datallandningszon cd2fc47e-e838-4f38-a581-8fff2f99b63a
Google Cloud-lagring 585c15c4-6cbf-4126-8f87-e26bff78b657
SFTP 354d6aad-4754-46e4-a576-1b384561c440

I följande kod skapas ett dataflöde med ett schema som börjar långt in i framtiden. Detta gör att du kan utlösa ad hoc-flöden under modellutvecklingen. När du har en tränad modell kan du uppdatera schemat för dataflödet och dela funktionsdatauppsättningen enligt det önskade schemat.

import time

on_schedule = False
if on_schedule:
    schedule_params = {
        "interval": 3,
        "timeUnit": "hour",
        "startTime": int(time.time())
    }
else:
    schedule_params = {
        "interval": 1,
        "timeUnit": "day",
        "startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
    }

flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
    "name": "Flow for Feature Dataset to DLZ",
    "flowSpec": {
        "id": flow_spec_id,
        "version": "1.0"
    },
    "sourceConnectionIds": [
        source_connection_id
    ],
    "targetConnectionIds": [
        target_connection_id
    ],
    "transformations": [],
    "scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
    obj = flow_obj,
    flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]

När dataflödet har skapats kan du nu utlösa ett ad hoc-flöde för att dela funktionsuppsättningen på begäran:

from aepp import connector

connector = connector.AdobeRequest(
  config_object=aepp.config.config_object,
  header=aepp.config.header,
  loggingEnabled=False,
  logger=None,
)

endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"

payload = {
    "activationInfo": {
        "destinations": [
            {
                "flowId": dataflow_id,
                "datasets": [
                    {"id": created_dataset_id}
                ]
            }
        ]
    }
}

connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res

Smidig delning till Data Landing Zone

För att enklare kunna dela en datauppsättning med Data Landing Zone aepp biblioteket innehåller en exportDatasetToDataLandingZone funktion som kör stegen ovan i ett enda funktionsanrop:

from aepp import exportDatasetToDataLandingZone

export = exportDatasetToDataLandingZone.ExportDatasetToDataLandingZone()

dataflow_id = export.createDataFlowRunIfNotExists(
    dataset_id = created_dataset_id,
    data_format = data_format,
    export_path= export_path,
    compression_type = compression_type,
    on_schedule = False,
    config_path = config_path,
    entity_name = "Flow for Featurized Dataset to DLZ"
)

Den här koden skapar källanslutningen, målanslutningen och dataflödet baserat på de angivna parametrarna, och kör en ad hoc-körning av dataflödet i ett enda steg.

På denna sida