Créez un flux de données pour les sources de base de données à l’aide de l’API Flow Service.

Ce tutoriel décrit les étapes à suivre pour récupérer des données d’une source de base de données et les importer dans Platform à l’aide de l’Flow Service API.

REMARQUE

Pour créer un flux de données, vous devez déjà disposer d’un identifiant de connexion de base valide avec une source de base de données. Si vous ne disposez pas de cet identifiant, reportez-vous à la section présentation des sources pour obtenir la liste des sources de base de données avec lesquelles vous pouvez créer une connexion de base.

Prise en main

Ce tutoriel nécessite une compréhension du fonctionnement des composants suivants d’Adobe Experience Platform :

  • Experience Data Model (XDM) System : framework normalisé selon lequel Experience Platform organise les données de l’expérience client.
    • Principes de base de la composition des schémas : découvrez les blocs de création de base des schémas XDM, y compris les principes clés et les bonnes pratiques en matière de composition de schémas.
    • Guide du développeur de Schema Registry : inclut des informations importantes à connaître avant dʼeffectuer des appels vers l’API Schema Registry. Cela inclut votre {TENANT_ID}, le concept de « conteneurs » et les en-têtes requis pour effectuer des requêtes (avec une attention particulière à l’en-tête Accept et à ses valeurs possibles).
  • Catalog Service : Catalogue constitue le système d’enregistrement de l’emplacement et de la liaison des données dans Experience Platform.
  • Batch ingestion : l’API Batch Ingestion vous permet d’ingérer des données dans Experience Platform sous forme de fichiers séquentiels.
  • Sandbox : Experience Platform fournit des sandbox virtuelles qui divisent une seule instance de Platform en environnements virtuels distincts afin de favoriser le développement et l’évolution d’applications d’expérience numérique.

Utiliser les API Platform

Pour plus d’informations sur la manière d’effectuer correctement des appels vers les API Platform, consultez le guide de Prise en main des API Platform.

Créer une connexion source

Vous pouvez créer une connexion source en effectuant une requête POST à l’API Flow Service. Une connexion source se compose d’un identifiant de connexion, d’un chemin d’accès au fichier de données source et d’un identifiant de spécification de connexion.

Pour créer une connexion source, vous devez également définir une valeur d’énumération pour l’attribut du format de données.

Utilisez les valeurs d’énumération suivantes pour les connecteurs basés sur des fichiers :

Format des données Valeur d’énumération
Délimité delimited
JSON json
Parquet parquet

Pour tous les connecteurs basés sur des tableaux, définissez la valeur sur tabular.

Format d’API

POST /sourceConnections

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database source connection",
        "baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
        "description": "Database source connection",
        "data": {
            "format": "tabular"
        },
        "params": {
            "tableName": "test1.Mytable",
            "columns": [
                {
                    "name": "TestID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Datefield",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
            "version": "1.0"
        }
    }'
Propriété Description
baseConnectionId Identifiant de connexion de votre source de base de données.
params.path Chemin d’accès au fichier source.
connectionSpec.id Identifiant de spécification de connexion de la source de votre base de données. Consultez l’ annexe pour obtenir une liste des identifiants de spécification de base de données.

Réponse

Une réponse réussie renvoie l’identifiant unique (id) de la connexion source nouvellement créée. Cet identifiant est requis lors des étapes suivantes pour créer une connexion cible.

{
    "id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
    "etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}

Créer un schéma XDM cible

Pour que les données sources soient utilisées dans Platform, un schéma cible doit être créé pour structurer les données sources en fonction de vos besoins. Le schéma cible est ensuite utilisé pour créer un jeu de données Platform contenant les données sources.

Un schéma XDM cible peut être créé en adressant une requête POST à l’API Schema Registry.

Pour obtenir des instructions détaillées sur la création d’un schéma XDM cible, suivez le tutoriel sur la création d’un schéma à l’aide de l’API.

Créer un jeu de données cible

Un jeu de données cible peut être créé en adressant une requête POST à l’API Catalog Service et en fournissant l’identifiant du schéma cible dans la payload.

Pour obtenir des instructions détaillées sur la création d’un jeu de données cible, suivez le tutoriel sur la création d’un jeu de données à l’aide de l’API.

Créer une connexion cible

Une connexion cible représente la connexion à la destination où se trouvent les données ingérées. Pour créer une connexion cible, vous devez indiquer l’identifiant de spécification de connexion fixe associé au lac de données. Cet identifiant de spécification de connexion est c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Vous disposez désormais des identifiants uniques d’un schéma cible, d’un jeu de données cible, ainsi que l’identifiant de spécification de connexion au lac de données. À lʼaide de l’API Flow Service, vous pouvez créer une connexion cible en spécifiant ces identifiants ainsi que le jeu de données qui contiendra les données source entrantes.

Format d’API

POST /targetConnections

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database target connection",
        "description": "Database target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "6019e0e7c5dcf718db5ebc71"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Propriété Description
data.schema.id $id du schéma XDM cible.
data.schema.version La version du schéma. Cette valeur doit être définie sur application/vnd.adobe.xed-full+json;version=1, qui renvoie la dernière version mineure du schéma.
params.dataSetId Identifiant du jeu de données cible rassemblé à l’étape précédente.
connectionSpec.id Identifiant de spécification de connexion utilisé pour la connexion au lac de données. Cet identifiant est c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Réponse

Une réponse réussie renvoie l’identifiant unique de la nouvelle connexion cible (id). Cette valeur est requise lors d’une étape ultérieure pour créer un flux de données.

{
    "id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
    "etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}

Créer un mappage

Pour que les données sources soient ingérées dans un jeu de données cible, elles doivent d’abord être mappées au schéma cible auquel le jeu de données cible se rattache.

Pour créer un jeu de mappage, envoyez une requête POST au point dʼentrée mappingSets de lʼData Prep API et indiquez votre schéma XDM cible $id et les détails des jeux de mappages que vous souhaitez créer.

Format d’API

POST /mappingSets

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "TestID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.birthDate",
                "sourceAttribute": "Datefield",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Propriété Description
xdmSchema La valeur $id du schéma XDM cible.

Réponse

Une réponse réussie renvoie les détails du mappage nouvellement créé, y compris son identifiant unique (id). Cet identifiant est requis lors d’une étape ultérieure pour créer un flux de données.

{
    "id": "0b090130b58b4819afc78b6dc98b484d",
    "version": 0,
    "createdDate": 1612309018666,
    "modifiedDate": 1612309018666,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Récupérer des spécifications du flux de données

Un flux de données est chargé de collecter des données à partir de sources et de les importer dans Platform. Pour créer un flux de données, vous devez d’abord obtenir les spécifications du flux de données en adressant une requête GET à l’API Flow Service. Les spécifications du flux de données sont chargées de collecter les données d’une base de données externe ou d’un système NoSQL.

Format d’API

GET /flowSpecs?property=name=="CRMToAEP"

Requête

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Réponse

Une réponse réussie renvoie les détails de la spécification du flux de données responsable de l’importation des données de votre source dans Platform. La réponse inclut la valeur id unique de spécification de flux requise pour créer un flux de données.

REMARQUE

Le payload de réponse JSON ci-dessous est masqué pour plus de concision. Sélectionnez "payload" pour afficher la payload de réponse.

 Afficher la payload
{
  "id": "14518937-270c-4525-bdec-c2ba7cce3860",
  "name": "CRMToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "3416976c-a9ca-4bba-901a-1f08f66978ff",
    "38ad80fe-8b06-4938-94f4-d4ee80266b07",
    "d771e9c1-4f26-40dc-8617-ce58c4b53702",
    "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
    "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
    "3000eb99-cd47-43f3-827c-43caf170f015",
    "26d738e0-8963-47ea-aadf-c60de735468a",
    "74a1c565-4e59-48d7-9d67-7c03b8a13137",
    "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
    "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
    "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "eb13cb25-47ab-407f-ba89-c0125281c563",
    "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
    "37b6bf40-d318-4655-90be-5cd6f65d334b",
    "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
    "221c7626-58f6-4eec-8ee2-042b0226f03b",
    "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
    "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
    "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
    "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
    "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
    "102706fb-a5cd-42ee-afe0-bc42f017ff43",
    "09182899-b429-40c9-a15a-bf3ddbc8ced7",
    "0479cc14-7651-4354-b233-7480606c2ac3",
    "d6b52d86-f0f8-475f-89d4-ce54c8527328",
    "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
    "1fe283f6-9bec-11ea-bb37-0242ac130002",
    "fcad62f3-09b0-41d3-be11-449d5a621b69",
    "ea1c2a08-b722-11eb-8529-0242ac130003",
    "35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
    "ff4274f2-c9a9-11eb-b8bc-0242ac130003",
    "ba5126ec-c9ac-11eb-b8bc-0242ac130003",
    "b2e08744-4f1a-40ce-af30-7abac3e23cf3",
    "929e4450-0237-4ed2-9404-b7e1e0a00309",
    "2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
    "2fa8af9c-2d1a-43ea-a253-f00a00c74412"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Copy",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "deltaColumn": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "deltaColumn"
        ]
      }
    },
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "deltaColumn": {
            "type": "object",
            "description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime",
          "deltaColumn"
        ]
      }
    }
}

Créer un flux de données

La dernière étape de la collecte de données consiste à créer un flux de données. À ce stade, vous devez disposer des valeurs requises suivantes :

Un flux de données est chargé de planifier et de collecter les données provenant d’une source. Vous pouvez créer un flux de données en effectuant une requête POST et en fournissant les valeurs mentionnées précédemment dans la payload de la requête.

Pour planifier une ingestion, vous devez d’abord définir la valeur de l’heure de début en temps Unix en secondes. Vous devez ensuite définir la valeur de fréquence sur l’une des cinq options suivantes : once, minute, hour, day ou week. La valeur de l’intervalle désigne la période entre deux ingestions consécutives et aucun intervalle ne doit être défini pour la création d’une ingestion unique. Pour toutes les autres fréquences, la valeur de l’intervalle doit être égale ou supérieure à 15.

Format d’API

POST /flows

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database dataflow using BigQuery",
        "description": "collecting test1.Mytable",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "b7581b59-c603-4df1-a689-d23d7ac440f3"
        ],
        "targetConnectionIds": [
            "320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "Datefield",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "0b090130b58b4819afc78b6dc98b484d",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1612310466",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'

+++

Propriété Description
flowSpec.id Identifiant de spécification de flux récupéré à l’étape précédente.
sourceConnectionIds Identifiant de connexion source récupéré lors d’une étape précédente.
targetConnectionIds Identifiant de connexion cible récupéré lors d’une étape précédente.
transformations.params.mappingId Identifiant de mappage récupéré lors d’une étape précédente.
transformations.params.deltaColum Colonne désignée utilisée pour différencier les données nouvelles des données existantes. Les données incrémentielles seront ingérées en fonction de la date et de l’heure de la colonne sélectionnée. Le format de date pris en charge pour deltaColumn est yyyy-MM-dd HH:mm:ss. Si vous utilisez Azure Table Storage, le format pris en charge pour deltaColumn est yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId Identifiant de mappage associé à la base de données.
scheduleParams.startTime Heure de début du flux de données en temps Unix.
scheduleParams.frequency Fréquence de collecte des données par le flux de données. Les valeurs possibles sont les suivantes : once, minute, hour, day ou week.
scheduleParams.interval L’intervalle désigne la période entre deux exécutions consécutives de flux. La valeur de l’intervalle doit être un nombre entier non nul. L’intervalle n’est pas requis lorsque la fréquence est définie sur once et doit être supérieur ou égal à 15 pour d’autres valeurs de fréquence.

Réponse

Une réponse réussie renvoie l’identifiant (id) du flux de données nouvellement créé.

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

Surveiller votre flux de données

Une fois votre flux de données créé, vous pouvez surveiller les données ingérées pour afficher des informations sur les exécutions du flux, le statut d’achèvement et les erreurs. Pour plus d’informations sur la surveillance des flux de données, suivez le tutoriel sur la surveillance des flux de données dans l’API.

Étapes suivantes

Vous êtes arrivé au bout de ce tutoriel, félicitations ! Grâce à celui-ci, vous avez créé un connecteur source pour collecter des données d’une base de données à intervalles réguliers. Les données reçues peuvent désormais être utilisées par les services Platform en aval, tels que Real-time Customer Profile et Data Science Workspace. Consultez les documents suivants pour plus d’informations :

Annexe

La section suivante répertorie les différents connecteurs de source de stockage dans le cloud et leurs spécifications de connexion.

Spécification de connexion

Nom du connecteur Identifiant de spécification de connexion
Amazon Redshift 3416976c-a9ca-4bba-901a-1f08f66978ff
Apache Hive sur Azure HDInsights aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Apache Spark sur Azure HDInsights 6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer 0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage ecde33f2-c56f-46cc-bdea-ad151c16cd69
Couchbase 1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery 3c9b37f8-13a6-43d8-bad3-b863b941fedd
Greenplum 37b6bf40-d318-4655-90be-5cd6f65d334b
IBM DB2 09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB 000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server 1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL 26d738e0-8963-47ea-aadf-c60de735468a
Oracle d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix 102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL 74a1c565-4e59-48d7-9d67-7c03b8a13137

Sur cette page