Creare un flusso di dati in streaming per i dati non elaborati utilizzando Flow Service API

Questa esercitazione illustra i passaggi necessari per recuperare i dati non elaborati da un connettore di origine streaming e portarli ad Experience Platform utilizzando Flow Service API.

Introduzione

Questo tutorial richiede una buona conoscenza dei seguenti componenti di Adobe Experience Platform:

  • Experience Data Model (XDM) System: framework standardizzato tramite il quale Experienci Platform organizza i dati sull’esperienza del cliente.

    • Nozioni di base sulla composizione dello schema: scopri gli elementi di base degli schemi XDM, compresi i principi chiave e le best practice nella composizione dello schema.
    • Guida per gli sviluppatori del registro dello schema: include informazioni importanti che è necessario conoscere per eseguire correttamente le chiamate all’API Schema Registry. Ciò include {TENANT_ID}, il concetto di "contenitori" e le intestazioni necessarie per effettuare le richieste (con particolare attenzione all’intestazione Accept e ai suoi possibili valori).
  • Catalog Service: catalogo è il sistema di registrazione per la posizione e la derivazione dei dati in Experienci Platform.

  • Streaming ingestion: l’acquisizione in streaming per Platform fornisce agli utenti un metodo per inviare dati da dispositivi lato client e lato server all’Experience Platform in tempo reale.

  • Sandbox: Experienci Platform fornisce sandbox virtuali che permettono di suddividere una singola istanza Platform in ambienti virtuali separati, utili per le attività di sviluppo e aggiornamento delle applicazioni di esperienza digitale.

Utilizzo delle API di Platform

Per informazioni su come effettuare correttamente chiamate alle API di Platform, consulta la guida su introduzione alle API di Platform.

Creare una connessione sorgente source

Questo tutorial richiede anche un ID di connessione sorgente valido per un connettore di streaming. Se non disponi di queste informazioni, consulta le seguenti esercitazioni su come creare una connessione sorgente di streaming prima di provare questa esercitazione:

Creare uno schema XDM di destinazione target-schema

Per utilizzare i dati sorgente in Platform, è necessario creare uno schema di destinazione che strutturi i dati sorgente in base alle tue esigenze. Lo schema di destinazione viene quindi utilizzato per creare un set di dati di Platform in cui sono contenuti i dati di origine. Questo schema XDM di destinazione estende anche XDM Individual Profile classe.

Per creare uno schema XDM di destinazione, effettua una richiesta POST al /schemas endpoint del Schema Registry API.

Formato API

POST /tenant/schemas

Richiesta

L’esempio di richiesta seguente crea uno schema XDM che estende XDM Individual Profile classe.

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

Risposta

In caso di esito positivo, la risposta restituisce i dettagli del nuovo schema creato, incluso il relativo identificatore univoco ($id). Questo ID è necessario nei passaggi successivi per creare un set di dati di destinazione, una mappatura e un flusso di dati.

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{ORG_ID}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

Creare un set di dati di destinazione

Con uno schema XDM di destinazione creato e il relativo univoco $id ora puoi creare un set di dati di destinazione per contenere i dati di origine. Per creare un set di dati di destinazione, effettua una richiesta POST al dataSets endpoint del API Catalog Service, fornendo l’ID dello schema di destinazione all’interno del payload.

Formato API

POST /catalog/dataSets

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test streaming dataset",
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        }
    }'
Proprietà
Descrizione
name
Nome del set di dati da creare.
schemaRef.id
URI $id per lo schema XDM, il set di dati sarà basato su.
schemaRef.contentType
Versione dello schema. Questo valore deve essere impostato su application/vnd.adobe.xed-full-notext+json;version=1, che restituisce la versione secondaria più recente dello schema. Consulta la sezione su controllo delle versioni dello schema per ulteriori informazioni, consulta la guida dell’API XDM.

Risposta

In caso di esito positivo, la risposta restituisce un array contenente l’ID del set di dati appena creato nel formato "@/datasets/{DATASET_ID}". L’ID del set di dati è una stringa di sola lettura generata dal sistema e utilizzata per fare riferimento al set di dati nelle chiamate API. L’ID del set di dati di destinazione è richiesto nei passaggi successivi per creare una connessione di destinazione e un flusso di dati.

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

Creare una connessione di destinazione target-connection

Le connessioni di Target creano e gestiscono una connessione di destinazione a Platform o a qualsiasi posizione in cui verranno recapitati i dati trasferiti. Le connessioni di Target contengono informazioni relative alla destinazione dei dati, al formato dei dati e all’ID della connessione di Target necessari per creare un flusso di dati. Le istanze di connessione di Target sono specifiche per un tenant e un’organizzazione.

Per creare una connessione di destinazione, effettua una richiesta POST al /targetConnections endpoint del Flow Service API. Come parte della richiesta, devi fornire il formato dati, il dataSetId recuperato nel passaggio precedente e l'ID della specifica di connessione fissa associato a Data Lake. Questo ID è c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Formato API

POST /targetConnections

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
Proprietà
Descrizione
data.format
Il formato specificato dei dati che si stanno portando al data lake.
params.dataSetId
ID del set di dati di destinazione generato nel passaggio precedente. Nota: devi fornire un ID set di dati valido durante la creazione di una connessione di destinazione. Un ID di set di dati non valido genererà un errore.
connectionSpec.id
ID della specifica di connessione utilizzato per connettersi al data lake. Questo ID è: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Risposta

In caso di esito positivo, la risposta restituisce l’identificatore univoco della nuova connessione di destinazione (id). Questo ID è richiesto nei passaggi successivi.

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

Creare una mappatura mapping

Per poter acquisire i dati di origine in un set di dati di destinazione, è necessario prima mapparli sullo schema di destinazione a cui il set di dati di destinazione aderisce.

Per creare un set di mappatura, effettua una richiesta POST al mappingSets endpoint del Data Prep API mentre fornisci lo schema XDM di destinazione $id e i dettagli dei set di mappatura da creare.

Formato API

POST /mappingSets

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
Proprietà
Descrizione
xdmSchema
Il $id dello schema XDM di destinazione.

Risposta

In caso di esito positivo, la risposta restituisce i dettagli della mappatura appena creata, compreso l’identificatore univoco (id). Questo ID è necessario in un passaggio successivo per creare un flusso di dati.

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Recuperare un elenco di specifiche del flusso di dati specs

Un flusso di dati è responsabile della raccolta dei dati dalle origini e della loro introduzione in Platform. Per creare un flusso di dati, devi prima ottenere le specifiche del flusso di dati eseguendo una richiesta di GET al Flow Service API.

Formato API

GET /flowSpecs

Richiesta

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Risposta

In caso di esito positivo, la risposta restituisce un elenco di specifiche del flusso di dati. ID della specifica del flusso di dati da recuperare per creare un flusso di dati utilizzando uno di Amazon Kinesis, Azure Event Hubs, o Google PubSub, è d69717ba-71b4-4313-b654-49f9cf126d7a.

{
    "items": [
        {
            "id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
            "name": "Stream data with optional transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
                "bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
                "86043421-563b-46ec-8e6c-e23184711bf6",
                "70116022-a743-464a-bbfe-e226a7f8210c"
            ],
            "targetConnectionSpecIds": [
                "bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
                "db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        },
    ]
}

Creare un flusso di dati

L’ultimo passaggio per la raccolta dei dati in streaming è la creazione di un flusso di dati. A questo punto sono stati preparati i seguenti valori obbligatori:

Un flusso di dati è responsabile della pianificazione e della raccolta di dati da un’origine. Puoi creare un flusso di dati eseguendo una richiesta POST e fornendo i valori precedentemente menzionati all’interno del payload.

Formato API

POST /flows

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "e96d6135-4b50-446e-922c-6dd66672b6b2"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
Proprietà
Descrizione
flowSpec.id
Il ID specifica di flusso recuperato nel passaggio precedente.
sourceConnectionIds
Il ID connessione sorgente recuperato in un passaggio precedente.
targetConnectionIds
Il ID connessione di destinazione recuperato in un passaggio precedente.
transformations.params.mappingId
Il ID mappatura recuperato in un passaggio precedente.

Risposta

In caso di esito positivo, la risposta restituisce l’ID (id) del flusso di dati appena creato.

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

Pubblica dati per l’acquisizione

Per esempi di codice JSON non elaborato o conforme a XDM da inviare per l’acquisizione, visualizza il payload di esempio riportato di seguito.

NOTE
È necessario aggiungere un ritardo di almeno ~5 minuti tra la creazione del flusso di dati e l’acquisizione di eventuali dati in streaming. Questo consente di abilitare completamente il flusso di dati prima dell’acquisizione di qualsiasi dato.

Gli esempi seguenti si applicano a tutti:

Dati non elaborati
code language-json
'{
      "name": "Johnson Smith",
      "location": {
          "city": "Seattle",
          "country": "United State of America",
          "address": "3692 Main Street"
      },
      "gender": "Male",
      "birthday": {
          "year": 1984,
          "month": 6,
          "day": 9
      }
  }'
Dati XDM
code language-json
{
  "header": {
    "schemaRef": {
      "id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
      "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
    }
  },
  "body": {
    "xdmMeta": {
      "schemaRef": {
        "id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
        "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
      }
    },
    "xdmEntity": {
      "_id": "acme",
      "workEmail": {
        "address": "mike@acme.com",
        "primary": true,
        "type": "work",
        "status": "active"
      },
      "person": {
        "gender": "male",
        "name": {
          "firstName": "Mike",
          "lastName": "Wazowski"
        },
        "birthDate": "1985-01-01"
      },
      "identityMap": {
        "ecid": [
          {
            "id": "01262118050522051420082102000000000000"
          }
        ]
      }
    }
  }
}

Passaggi successivi

Seguendo questa esercitazione, hai creato un flusso di dati per raccogliere i dati di streaming dal connettore di streaming. I dati in arrivo possono ora essere utilizzati da servizi Platform a valle come Real-Time Customer Profile e Data Science Workspace. Per ulteriori informazioni, consulta i seguenti documenti:

recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089