Collecte de données en flux continu via les connecteurs et les API source

Flow Service est utilisée pour collecter et centraliser les données client provenant de diverses sources disparates à Adobe Experience Platform. Le service fournit une interface utilisateur et une API RESTful à partir de laquelle toutes les sources prises en charge sont connectables.

Ce didacticiel décrit les étapes à suivre pour récupérer les données d’un connecteur source de flux continu et les amener à Experience Platform l’aide de l’ Flow Service API.

Prise en main

Ce didacticiel nécessite que vous disposiez d’un ID de connexion valide pour un connecteur de flux continu. Si vous ne disposez pas de ces informations, consultez les didacticiels suivants sur la création d’une connexion source de flux continu avant de tenter ce didacticiel :

Ce didacticiel nécessite également une bonne compréhension des composants suivants de Adobe Experience Platform :

  • Experience Data Model (XDM) System: Cadre normalisé selon lequel l’Experience Platform organise les données d’expérience client.
    • Principes de base de la composition des schémas : découvrez les blocs de création de base des schémas XDM, y compris les principes clés et les bonnes pratiques en matière de composition de schémas.
    • Guidedu développeur du registre des schémas : Inclut des informations importantes que vous devez connaître pour pouvoir effectuer des appels à l'API de registre du Schéma. Cela inclut votre {TENANT_ID}, le concept de « conteneurs » et les en-têtes requis pour effectuer des requêtes (avec une attention particulière à l’en-tête Accept et à ses valeurs possibles).
  • Catalog Service: Le catalogue est le système d’enregistrement pour l’emplacement et le lignage des données à l’intérieur Experience Platform.
  • Streaming ingestion: La fonction d’assimilation en flux continu pour Platform permet aux utilisateurs d’envoyer en temps Experience Platform réel des données depuis les périphériques client et serveur.
  • Sandbox: Experience Platform fournit des sandbox virtuels qui partitionnent une Platform instance unique en environnements virtuels distincts pour aider à développer et développer des applications d'expérience numérique.

The following sections provide additional information that you will need to know in order to successfully collect streaming data using the Flow Service API.

Lecture d’exemples d’appels API

Ce tutoriel fournit des exemples d’appels API pour démontrer comment formater vos requêtes. Il s’agit notamment de chemins d’accès, d’en-têtes requis et de payloads de requêtes correctement formatés. L’exemple JSON renvoyé dans les réponses de l’API est également fourni. Pour plus d’informations sur les conventions utilisées dans la documentation pour les exemples d’appels d’API, voir la section concernant la lecture d’exemples d’appels d’API dans le guide de dépannageExperience Platform.

Collecte des valeurs des en-têtes requis

Pour lancer des appels aux API Platform, vous devez d’abord suivre le tutoriel d’authentification. Le tutoriel d’authentification fournit les valeurs de chacun des en-têtes requis dans tous les appels d’API Experience Platform, comme indiqué ci-dessous :

  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}

All resources in Experience Platform, including those belonging to Flow Service, are isolated to specific virtual sandboxes. All requests to Platform APIs require a header that specifies the name of the sandbox the operation will take place in:

  • x-sandbox-name: {SANDBOX_NAME}

Toutes les requêtes qui contiennent un payload (POST, PUT, PATCH) nécessitent un en-tête de type de média supplémentaire :

  • Content-Type: application/json

Création d’une connexion source

You can create a source connection by making a POST request to the Flow Service API. Une connexion source se compose d’un identifiant de connexion, d’un chemin d’accès au fichier de données source et d’un identifiant de spécification de connexion.

Pour créer une connexion source, vous devez également définir une valeur d’énumération pour l’attribut de format de données.

Utilisez les valeurs d’énumération suivantes pour les connecteurs basés sur des fichiers :

Sur le format des données saisies Valeur maximale
Délimité delimited
JSON json
Parquet parquet

Pour tous les connecteurs basés sur un tableau, définissez la valeur sur tabular.

Format d’API

POST /sourceConnections

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test source connector for streaming data",
        "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
        "connectionId": "f6aa6c58-3c3d-4c59-aa6c-583c3d6c599c",
        "description": "Test source connector for streaming data",
        "data": {
            "format": "delimited"
        },
            "connectionSpec": {
            "id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
            "version": "1.0"
        }
    }'
Propriété Description
providerId ID de fournisseur de votre connecteur de diffusion en continu.
connectionId ID de connexion unique de votre connecteur de diffusion en continu.
connectionSpec.id Identifiant de spécification de connexion associé à votre connecteur de diffusion en continu spécifique.

Réponse

Une réponse réussie renvoie l'identifiant unique (id) de la connexion source nouvellement créée. Cet identifiant est nécessaire à une étape ultérieure pour créer un flux de données.

{
    "id": "2abd97c4-91bb-4c93-bd97-c491bbfc933d",
    "etag": "\"66013508-0000-0200-0000-5f6e2ae70000\""
}

Création d’un schéma XDM cible

Pour que les données source soient utilisées dans Platform, un schéma de cible doit être créé pour structurer les données source en fonction de vos besoins. Le schéma de cible est ensuite utilisé pour créer un Platform jeu de données dans lequel les données source sont contenues. Ce schéma XDM de cible étend également la Individual Profile classe XDM.

Un schéma XDM de cible peut être créé en exécutant une requête de POST sur l'API de registre duSchéma.

Format d’API

POST /tenant/schemas

Requête

L'exemple de demande suivant crée un schéma XDM qui étend la Individual Profile classe XDM.

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

Réponse

A successful response returns details of the newly created schema including its unique identifier ($id). Cet identifiant est requis dans les étapes suivantes pour créer un jeu de données de cible, un mappage et un flux de données.

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

Création d’un jeu de données cible

Un jeu de données de cible peut être créé en exécutant une requête de POST sur l’API Catalog Service, en fournissant l’identifiant du schéma de cible dans la charge utile.

Format d’API

POST /catalog/dataSets

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.1"
        },
        "fileDescription": {
            "format": "parquet"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        },
        "name": "Test streaming dataset"
    }'
Propriété Description
schemaRef.id ID du schéma XDM de cible.

Réponse

A successful response returns an array containing the ID of the newly created dataset in the format "@/datasets/{DATASET_ID}". L’identifiant du jeu de données est une chaîne en lecture seule générée par le système et utilisée pour référencer le jeu de données dans les appels API. L’ID du jeu de données de cible est requis dans les étapes suivantes pour créer une connexion à une cible et un flux de données.

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

Création d’une connexion à une cible

Une connexion de cible représente la connexion à la destination où se trouvent les données saisies. Pour créer une connexion de cible, vous devez fournir l’identifiant de spécification de connexion fixe associé au lac de données. Cet identifiant de spécification de connexion est : c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Vous disposez désormais des identifiants uniques d’un schéma de cible d’un jeu de données de cible et de l’ID de spécification de connexion au lac de données. A l’aide de ces identifiants, vous pouvez créer une connexion de cible à l’aide de l’ Flow Service API pour spécifier le jeu de données qui contiendra les données source entrantes.

Format d’API

POST /targetConnections

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm"
        },
        "params": {
        "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
Propriété Description
params.dataSetId ID du jeu de données de cible.
connectionSpec.id ID de spécification de connexion utilisé pour la connexion au lac Data. Cet ID est : c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Réponse

Une réponse réussie renvoie l'identifiant unique (id) de la nouvelle connexion à la cible. Cet identifiant est requis par la suite.

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

Création d’un mappage

Pour que les données source soient assimilées à un jeu de données de cible, elles doivent d’abord être mises en correspondance avec le schéma de cible auquel adhère le jeu de données de cible. Pour ce faire, il effectue une demande de POST au service de conversion avec des mappages de données définis dans la charge utile de la demande.

Format d’API

POST /conversion/mappingSets

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
Propriété Description
xdmSchema Le schéma $id XDM de la cible.

Réponse

Une réponse réussie renvoie les détails du nouveau mappage, y compris son identifiant unique (id). Cet identifiant est nécessaire à une étape ultérieure pour créer un flux de données.

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Rechercher les spécifications des flux de données

Un flux de données est responsable de la collecte de données provenant de sources et de leur intégration dans Platformles sources. Pour créer un flux de données, vous devez d’abord obtenir les spécifications du flux de données en exécutant une demande de GET à l’ Flow Service API. Les spécifications de flux de données sont responsables de la collecte de données à partir d'un connecteur de flux continu.
Format d’API

GET /flowSpecs?property=name=="Steam data with transformation"

Requête

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="Steam data with transformation"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Réponse

Une réponse réussie renvoie les détails de la spécification de flux de données responsable de l’introduction de données à partir de votre connecteur de diffusion en continu Platform. Cet identifiant est requis à l’étape suivante pour créer un nouveau flux de données.

{
    "items": [
        {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "name": "Steam data with transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "d27d4907-7351-47dd-bbc2-05a04365703d",
                "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from Raw to XDM",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        },
                        "required": [
                            "mappingId"
                        ]
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "deleteSupported": false,
                        "updateSupported": false,
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

Création d’un flux de données

La dernière étape de la collecte des données en flux continu consiste à créer un flux de données. A l’heure actuelle, les valeurs requises suivantes sont préparées :

Un flux de données est responsable de la planification et de la collecte des données d’une source. Vous pouvez créer un flux de données en exécutant une requête de POST tout en fournissant les valeurs mentionnées précédemment dans la charge utile.

Format d’API

POST /flows

Requête

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "2abd97c4-91bb-4c93-bd97-c491bbfc933d"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
Propriété Description
flowSpec.id Identifiant de spécification de flux récupéré à l’étape précédente.
sourceConnectionIds ID de connexion source récupéré lors d’une étape précédente.
targetConnectionIds ID de connexion à la cible récupéré lors d’une étape précédente.
transformations.params.mappingId ID de mappage récupéré lors d’une étape précédente.

Réponse

A successful response returns the ID (id) of the newly created dataflow.

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

Étapes suivantes

En suivant ce didacticiel, vous avez créé un flux de données pour collecter des données en flux continu à partir de votre connecteur de flux continu. Les données entrantes peuvent désormais être utilisées par Platform les services en aval tels que Real-time Customer Profile et Data Science Workspace. Pour plus d’informations, voir les documents suivants :

Sur cette page