Crear un flujo de datos de flujo continuo para los datos sin procesar mediante la API Flow Service
Este tutorial cubre los pasos para recuperar datos sin procesar de un conector de origen de flujo continuo y llevarlos al Experience Platform mediante la Flow Service API.
Introducción
Este tutorial requiere una comprensión práctica de los siguientes componentes de Adobe Experience Platform:
-
Experience Data Model (XDM) System: el marco estandarizado mediante el cual el Experience Platform organiza los datos de experiencia del cliente.
- Aspectos básicos de la composición de esquemas: obtenga información sobre los componentes básicos de los esquemas XDM, incluidos los principios clave y las prácticas recomendadas en la composición de esquemas.
- Guía para desarrolladores de Schema Registry: Incluye información importante que necesita conocer para realizar correctamente llamadas a la API de Schema Registry. Esto incluye su
{TENANT_ID}
, el concepto de "contenedores" y los encabezados necesarios para realizar solicitudes (con especial atención al encabezado Aceptar y sus posibles valores).
-
Catalog Service: el catálogo es el sistema de registro para la ubicación de datos y el linaje dentro del Experience Platform.
-
Streaming ingestion: la transmisión por secuencias de ingesta para Platform proporciona a los usuarios un método para enviar datos desde dispositivos del cliente y del lado del servidor al Experience Platform en tiempo real.
-
Zonas protegidas: El Experience Platform proporciona zonas protegidas virtuales que dividen una sola instancia de Platform en entornos virtuales independientes para ayudar a desarrollar y evolucionar aplicaciones de experiencia digital.
Uso de API de Platform
Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía sobre introducción a las API de Platform.
Crear una conexión de origen source
Este tutorial también requiere que tenga un ID de conexión de origen válido para un conector de flujo continuo. Si no tiene esta información, consulte los siguientes tutoriales sobre la creación de una conexión de origen de flujo continuo antes de intentar realizar este tutorial:
Creación de un esquema XDM de destino target-schema
Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se incluyen los datos de origen. Este esquema XDM de destino también amplía la clase XDM Individual Profile.
Para crear un esquema XDM de destino, realice una solicitud de POST al extremo /schemas
de la Schema Registry API.
Formato de API
POST /tenant/schemas
Solicitud
La siguiente solicitud de ejemplo crea un esquema XDM que amplía la clase XDM Individual Profile.
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
Respuesta
Una respuesta correcta devuelve detalles del esquema recién creado, incluido su identificador único ($id
). Este ID es necesario en pasos posteriores para crear un conjunto de datos, una asignación y un flujo de datos de destino.
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
Crear un conjunto de datos de destinatario
Con un esquema XDM de destino creado y su único $id
, ahora puede crear un conjunto de datos de destino para contener los datos de origen. Para crear un conjunto de datos de destino, realice una solicitud de POST al extremo dataSets
de la API de servicio de catálogo, al tiempo que proporciona el ID del esquema de destino dentro de la carga útil.
Formato de API
POST /catalog/dataSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
name
schemaRef.id
$id
para el esquema XDM en el que se basará el conjunto de datos.schemaRef.contentType
application/vnd.adobe.xed-full-notext+json;version=1
, que devuelve la última versión secundaria del esquema. Consulte la sección sobre versiones de esquema en la guía de API de XDM para obtener más información.Respuesta
Una respuesta correcta devuelve una matriz que contiene el ID del conjunto de datos recién creado con el formato "@/datasets/{DATASET_ID}"
. El ID del conjunto de datos es una cadena generada por el sistema de solo lectura que se utiliza para hacer referencia al conjunto de datos en las llamadas a la API. El ID del conjunto de datos de destino es necesario en pasos posteriores para crear una conexión de destino y un flujo de datos.
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
Creación de una conexión de destino target-connection
Las conexiones de Target crean y administran una conexión de destino a Platform o a cualquier ubicación donde aterrizarán los datos transferidos. Las conexiones de destino contienen información sobre el destino de datos, el formato de datos y el ID de conexión de destino necesarios para crear un flujo de datos. Las instancias de conexión de destino son específicas de un inquilino y una organización.
Para crear una conexión de destino, realice una solicitud de POST al extremo /targetConnections
de la API Flow Service. Como parte de la solicitud, debe proporcionar el formato de datos, el dataSetId
recuperado en el paso anterior y el identificador de especificación de conexión fija vinculado a Data Lake. Este identificador es c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Formato de API
POST /targetConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
data.format
params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Respuesta
Una respuesta correcta devuelve el identificador único (id
) de la nueva conexión de destino. Este ID es necesario en pasos posteriores.
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
Creación de una asignación mapping
Para que los datos de origen se incorporen en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiere el conjunto de datos de destino.
Para crear un conjunto de asignaciones, realice una solicitud de POST al extremo mappingSets
de la Data Prep API y proporcione el esquema XDM de destino $id
y los detalles de los conjuntos de asignaciones que desee crear.
Formato de API
POST /mappingSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
xdmSchema
$id
del esquema XDM de destino.Respuesta
Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id
). Este ID es necesario en un paso posterior para crear un flujo de datos.
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Recuperación de una lista de especificaciones de flujo de datos specs
Un flujo de datos es responsable de recopilar datos de fuentes y llevarlos a Platform. GET Para crear un flujo de datos, primero debe obtener las especificaciones del flujo de datos realizando una solicitud a la API Flow Service.
Formato de API
GET /flowSpecs
Solicitud
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Respuesta
Una respuesta correcta devuelve una lista de especificaciones de flujo de datos. El id. de especificación de flujo de datos que debe recuperar para crear un flujo de datos con cualquiera de Amazon Kinesis, Azure Event Hubs o Google PubSub es d69717ba-71b4-4313-b654-49f9cf126d7a
.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
},
]
}
Creación de un flujo de datos
El último paso para recopilar datos de flujo continuo es crear un flujo de datos. Por ahora, tiene preparados los siguientes valores obligatorios:
Un flujo de datos es responsable de programar y recopilar datos de una fuente. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores mencionados anteriormente dentro de la carga útil.
Formato de API
POST /flows
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
Respuesta
Una respuesta correcta devuelve el identificador (id
) del flujo de datos recién creado.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Datos de Post para la ingesta
Consulte la carga útil de ejemplo a continuación para ver ejemplos de json sin procesar o compatible con XDM que puede enviar para su ingesta.
Los siguientes ejemplos se aplican a todos:
code language-json |
---|
|
code language-json |
---|
|
Pasos siguientes
Al seguir este tutorial, ha creado un flujo de datos para recopilar datos de flujo continuo desde el conector de flujo continuo. Ahora, los servicios de la plataforma descendente como Real-Time Customer Profile y Data Science Workspace pueden usar los datos entrantes. Consulte los siguientes documentos para obtener más información: