Exportera data till externa ML-miljöer

I det här dokumentet visas hur du delar en förberedd utbildningsdatauppsättning som skapats med Data Distiller till en molnlagringsplats som din ML-miljö kan läsa för utbildning och bedömning av din modell. Exemplet här exporterar utbildningsdatauppsättningen till DLZ (Data Landing Zone). Du kan ändra lagringsmålet efter behov för att arbeta med maskininlärningsmiljön.

Flödestjänsten för destinationer används för att slutföra funktionsflödet genom att landa en datauppsättning med beräknade funktioner på en lämplig molnlagringsplats.

Skapa källanslutningen create-source-connection

Källanslutningen ansvarar för att konfigurera anslutningen till Adobe Experience Platform datauppsättning så att det resulterande flödet vet exakt var data ska sökas och i vilket format.

from aepp import flowservice
flow_conn = flowservice.FlowService()

training_dataset_id = <YOUR_TRAINING_DATASET_ID>

source_res = flow_conn.createSourceConnectionDataLake(
    name=f"[CMLE] Featurized Dataset source connection created by {username}",
    dataset_ids=[training_dataset_id],
    format="parquet"
)
source_connection_id = source_res["id"]

Skapa målanslutningen create-target-connection

Målanslutningen ansvarar för anslutningen till målfilsystemet. Detta kräver att du först skapar en basanslutning till molnlagringskontot (Data Landing Zone i det här exemplet) och sedan en målanslutning till en specifik filsökväg med angivna komprimerings- och formatalternativ.

De tillgängliga molnlagringsmålen identifieras av ett anslutningsspec-ID:

Lagringstyp i molnet
Anslutningens spec-ID
Amazon S3
4fce964d-3f37-408f-9778-e597338a21ee
Azure Blob Storage
6d6b59bf-fb58-4107-9064-4d246c0e5bb2
Azure Data Lake
be2c3209-53bc-47e7-ab25-145db8b873e1
Datallandningszon
10440537-2a7b-4583-ac39-ed38d4b848e8
Google Cloud-lagring
c5d93acb-ea8b-4b14-8f53-02138444ae99
SFTP
36965a81-b1c6-401b-99f8-22508f1e6a26
connection_spec_id = "10440537-2a7b-4583-ac39-ed38d4b848e8"
base_connection_res = flow_conn.createConnection(data={
    "name": "Base Connection to DLZ created by",
    "auth": None,
    "connectionSpec": {
        "id": connection_spec_id,
        "version": "1.0"
    }
})
base_connection_id = base_connection_res["id"]

target_res = flow_conn.createTargetConnection(
    data={
        "name": "Data Landing Zone target connection",
        "baseConnectionId": base_connection_id,
        "params": {
            "mode": "Server-to-server",
            "compression": config.get("Cloud", "compression_type"),
            "datasetFileType": config.get("Cloud", "data_format"),
            "path": config.get("Cloud", "export_path")
        },
        "connectionSpec": {
            "id": connection_spec_id,
            "version": "1.0"
        }
    }
)
target_connection_id = target_res["id"]

Skapa dataflödet create-data-flow

Det sista steget är att skapa ett dataflöde mellan datauppsättningen som anges i källanslutningen och målfilens sökväg som anges i målanslutningen.

Varje tillgänglig molnlagringstyp identifieras av ett flödespec-ID:

Lagringstyp i molnet
Flödesspekt-ID
Amazon S3
269ba276-16fc-47db-92b0-c1049a3c131f
Azure Blob Storage
95bd8965-fc8a-4119-b9c3-944c2c2df6d2
Azure Data Lake
17be2013-2549-41ce-96e7-a70363bec293
Datallandningszon
cd2fc47e-e838-4f38-a581-8fff2f99b63a
Google Cloud-lagring
585c15c4-6cbf-4126-8f87-e26bff78b657
SFTP
354d6aad-4754-46e4-a576-1b384561c440

I följande kod skapas ett dataflöde med ett schema som börjar långt in i framtiden. Detta gör att du kan utlösa ad hoc-flöden under modellutvecklingen. När du har en tränad modell kan du uppdatera schemat för dataflödet och dela funktionsdatauppsättningen enligt det önskade schemat.

import time

on_schedule = False
if on_schedule:
    schedule_params = {
        "interval": 3,
        "timeUnit": "hour",
        "startTime": int(time.time())
    }
else:
    schedule_params = {
        "interval": 1,
        "timeUnit": "day",
        "startTime": int(time.time() + 60*60*24*365) # Start the schedule far in the future
    }

flow_spec_id = "cd2fc47e-e838-4f38-a581-8fff2f99b63a"
flow_obj = {
    "name": "Flow for Feature Dataset to DLZ",
    "flowSpec": {
        "id": flow_spec_id,
        "version": "1.0"
    },
    "sourceConnectionIds": [
        source_connection_id
    ],
    "targetConnectionIds": [
        target_connection_id
    ],
    "transformations": [],
    "scheduleParams": schedule_params
}
flow_res = flow_conn.createFlow(
    obj = flow_obj,
    flow_spec_id = flow_spec_id
)
dataflow_id = flow_res["id"]

När dataflödet har skapats kan du nu utlösa ett ad hoc-flöde för att dela funktionsuppsättningen på begäran:

from aepp import connector

connector = connector.AdobeRequest(
  config_object=aepp.config.config_object,
  header=aepp.config.header,
  loggingEnabled=False,
  logger=None,
)

endpoint = aepp.config.endpoints["global"] + "/data/core/activation/disflowprovider/adhocrun"

payload = {
    "activationInfo": {
        "destinations": [
            {
                "flowId": dataflow_id,
                "datasets": [
                    {"id": created_dataset_id}
                ]
            }
        ]
    }
}

connector.header.update({"Accept":"application/vnd.adobe.adhoc.dataset.activation+json; version=1"})
activation_res = connector.postData(endpoint=endpoint, data=payload)
activation_res

Smidig delning till Data Landing Zone

Om du enklare vill dela en datauppsättning med Data Landing Zone tillhandahåller biblioteket aepp en exportDatasetToDataLandingZone-funktion som utför stegen ovan i ett enda funktionsanrop:

from aepp import exportDatasetToDataLandingZone

export = exportDatasetToDataLandingZone.ExportDatasetToDataLandingZone()

dataflow_id = export.createDataFlowRunIfNotExists(
    dataset_id = created_dataset_id,
    data_format = data_format,
    export_path= export_path,
    compression_type = compression_type,
    on_schedule = False,
    config_path = config_path,
    entity_name = "Flow for Featurized Dataset to DLZ"
)

Den här koden skapar källanslutningen, målanslutningen och dataflödet baserat på de angivna parametrarna, och kör en ad hoc-körning av dataflödet i ett enda steg.

recommendation-more-help
ccf2b369-4031-483f-af63-a93b5ae5e3fb