Ce tutoriel décrit la procédure à suivre pour récupérer des données à partir d’une source de stockage cloud afin de les importer dans Platform à l’aide de l’API Flow Service .
Pour créer un flux de données, vous devez déjà disposer d’un identifiant de connexion de base valide avec une source de stockage dans le cloud. Si vous ne disposez pas de cet identifiant, reportez-vous à la section présentation des sources pour obtenir la liste des sources de stockage dans le cloud avec lesquelles vous pouvez créer une connexion de base.
Ce tutoriel nécessite une compréhension du fonctionnement des composants suivants d’Adobe Experience Platform :
{TENANT_ID}
, le concept de « conteneurs » et les en-têtes requis pour effectuer des requêtes (avec une attention particulière à l’en-tête Accept et à ses valeurs possibles).Pour plus d’informations sur la manière d’effectuer correctement des appels vers les API Platform, consultez le guide de Prise en main des API Platform.
Vous pouvez créer une connexion source en envoyant une requête de POST au sourceConnections
point d’entrée Flow Service l’API lors de la fourniture de votre identifiant de connexion de base, le chemin d’accès au fichier source que vous souhaitez ingérer et l’identifiant de spécification de connexion correspondant à votre source.
Lors de la création d'une connexion source, vous devez également définir une valeur d'énumération pour l'attribut data format.
Utilisez les valeurs d’énumération suivantes pour les sources basées sur des fichiers :
Format des données | Valeur d’énumération |
---|---|
Délimité | delimited |
JSON | json |
Parquet | parquet |
Pour toutes les sources basées sur un tableau, définissez la valeur sur tabular
.
Format d’API
POST /sourceConnections
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}"
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Propriété | Description |
---|---|
baseConnectionId |
Identifiant de connexion de base de votre source de stockage dans le cloud. |
data.format |
Le format des données que vous souhaitez importer dans Platform. Les valeurs prises en charge sont les suivantes : delimited , JSON , et parquet . |
data.properties |
(Facultatif) Ensemble de propriétés que vous pouvez appliquer à vos données lors de la création d’une connexion source. |
data.properties.columnDelimiter |
(Facultatif) Un délimiteur de colonne à un seul caractère que vous pouvez spécifier lors de la collecte de fichiers plats. Toute valeur de caractère unique est un délimiteur de colonne autorisé. Si elle n’est pas fournie, une virgule (, ) est utilisée comme valeur par défaut. Remarque: Le columnDelimiter ne peut être utilisée que lors de l’ingestion de fichiers délimités. |
data.properties.encoding |
(Facultatif) Une propriété qui définit le type de codage à utiliser lors de l’ingestion de vos données vers Platform. Les types de codage pris en charge sont les suivants : UTF-8 et ISO-8859-1 . Remarque: Le encoding n’est disponible que lors de l’ingestion de fichiers CSV délimités. Les autres types de fichiers seront ingérés avec le codage par défaut, UTF-8 . |
data.properties.compressionType |
(Facultatif) Une propriété qui définit le type de fichier compressé pour l’ingestion. Les types de fichiers compressés pris en charge sont les suivants : bzip2 , gzip , deflate , zipDeflate , tarGzip , et tar . Remarque: Le compressionType ne peut être utilisée que lors de l’ingestion de fichiers JSON ou délimités. |
params.path |
Chemin d’accès au fichier source auquel vous accédez. |
connectionSpec.id |
Identifiant de spécification de connexion associé à votre source de stockage dans le cloud spécifique. Consultez lʼannexe pour obtenir la liste des identifiants de spécification de connexion. |
Réponse
Une réponse réussie renvoie l’identifiant unique (id
) de la nouvelle connexion source. Cet identifiant est requis lors d’une étape ultérieure pour créer un flux de données.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Pour que les données sources soient utilisées dans Platform, un schéma cible doit être créé pour structurer les données sources en fonction de vos besoins. Le schéma cible est ensuite utilisé pour créer un jeu de données Platform contenant les données sources.
Un schéma XDM cible peut être créé en adressant une requête POST à l’API Schema Registry.
Pour obtenir des instructions détaillées sur la création d’un schéma XDM cible, suivez le tutoriel sur la création d’un schéma à l’aide de l’API.
Un jeu de données cible peut être créé en adressant une requête POST à l’API Catalog Service et en fournissant l’identifiant du schéma cible dans la payload.
Pour obtenir des instructions détaillées sur la création d’un jeu de données cible, suivez le tutoriel sur la création d’un jeu de données à l’aide de l’API.
Une connexion cible représente la connexion à la destination où se trouvent les données ingérées. Pour créer une connexion cible, vous devez indiquer l’identifiant de spécification de connexion fixe associé au lac de données. Cet identifiant de connexion spécifique est : c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Vous disposez désormais des identifiants uniques d’un schéma cible, d’un jeu de données cible et de l’identifiant de spécification de connexion au lac de données. À l’aide de ces identifiants, vous pouvez créer une connexion cible grâce à l’API Flow Service pour spécifier le jeu de données qui contiendra les données source entrantes.
Format d’API
POST /targetConnections
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Propriété | Description |
---|---|
data.schema.id |
$id du schéma XDM cible. |
data.schema.version |
La version du schéma. Cette valeur doit être définie sur application/vnd.adobe.xed-full+json;version=1 , qui renvoie la dernière version mineure du schéma. |
params.dataSetId |
L’identifiant du jeu de données cible. |
connectionSpec.id |
Identifiant de spécification de connexion fixe au lac de données. Cet identifiant est : c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Réponse
Une réponse réussie renvoie l’identifiant unique de la nouvelle connexion cible (id
). Cet identifiant est requis aux étapes suivantes.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Pour que les données sources soient ingérées dans un jeu de données cible, elles doivent d’abord être mappées au schéma cible auquel le jeu de données cible se rattache.
Pour créer un jeu de mappages, envoyez une requête POST au point d’entrée mappingSets
de l’Data Prep API tout en fournissant votre schéma XDM cible $id
et les détails des jeux de mappages que vous souhaitez créer.
Vous pouvez désormais mapper des types de données complexes, tels que des tableaux dans des fichiers JSON en utilisant un connecteur source d’espace de stockage.
Format d’API
POST /conversion/mappingSets
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Propriété | Description |
---|---|
xdmSchema |
Identifiant du schéma XDM cible. |
Réponse
Une réponse réussie renvoie les détails du mappage nouvellement créé, y compris son identifiant unique (id
). Cette valeur est requise lors d’une étape ultérieure pour créer un flux de données.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Un flux de données est chargé de collecter des données provenant de sources et de les importer dans Platform. Afin de créer un flux de données, vous devez d’abord obtenir les spécifications du flux de données responsables de la collecte des données de stockage dans le cloud.
Format d’API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Requête
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Réponse
Une réponse réussie renvoie les détails de la spécification du flux de données responsable de l’importation des données de votre source dans Platform. La réponse inclut la valeur id
unique de spécification de flux requise pour créer un flux de données.
{
"items": [
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"endTime": {
"description": "epoch time",
"type": "integer"
},
"interval": {
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"minute",
"hour",
"day",
"week"
]
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency",
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
}
}
]
}
La dernière étape de la collecte des données de stockage dans le cloud consiste à créer un flux de données. Vous disposez à présent des valeurs requises suivantes :
Un flux de données est chargé de planifier et de collecter les données provenant d’une source. Vous pouvez créer un flux de données en exécutant une requête POST et en fournissant les valeurs mentionnées précédemment dans la payload.
Pour l’ingestion par lots, chaque flux de données qui s’ensuit sélectionne les fichiers à ingérer à partir de votre source en fonction de la date et heure de leur dernière modification. Cela signifie que les flux de données par lot sélectionnent des fichiers de la source qui sont nouveaux ou qui ont été modifiés depuis la dernière exécution du flux de données.
Pour planifier une ingestion, vous devez d’abord définir la valeur de l’heure de début en temps Unix en secondes. Vous devez ensuite définir la valeur de fréquence sur l’une des cinq options suivantes : once
, minute
, hour
, day
ou week
. La valeur de l’intervalle désigne la période entre deux ingestions consécutives et aucun intervalle ne doit être défini pour la création d’une ingestion unique. Pour toutes les autres fréquences, la valeur de l’intervalle doit être égale ou supérieure à 15
.
Il est vivement recommandé de planifier votre flux de données pour une ingestion unique lors de l’utilisation du connecteur FTP.
Format d’API
POST /flows
Requête
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
Propriété | Description |
---|---|
flowSpec.id |
Identifiant de spécification de flux récupéré à l’étape précédente. |
sourceConnectionIds |
Identifiant de connexion source récupéré lors d’une étape précédente. |
targetConnectionIds |
Identifiant de connexion cible récupéré lors d’une étape précédente. |
transformations.params.mappingId |
L’identifiant de mappage récupéré lors d’une étape précédente. |
scheduleParams.startTime |
Heure de début du flux de données en temps Unix. |
scheduleParams.frequency |
Fréquence de collecte des données par le flux de données. Les valeurs possibles sont les suivantes : once , minute , hour , day ou week . |
scheduleParams.interval |
L’intervalle désigne la période entre deux exécutions consécutives de flux. La valeur de l’intervalle doit être un nombre entier non nul. L’intervalle n’est pas requis lorsque la fréquence est définie sur once et doit être supérieur ou égal à 15 pour d’autres valeurs de fréquence. |
Réponse
Une réponse réussie renvoie l’identifiant (id
) du flux de données nouvellement créé.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Une fois votre flux de données créé, vous pouvez surveiller les données ingérées pour afficher des informations sur les exécutions du flux, le statut d’achèvement et les erreurs. Pour plus d’informations sur la surveillance des flux de données, consultez le tutoriel sur la surveillance des flux de données dans l’API
Grâce à ce tutoriel, vous avez créé un connecteur source permettant de collecter les données de votre espace de stockage à intervalles réguliers. Ces données entrantes peuvent désormais être utilisées par les services Platform en aval, comme Real-time Customer Profile et Data Science Workspace. Consultez les documents suivants pour plus d’informations :
La section suivante répertorie les différents connecteurs de source de stockage dans le cloud et leurs spécifications de connexion.
Nom du connecteur | Spécification de connexion |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (Concentrateur d’événements) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |