Este tutorial trata los pasos para recuperar datos sin procesar de un conector de origen de flujo continuo y llevarlos al Experience Platform mediante el Flow Service API.
Este tutorial requiere tener una comprensión práctica de los siguientes componentes de Adobe Experience Platform:
{TENANT_ID}
, el concepto de "contenedores" y los encabezados requeridos para realizar solicitudes (con especial atención al encabezado Accept y sus posibles valores).Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía de introducción a las API de Platform.
Este tutorial también requiere que tenga un ID de conexión de origen válido para un conector de flujo continuo. Si no tiene esta información, consulte los siguientes tutoriales sobre la creación de una conexión de origen de flujo continuo antes de intentar este tutorial:
Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se contienen los datos de origen. Este esquema XDM de destino también amplía el XDM Individual Profile Clase .
Para crear un esquema XDM de destino, realice una solicitud de POST al /schemas
punto final del Schema Registry API.
Formato de API
POST /tenant/schemas
Solicitud
La siguiente solicitud de ejemplo crea un esquema XDM que amplía el XDM Individual Profile Clase .
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
Respuesta
Una respuesta correcta devuelve detalles del esquema recién creado, incluido su identificador único ($id
). Este ID es necesario en pasos posteriores para crear un conjunto de datos de destinatario, una asignación y un flujo de datos.
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
Con un esquema XDM de destino creado y su $id
ahora puede crear un conjunto de datos de destino para contener los datos de origen. Para crear un conjunto de datos de destinatario, realice una solicitud de POST al dataSets
punto final del API del servicio de catálogo, mientras proporciona el ID del esquema de destino dentro de la carga útil.
Formato de API
POST /catalog/dataSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
Propiedad | Descripción |
---|---|
name |
Nombre del conjunto de datos que se va a crear. |
schemaRef.id |
El URI $id para el esquema XDM , el conjunto de datos se basará en . |
schemaRef.contentType |
Versión del esquema. Este valor debe establecerse en application/vnd.adobe.xed-full-notext+json;version=1 , que devuelve la última versión secundaria del esquema. Consulte la sección sobre versiones de esquema en la guía de la API XDM para obtener más información. |
Respuesta
Una respuesta correcta devuelve una matriz que contiene el ID del conjunto de datos recién creado con el formato "@/datasets/{DATASET_ID}"
. El ID del conjunto de datos es una cadena de solo lectura generada por el sistema que se utiliza para hacer referencia al conjunto de datos en las llamadas API. El ID del conjunto de datos de destino es necesario en pasos posteriores para crear una conexión de destino y un flujo de datos.
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
Las conexiones de destino crean y administran una conexión de destino a Platform o a cualquier ubicación en la que aterricen los datos transferidos. Las conexiones de destino contienen información sobre el destino de los datos, el formato de los datos y el ID de conexión de destino necesario para crear un flujo de datos. Las instancias de conexión de Target son específicas de un inquilino y de una organización de IMS.
Para crear una conexión de destino, realice una solicitud de POST al /targetConnections
punto final del Flow Service API. Como parte de la solicitud, debe proporcionar el formato de datos, la variable dataSetId
recuperado en el paso anterior y el ID de especificación de conexión fija vinculado a Data Lake. Este ID es c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Formato de API
POST /targetConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
Propiedad | Descripción |
---|---|
connectionSpec.id |
El ID de especificación de conexión utilizado para conectarse a la variable Data Lake. Este ID es: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
data.format |
El formato especificado de los datos que está trayendo a Data Lake. |
params.dataSetId |
ID del conjunto de datos de destino recuperado en el paso anterior. |
Respuesta
Una respuesta correcta devuelve el identificador único de la nueva conexión de destino (id
). Este ID se requiere en pasos posteriores.
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
Para que los datos de origen se introduzcan en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiera el conjunto de datos de destino.
Para crear un conjunto de asignaciones, realice una solicitud de POST al mappingSets
punto final del Data Prep API al proporcionar el esquema XDM de destino $id
y los detalles de los conjuntos de asignación que desea crear.
Formato de API
POST /mappingSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
Propiedad | Descripción |
---|---|
xdmSchema |
La variable $id del esquema XDM de destino. |
Respuesta
Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id
). Este ID es necesario en un paso posterior para crear un flujo de datos.
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Un flujo de datos es responsable de recopilar datos de las fuentes y traerlos a Platform. Para crear un flujo de datos, primero debe obtener las especificaciones del flujo de datos realizando una solicitud de GET al Flow Service API.
Formato de API
GET /flowSpecs
Solicitud
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Respuesta
Una respuesta correcta devuelve una lista de especificaciones de flujo de datos. El ID de especificación del flujo de datos que debe recuperar para crear un flujo de datos mediante cualquiera de los Amazon Kinesis, Azure Event Hubso Google PubSub, es d69717ba-71b4-4313-b654-49f9cf126d7a
.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
},
]
}
El último paso para recopilar datos de flujo continuo es crear un flujo de datos. A partir de ahora, se han preparado los siguientes valores obligatorios:
Un flujo de datos es responsable de programar y recopilar datos de un origen. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores mencionados anteriormente dentro de la carga útil.
Formato de API
POST /flows
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
Propiedad | Descripción |
---|---|
flowSpec.id |
La variable ID de especificación de flujo recuperado en el paso anterior. |
sourceConnectionIds |
La variable ID de conexión de origen recuperado en un paso anterior. |
targetConnectionIds |
La variable ID de conexión de target recuperado en un paso anterior. |
transformations.params.mappingId |
La variable ID de asignación recuperado en un paso anterior. |
Respuesta
Una respuesta correcta devuelve el ID (id
) del flujo de datos recién creado.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Al seguir este tutorial, ha creado un flujo de datos para recopilar datos de flujo continuo del conector de flujo continuo. Los datos entrantes ahora se pueden usar en servicios de Platform descendentes como Real-Time Customer Profile y Data Science Workspace. Consulte los siguientes documentos para obtener más información: