Créer un flux de données pour les sources de stockage cloud à l’aide de l’API Flow Service
Ce tutoriel décrit la procédure à suivre pour récupérer des données à partir d’une source de stockage cloud afin de les importer dans Platform à l’aide de l’API Flow Service.
Prise en main
Ce tutoriel nécessite une compréhension du fonctionnement des composants suivants d’Adobe Experience Platform :
-
Experience Data Model (XDM) System : framework normalisé selon lequel Experience Platform organise les données de l’expérience client.
- Principes de base de la composition des schémas : découvrez les blocs de création de base des schémas XDM, y compris les principes clés et les bonnes pratiques en matière de composition de schémas.
- Guide du développeur de Schema Registry : inclut des informations importantes à connaître avant dʼeffectuer des appels vers l’API Schema Registry. Cela inclut votre
{TENANT_ID}
, le concept de « conteneurs » et les en-têtes requis pour effectuer des requêtes (avec une attention particulière à l’en-tête Accept et à ses valeurs possibles).
-
Catalog Service : Catalogue constitue le système d’enregistrement de l’emplacement et de la liaison des données dans Experience Platform.
-
Batch ingestion : l’API Batch Ingestion vous permet d’ingérer des données dans Experience Platform sous forme de fichiers séquentiels.
-
Sandbox : Experience Platform fournit des sandbox virtuels qui divisent une instance de plateforme unique en environnements virtuels distincts pour favoriser le développement et l’évolution d’applications d’expérience digitale.
Utiliser les API Platform
Pour plus d’informations sur la manière d’effectuer correctement des appels vers les API Platform, consultez le guide de Prise en main des API Platform.
Créer une connexion source source
Vous pouvez créer une connexion source en envoyant une requête de POST au point de terminaison sourceConnections
de l’API Flow Service tout en fournissant votre identifiant de connexion de base, le chemin d’accès au fichier source à ingérer et l’identifiant de spécification de connexion correspondant de votre source.
Lors de la création d'une connexion source, vous devez également définir une valeur d'énumération pour l'attribut data format.
Utilisez les valeurs d’énumération suivantes pour les sources basées sur des fichiers :
delimited
json
parquet
Pour toutes les sources basées sur un tableau, définissez la valeur sur tabular
.
Format d’API
POST /sourceConnections
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
, JSON
et parquet
.data.properties
data.properties.columnDelimiter
,
) est utilisée comme valeur par défaut. Remarque : La propriété columnDelimiter
ne peut être utilisée que lors de l’ingestion de fichiers délimités.data.properties.encoding
UTF-8
et ISO-8859-1
. Remarque : Le paramètre encoding
n’est disponible que lors de l’ingestion de fichiers CSV délimités. D’autres types de fichiers seront ingérés avec l’encodage par défaut, UTF-8
.data.properties.compressionType
bzip2
, gzip
, deflate
, zipDeflate
, tarGzip
et tar
. Remarque : La propriété compressionType
ne peut être utilisée que lors de l’ingestion de fichiers délimités ou JSON.params.path
/acme/summerCampaign/*.csv
ingère l’intégralité du dossier /acme/summerCampaign/
.params.type
file
pour ingérer un fichier individuel et le type folder
pour ingérer un dossier entier.connectionSpec.id
Réponse
Une réponse réussie renvoie l’identifiant unique (id
) de la nouvelle connexion source. Cet identifiant est requis lors d’une étape ultérieure pour créer un flux de données.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Utilisation d’expressions régulières pour sélectionner un ensemble spécifique de fichiers à ingérer regex
Vous pouvez utiliser des expressions régulières pour ingérer un ensemble particulier de fichiers de votre source vers Platform lors de la création d’une connexion source.
Format d’API
POST /sourceConnections
Requête
Dans l’exemple ci-dessous, l’expression régulière est utilisée dans le chemin d’accès au fichier pour spécifier l’ingestion de tous les fichiers CSV dont le nom contient premium
.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Configurer une connexion source pour ingérer les données de manière récursive
Lors de la création d’une connexion source, vous pouvez utiliser le paramètre recursive
pour ingérer des données à partir de dossiers profondément imbriqués.
Format d’API
POST /sourceConnections
Requête
Dans l’exemple ci-dessous, le paramètre recursive: true
informe Flow Service de lire tous les sous-dossiers de manière récursive pendant le processus d’ingestion.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Créer un schéma XDM cible target-schema
Pour que les données sources soient utilisées dans Platform, un schéma cible doit être créé pour structurer les données sources en fonction de vos besoins. Le schéma cible est ensuite utilisé pour créer un jeu de données Platform contenant les données sources.
Un schéma XDM cible peut être créé en adressant une requête POST à l’API Schema Registry.
Pour obtenir des instructions détaillées sur la création d’un schéma XDM cible, suivez le tutoriel sur la création d’un schéma à l’aide de l’API.
Créer un jeu de données cible target-dataset
Un jeu de données cible peut être créé en adressant une requête POST à l’API Catalog Service et en fournissant l’identifiant du schéma cible dans la payload.
Pour obtenir des instructions détaillées sur la création d’un jeu de données cible, suivez le tutoriel sur la création d’un jeu de données à l’aide de l’API.
Créer une connexion cible target-connection
Une connexion cible représente la connexion à la destination où se trouvent les données ingérées. Pour créer une connexion cible, vous devez indiquer l’identifiant de spécification de connexion fixe associé au lac de données. Cet identifiant de connexion spécifique est : c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Vous disposez désormais des identifiants uniques d’un schéma cible, d’un jeu de données cible et de l’identifiant de spécification de connexion au lac de données. À l’aide de ces identifiants, vous pouvez créer une connexion cible grâce à l’API Flow Service pour spécifier le jeu de données qui contiendra les données source entrantes.
Format d’API
POST /targetConnections
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
du schéma XDM cible.data.schema.version
application/vnd.adobe.xed-full+json;version=1
, qui renvoie la dernière version mineure du schéma.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Réponse
Une réponse réussie renvoie l’identifiant unique de la nouvelle connexion cible (id
). Cet identifiant est requis aux étapes suivantes.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Créer un mappage mapping
Pour que les données sources soient ingérées dans un jeu de données cible, elles doivent d’abord être mappées au schéma cible auquel le jeu de données cible se rattache.
Pour créer un jeu de mappages, envoyez une requête POST au point d’entrée mappingSets
de l’Data Prep API tout en fournissant votre schéma XDM cible $id
et les détails des jeux de mappages que vous souhaitez créer.
Format d’API
POST /conversion/mappingSets
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
Réponse
Une réponse réussie renvoie les détails du mappage nouvellement créé, y compris son identifiant unique (id
). Cette valeur est requise lors d’une étape ultérieure pour créer un flux de données.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Récupérer des spécifications du flux de données specs
Un flux de données est chargé de collecter des données provenant de sources et de les importer dans Platform. Afin de créer un flux de données, vous devez d’abord obtenir les spécifications du flux de données responsables de la collecte des données de stockage dans le cloud.
Format d’API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Requête
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Réponse
Une réponse réussie renvoie les détails de la spécification du flux de données responsable de l’importation des données de votre source dans Platform. La réponse inclut la valeur id
unique de spécification de flux requise pour créer un flux de données.
code language-json |
---|
|
Créer un flux de données
La dernière étape de la collecte des données de stockage dans le cloud consiste à créer un flux de données. Vous disposez à présent des valeurs requises suivantes :
Un flux de données est chargé de planifier et de collecter les données provenant d’une source. Vous pouvez créer un flux de données en exécutant une requête POST et en fournissant les valeurs mentionnées précédemment dans la payload.
Pour planifier une ingestion, vous devez d’abord définir la valeur de l’heure de début en temps Unix en secondes. Vous devez ensuite définir la valeur de fréquence sur l’une des cinq options suivantes : once
, minute
, hour
, day
ou week
. La valeur de l’intervalle désigne la période entre deux ingestions consécutives et aucun intervalle ne doit être défini pour la création d’une ingestion unique. Pour toutes les autres fréquences, la valeur de l’intervalle doit être égale ou supérieure à 15
.
Format d’API
POST /flows
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
ou week
.scheduleParams.interval
L’intervalle désigne la période entre deux exécutions consécutives de flux. La valeur de l’intervalle doit être un nombre entier non nul. La valeur minimale de l’intervalle accepté pour chaque fréquence est la suivante :
- Une fois : n/a
- Minute : 15
- Heure : 1
- Jour : 1
- Semaine : 1
Réponse
Une réponse réussie renvoie l’identifiant (id
) du flux de données nouvellement créé.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Surveiller votre flux de données
Une fois votre flux de données créé, vous pouvez surveiller les données ingérées pour afficher des informations sur les exécutions du flux, le statut d’achèvement et les erreurs. Pour plus d’informations sur la surveillance des flux de données, consultez le tutoriel sur la surveillance des flux de données dans l’API
Étapes suivantes
Grâce à ce tutoriel, vous avez créé un connecteur source permettant de collecter les données de votre espace de stockage à intervalles réguliers. Ces données entrantes peuvent désormais être utilisées par les services Platform en aval, comme Real-Time Customer Profile et Data Science Workspace. Consultez les documents suivants pour plus d’informations :
Annexe appendix
La section suivante répertorie les différents connecteurs de source de stockage dans le cloud et leurs spécifications de connexion.
Spécification de connexion
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094