Este tutorial explica los pasos para recuperar datos sin procesar de un conector de origen de flujo continuo y llevarlos al Experience Platform mediante Flow Service API.
Este tutorial requiere una comprensión práctica de los siguientes componentes de Adobe Experience Platform:
{TENANT_ID}
, el concepto de "contenedores" y los encabezados necesarios para realizar solicitudes (con especial atención al encabezado Aceptar y sus posibles valores).Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía de introducción a las API de Platform.
Este tutorial también requiere que tenga un ID de conexión de origen válido para un conector de flujo continuo. Si no tiene esta información, consulte los siguientes tutoriales sobre la creación de una conexión de origen de flujo continuo antes de intentar realizar este tutorial:
Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se incluyen los datos de origen. Este esquema XDM de destino también amplía el XDM Individual Profile clase.
Para crear un esquema XDM de destino, realice una solicitud de POST al /schemas
punto final del Schema Registry API.
Formato de API
POST /tenant/schemas
Solicitud
La siguiente solicitud de ejemplo crea un esquema XDM que amplía el XDM Individual Profile clase.
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
Respuesta
Una respuesta correcta devuelve detalles del esquema recién creado, incluido su identificador único ($id
). Este ID es necesario en pasos posteriores para crear un conjunto de datos, una asignación y un flujo de datos de destino.
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
Con un esquema XDM de destino creado y su exclusivo $id
ahora puede crear un conjunto de datos de destino para contener los datos de origen. Para crear un conjunto de datos de destinatario, realice una solicitud de POST al dataSets
punto final del API del servicio de catálogo, al tiempo que proporciona el ID del esquema de destino en la carga útil.
Formato de API
POST /catalog/dataSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
Propiedad | Descripción |
---|---|
name |
Nombre del conjunto de datos que se va a crear. |
schemaRef.id |
El URI $id para el esquema XDM en el que se basará el conjunto de datos. |
schemaRef.contentType |
La versión del esquema. Este valor debe establecerse en application/vnd.adobe.xed-full-notext+json;version=1 , que devuelve la última versión secundaria del esquema. Consulte la sección sobre versiones de esquema en la Guía de API de XDM para obtener más información. |
Respuesta
Una respuesta correcta devuelve una matriz que contiene el ID del conjunto de datos recién creado con el formato "@/datasets/{DATASET_ID}"
. El ID del conjunto de datos es una cadena generada por el sistema de solo lectura que se utiliza para hacer referencia al conjunto de datos en las llamadas a la API. El ID del conjunto de datos de destino es necesario en pasos posteriores para crear una conexión de destino y un flujo de datos.
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
Las conexiones de Target crean y administran una conexión de destino a Platform o a cualquier ubicación donde aterrizarán los datos transferidos. Las conexiones de destino contienen información sobre el destino de datos, el formato de datos y el ID de conexión de destino necesarios para crear un flujo de datos. Las instancias de conexión de destino son específicas de un inquilino y una organización.
Para crear una conexión de destino, realice una solicitud de POST al /targetConnections
punto final del Flow Service API. Como parte de la solicitud, debe proporcionar el formato de datos, el dataSetId
recuperado en el paso anterior y el ID de especificación de conexión fija vinculado a Data Lake. Este ID es c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Formato de API
POST /targetConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
Propiedad | Descripción |
---|---|
data.format |
El formato especificado de los datos que está trayendo al lago de datos. |
params.dataSetId |
El ID del conjunto de datos de destinatario generado en el paso anterior. Nota: Debe proporcionar un ID de conjunto de datos válido al crear una conexión de destino. Si la ID del conjunto de datos no es válida, se producirá un error. |
connectionSpec.id |
ID de especificación de conexión utilizado para conectarse al lago de datos. Este ID es: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Respuesta
Una respuesta correcta devuelve el identificador único ( ) de la nueva conexión de destinoid
). Este ID es necesario en pasos posteriores.
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
Para que los datos de origen se incorporen en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiere el conjunto de datos de destino.
Para crear un conjunto de asignaciones, realice una solicitud de POST al mappingSets
punto final del Data Prep API al proporcionar el esquema XDM de destino $id
y los detalles de los conjuntos de asignaciones que desee crear.
Formato de API
POST /mappingSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
Propiedad | Descripción |
---|---|
xdmSchema |
El $id del esquema XDM de destino. |
Respuesta
Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id
). Este ID es necesario en un paso posterior para crear un flujo de datos.
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Un flujo de datos es responsable de recopilar datos de fuentes y llevarlos a Platform. GET Para crear un flujo de datos, primero debe obtener las especificaciones del flujo de datos realizando una solicitud al Flow Service API.
Formato de API
GET /flowSpecs
Solicitud
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Respuesta
Una respuesta correcta devuelve una lista de especificaciones de flujo de datos. El ID de especificación del flujo de datos que debe recuperar para crear un flujo de datos mediante cualquiera de los siguientes Amazon Kinesis, Azure Event Hubs, o Google PubSub, es d69717ba-71b4-4313-b654-49f9cf126d7a
.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
},
]
}
El último paso para recopilar datos de flujo continuo es crear un flujo de datos. Por ahora, tiene preparados los siguientes valores obligatorios:
Un flujo de datos es responsable de programar y recopilar datos de una fuente. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores mencionados anteriormente dentro de la carga útil.
Formato de API
POST /flows
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
Propiedad | Descripción |
---|---|
flowSpec.id |
El ID de especificación de flujo recuperado en el paso anterior. |
sourceConnectionIds |
El ID de conexión de origen recuperado en un paso anterior. |
targetConnectionIds |
El ID de conexión de destino recuperado en un paso anterior. |
transformations.params.mappingId |
El ID de asignación recuperado en un paso anterior. |
Respuesta
Una respuesta correcta devuelve el ID (id
) del flujo de datos recién creado.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Al seguir este tutorial, ha creado un flujo de datos para recopilar datos de flujo continuo desde el conector de flujo continuo. Ahora, los servicios de Platform posteriores pueden utilizar los datos entrantes, como Real-Time Customer Profile y Data Science Workspace. Consulte los siguientes documentos para obtener más información: