Creare un flusso di dati per le origini di database utilizzando Flow Service API

Questa esercitazione descrive i passaggi per recuperare i dati da un’origine di database e portarli a Platform utilizzando Flow Service API.

NOTA

Per creare un flusso di dati, è necessario disporre già di un ID di connessione di base valido con un'origine del database. Se non disponi di questo ID, consulta la sezione panoramica di origini per un elenco delle origini di database con cui è possibile creare una connessione di base.

Introduzione

Questa esercitazione richiede una buona comprensione dei seguenti componenti di Adobe Experience Platform:

  • Experience Data Model (XDM) System: Il framework standardizzato in base al quale l’Experience Platform organizza i dati sulla customer experience.
    • Nozioni di base sulla composizione dello schema: Scopri i blocchi di base degli schemi XDM, inclusi i principi chiave e le best practice nella composizione dello schema.
    • Guida per gli sviluppatori del Registro di sistema dello schema: Include informazioni importanti da conoscere per eseguire correttamente le chiamate all’API del Registro di sistema dello schema. Questo include {TENANT_ID}, il concetto di "contenitori" e le intestazioni richieste per effettuare richieste (con particolare attenzione all’intestazione Accept e ai suoi possibili valori).
  • Catalog Service: Catalogo è il sistema di registrazione per la posizione dei dati e la derivazione all'interno di Experience Platform.
  • Batch ingestion: L’API di acquisizione in batch consente di inserire dati in Experience Platform come file batch.
  • Sandbox: Experience Platform fornisce sandbox virtuali che suddividono una singola istanza di Platform in ambienti virtuali separati per sviluppare e sviluppare applicazioni di esperienza digitale.

Utilizzo delle API di Platform

Per informazioni su come effettuare correttamente le chiamate alle API di Platform, consulta la guida su guida introduttiva alle API di Platform.

Creazione di una connessione sorgente

È possibile creare una connessione sorgente effettuando una richiesta di POST al Flow Service API. Una connessione di origine è costituita da un ID connessione, un percorso del file di dati di origine e un ID della specifica di connessione.

Per creare una connessione di origine, è inoltre necessario definire un valore enum per l'attributo del formato dati.

Utilizza i seguenti valori enum per i connettori basati su file:

Formato dati Valore Enum
Delimitato delimited
JSON json
Parquet parquet

Per tutti i connettori basati su tabella, imposta il valore su tabular.

Formato API

POST /sourceConnections

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database source connection",
        "baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
        "description": "Database source connection",
        "data": {
            "format": "tabular"
        },
        "params": {
            "tableName": "test1.Mytable",
            "columns": [
                {
                    "name": "TestID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Datefield",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
            "version": "1.0"
        }
    }'
Proprietà Descrizione
baseConnectionId ID di connessione dell'origine del database.
params.path Percorso del file di origine.
connectionSpec.id ID della specifica di connessione dell'origine del database. Consulta la sezione Appendice per un elenco degli ID delle specifiche del database.

Risposta

Una risposta corretta restituisce l'identificatore univoco (id) della nuova connessione sorgente creata. Questo ID è necessario nei passaggi successivi per creare una connessione di destinazione.

{
    "id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
    "etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}

Creare uno schema XDM di destinazione

Affinché i dati di origine possano essere utilizzati in Platform, è necessario creare uno schema di destinazione per strutturare i dati di origine in base alle tue esigenze. Lo schema di destinazione viene quindi utilizzato per creare un set di dati di Platform in cui sono contenuti i dati di origine.

È possibile creare uno schema XDM di destinazione effettuando una richiesta POST al API del Registro di sistema dello schema.

Per i passaggi dettagliati su come creare uno schema XDM di destinazione, consulta l’esercitazione su creazione di uno schema tramite API.

Creare un set di dati di destinazione

È possibile creare un set di dati di destinazione eseguendo una richiesta di POST al API del servizio catalogo, fornendo l’ID dello schema di destinazione all’interno del payload.

Per i passaggi dettagliati su come creare un set di dati di destinazione, consulta l’esercitazione su creazione di un set di dati tramite API.

Creare una connessione di destinazione

Una connessione di destinazione rappresenta la connessione alla destinazione in cui i dati acquisiti arrivano. Per creare una connessione di destinazione, è necessario fornire l'ID di specifica di connessione fisso associato al Data Lake. Questo ID della specifica di connessione è: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Ora disponi degli identificatori univoci di uno schema di destinazione di un set di dati di destinazione e dell’ID delle specifiche di connessione di un data lake. Utilizzo della Flow Service API, puoi creare una connessione di destinazione specificando questi identificatori insieme al set di dati che conterrà i dati di origine in entrata.

Formato API

POST /targetConnections

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database target connection",
        "description": "Database target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "6019e0e7c5dcf718db5ebc71"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Proprietà Descrizione
data.schema.id La $id dello schema XDM di destinazione.
data.schema.version Versione dello schema. Questo valore deve essere impostato application/vnd.adobe.xed-full+json;version=1, che restituisce la versione secondaria più recente dello schema.
params.dataSetId ID del set di dati di destinazione raccolto nel passaggio precedente.
connectionSpec.id ID della specifica di connessione utilizzato per connettersi al Data Lake. Questo ID è: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Risposta

Una risposta corretta restituisce l'identificatore univoco della nuova connessione di destinazione (id). Questo valore è necessario in un passaggio successivo per creare un flusso di dati.

{
    "id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
    "etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}

Creare una mappatura

Affinché i dati di origine possano essere acquisiti in un set di dati di destinazione, devono prima essere mappati sullo schema di destinazione a cui aderisce il set di dati di destinazione.

Per creare un set di mappatura, invia una richiesta POST al gruppo mappingSets punto finale Data Prep API fornendo lo schema XDM di destinazione $id e i dettagli dei set di mappatura che desideri creare.

Formato API

POST /mappingSets

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "TestID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.birthDate",
                "sourceAttribute": "Datefield",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Proprietà Descrizione
xdmSchema La $id dello schema XDM di destinazione.

Risposta

Una risposta corretta restituisce i dettagli della mappatura appena creata, incluso il relativo identificatore univoco (id). Questo ID è necessario in un passaggio successivo per creare un flusso di dati.

{
    "id": "0b090130b58b4819afc78b6dc98b484d",
    "version": 0,
    "createdDate": 1612309018666,
    "modifiedDate": 1612309018666,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Recupera specifiche del flusso di dati

Un flusso di dati è responsabile della raccolta dei dati da origini e del loro inserimento in Platform. Per creare un flusso di dati, devi prima ottenere le specifiche del flusso di dati eseguendo una richiesta di GET al Flow Service API. Le specifiche del flusso di dati sono responsabili della raccolta di dati da un database esterno o da un sistema NoSQL.

Formato API

GET /flowSpecs?property=name=="CRMToAEP"

Richiesta

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Risposta

Una risposta corretta restituisce i dettagli della specifica del flusso di dati responsabile dell’inserimento dei dati dall’origine in Platform. La risposta include le specifiche di flusso univoche id necessario per creare un nuovo flusso di dati.

{
    "items": [
        {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "name": "CRMToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "3416976c-a9ca-4bba-901a-1f08f66978ff",
                "38ad80fe-8b06-4938-94f4-d4ee80266b07",
                "d771e9c1-4f26-40dc-8617-ce58c4b53702",
                "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
                "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
                "3000eb99-cd47-43f3-827c-43caf170f015",
                "26d738e0-8963-47ea-aadf-c60de735468a",
                "74a1c565-4e59-48d7-9d67-7c03b8a13137",
                "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
                "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
                "cb66ab34-8619-49cb-96d1-39b37ede86ea",
                "eb13cb25-47ab-407f-ba89-c0125281c563",
                "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
                "37b6bf40-d318-4655-90be-5cd6f65d334b",
                "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
                "221c7626-58f6-4eec-8ee2-042b0226f03b",
                "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
                "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
                "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
                "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
                "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
                "102706fb-a5cd-42ee-afe0-bc42f017ff43",
                "09182899-b429-40c9-a15a-bf3ddbc8ced7",
                "0479cc14-7651-4354-b233-7480606c2ac3",
                "d6b52d86-f0f8-475f-89d4-ce54c8527328",
                "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
                "1fe283f6-9bec-11ea-bb37-0242ac130002"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "optionSpec": {
                "name": "OptionSpec",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "errorDiagnosticsEnabled": {
                            "title": "Error diagnostics.",
                            "description": "Flag to enable detailed and sample error diagnostics summary.",
                            "type": "boolean",
                            "default": false
                        },
                        "partialIngestionPercent": {
                            "title": "Partial ingestion threshold.",
                            "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
                            "type": "number",
                            "exclusiveMinimum": 0
                        }
                    }
                }
            },
            "transformationSpecs": [
                {
                    "name": "Copy",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "properties": {
                            "deltaColumn": {
                                "type": "object",
                                "properties": {
                                    "name": {
                                        "type": "string"
                                    },
                                    "dateFormat": {
                                        "type": "string"
                                    },
                                    "timezone": {
                                        "type": "string"
                                    }
                                },
                                "required": [
                                    "name"
                                ]
                            }
                        },
                        "required": [
                            "deltaColumn"
                        ]
                    }
                },
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "once",
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "once"
                            }
                        }
                    },
                    "then": {
                        "allOf": [
                            {
                                "not": {
                                    "required": [
                                        "interval"
                                    ]
                                }
                            },
                            {
                                "not": {
                                    "required": [
                                        "backfill"
                                    ]
                                }
                            }
                        ]
                    },
                    "else": {
                        "required": [
                            "interval"
                        ],
                        "if": {
                            "properties": {
                                "frequency": {
                                    "const": "minute"
                                }
                            }
                        },
                        "then": {
                            "properties": {
                                "interval": {
                                    "minimum": 15
                                }
                            }
                        },
                        "else": {
                            "properties": {
                                "interval": {
                                    "minimum": 1
                                }
                            }
                        }
                    }
                }
            },
            "attributes": {
                "notification": {
                    "category": "sources",
                    "flowRun": {
                        "enabled": true
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

Creare un flusso di dati

L’ultimo passo verso la raccolta dei dati è quello di creare un flusso di dati. A questo punto, è necessario che siano preparati i seguenti valori richiesti:

Un flusso di dati è responsabile della pianificazione e della raccolta dei dati da un’origine. È possibile creare un flusso di dati eseguendo una richiesta di POST fornendo i valori precedentemente menzionati all’interno del payload della richiesta.

Per pianificare un’acquisizione, è innanzitutto necessario impostare il valore dell’ora di inizio in modo che l’ora di inizio sia espressa in secondi. Quindi, è necessario impostare il valore della frequenza su una delle cinque opzioni: once, minute, hour, dayoppure week. Il valore dell’intervallo indica il periodo tra due acquisizioni consecutive e la creazione di un’acquisizione una tantum non richiede l’impostazione di un intervallo. Per tutte le altre frequenze, il valore dell'intervallo deve essere impostato su uguale o maggiore di 15.

Formato API

POST /flows

Richiesta

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database dataflow using BigQuery",
        "description": "collecting test1.Mytable",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "b7581b59-c603-4df1-a689-d23d7ac440f3"
        ],
        "targetConnectionIds": [
            "320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "Datefield",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "0b090130b58b4819afc78b6dc98b484d",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1612310466",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'
Proprietà Descrizione
flowSpec.id La ID delle specifiche di flusso recuperato nel passaggio precedente.
sourceConnectionIds La ID connessione di origine recuperato in un passaggio precedente.
targetConnectionIds La ID connessione di destinazione recuperato in un passaggio precedente.
transformations.params.mappingId La ID mappatura recuperato in un passaggio precedente.
transformations.params.deltaColum Colonna designata utilizzata per distinguere tra dati nuovi ed esistenti. I dati incrementali verranno acquisiti in base al timestamp della colonna selezionata. Il formato della data supportato per deltaColumn è yyyy-MM-dd HH:mm:ss. Se si utilizza Azure Table Storage, il formato supportato per deltaColumn è yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId ID di mappatura associato al database.
scheduleParams.startTime Ora di inizio del flusso di dati in epoch time.
scheduleParams.frequency Frequenza con cui il flusso di dati raccoglie i dati. I valori accettabili includono: once, minute, hour, dayoppure week.
scheduleParams.interval L'intervallo indica il periodo tra due esecuzioni di flusso consecutive. Il valore dell'intervallo deve essere un numero intero diverso da zero. L'intervallo non è necessario quando la frequenza è impostata come once e deve essere maggiore o uguale a 15 per altri valori di frequenza.

Risposta

Una risposta corretta restituisce l'ID (id) del flusso di dati appena creato.

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

Monitorare il flusso di dati

Una volta creato il flusso di dati, puoi monitorare i dati che vengono acquisiti tramite di esso per visualizzare informazioni sulle esecuzioni del flusso, lo stato di completamento e gli errori. Per ulteriori informazioni su come monitorare i flussi di dati, consulta l’esercitazione su monitoraggio dei flussi di dati nell’API

Passaggi successivi

Seguendo questa esercitazione, hai creato un connettore di origine per raccogliere dati da un database su base pianificata. I dati in arrivo possono ora essere utilizzati dai servizi della piattaforma a valle, come Real-time Customer Profile e Data Science Workspace. Per ulteriori informazioni, consulta i seguenti documenti:

Appendice

Nella sezione seguente sono elencati i diversi connettori sorgente di archiviazione cloud e le relative specifiche di connessione.

Specifica di connessione

Nome del connettore ID della specifica di connessione
Amazon Redshift 3416976c-a9ca-4bba-901a-1f08f66978ff
Apache Hive su Azure HDInsights aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Apache Spark su Azure HDInsights 6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer 0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage ecde33f2-c56f-46cc-bdea-ad151c16cd69
Couchbase 1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery 3c9b37f8-13a6-43d8-bad3-b863b941fedd
Greenplum 37b6bf40-d318-4655-90be-5cd6f65d334b
IBM DB2 09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB 000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server 1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL 26d738e0-8963-47ea-aadf-c60de735468a
Oracle d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix 102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL 74a1c565-4e59-48d7-9d67-7c03b8a13137

In questa pagina