Crear un flujo de datos para orígenes de almacenamiento en la nube mediante la API Flow Service
Este tutorial trata los pasos para recuperar datos de una fuente de almacenamiento en la nube y llevarlos a Platform mediante Flow Service API.
Introducción
Este tutorial requiere una comprensión práctica de los siguientes componentes de Adobe Experience Platform:
-
Experience Data Model (XDM) System: el marco estandarizado mediante el cual el Experience Platform organiza los datos de experiencia del cliente.
- Aspectos básicos de la composición de esquemas: obtenga información sobre los componentes básicos de los esquemas XDM, incluidos los principios clave y las prácticas recomendadas en la composición de esquemas.
- Guía para desarrolladores de Schema Registry: Incluye información importante que necesita conocer para realizar correctamente llamadas a la API de Schema Registry. Esto incluye su
{TENANT_ID}
, el concepto de "contenedores" y los encabezados necesarios para realizar solicitudes (con especial atención al encabezado Aceptar y sus posibles valores).
-
Catalog Service: el catálogo es el sistema de registro para la ubicación de datos y el linaje dentro del Experience Platform.
-
Batch ingestion: la API de ingesta por lotes le permite introducir datos en Experience Platform como archivos por lotes.
-
Zonas protegidas: El Experience Platform proporciona zonas protegidas virtuales que dividen una sola instancia de Platform en entornos virtuales independientes para ayudar a desarrollar y evolucionar aplicaciones de experiencia digital.
Uso de API de Platform
Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía sobre introducción a las API de Platform.
Crear una conexión de origen source
Puede crear una conexión de origen realizando una solicitud de POST al extremo sourceConnections
de la API Flow Service, al mismo tiempo que proporciona el identificador de conexión base, la ruta de acceso al archivo de origen que desea introducir y el identificador de especificación de conexión correspondiente del origen.
Al crear una conexión de origen, también debe definir un valor de enumeración para el atributo de formato de datos.
Utilice los siguientes valores de enumeración para orígenes basados en archivos:
delimited
json
parquet
Para todos los orígenes basados en tablas, establezca el valor en tabular
.
Formato de API
POST /sourceConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
, JSON
y parquet
.data.properties
data.properties.columnDelimiter
,
) como valor predeterminado. Nota: la propiedad columnDelimiter
solo se puede usar al ingerir archivos delimitados.data.properties.encoding
UTF-8
y ISO-8859-1
. Nota: el parámetro encoding
solo está disponible cuando se ingieren archivos CSV delimitados. Se incorporarán otros tipos de archivo con la codificación predeterminada, UTF-8
.data.properties.compressionType
bzip2
, gzip
, deflate
, zipDeflate
, tarGzip
y tar
. Nota: la propiedad compressionType
solo se puede usar al ingerir archivos delimitados o JSON.params.path
/acme/summerCampaign/*.csv
ingerirá toda la carpeta /acme/summerCampaign/
.params.type
file
para ingerir un archivo individual y el tipo folder
para ingerir una carpeta completa.connectionSpec.id
Respuesta
Una respuesta correcta devuelve el identificador único (id
) de la conexión de origen recién creada. Este ID es necesario en un paso posterior para crear un flujo de datos.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Utilice expresiones regulares para seleccionar un conjunto específico de archivos para la ingesta regex
Puede utilizar expresiones regulares para introducir un conjunto concreto de archivos del origen a Platform al crear una conexión de origen.
Formato de API
POST /sourceConnections
Solicitud
En el ejemplo siguiente, se utiliza una expresión regular en la ruta de acceso del archivo para especificar la ingesta de todos los archivos CSV que tienen premium
en su nombre.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Configurar una conexión de origen para introducir datos de forma recursiva
Al crear una conexión de origen, puede utilizar el parámetro recursive
para introducir datos de carpetas anidadas profundamente.
Formato de API
POST /sourceConnections
Solicitud
En el ejemplo siguiente, el parámetro recursive: true
indica a Flow Service que lea todas las subcarpetas de forma recursiva durante el proceso de ingesta.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Creación de un esquema XDM de destino target-schema
Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se incluyen los datos de origen.
Se puede crear un esquema XDM de destino realizando una solicitud de POST a la API de Registro de esquemas.
Para ver los pasos detallados sobre cómo crear un esquema XDM de destino, consulte el tutorial de creación de un esquema mediante la API.
Crear un conjunto de datos de destinatario target-dataset
Se puede crear un conjunto de datos de destino realizando una solicitud de POST a la API de servicio de catálogo, que proporcione el ID del esquema de destino en la carga útil.
Para ver los pasos detallados sobre cómo crear un conjunto de datos de destino, consulte el tutorial de creación de un conjunto de datos mediante la API.
Creación de una conexión de destino target-connection
Una conexión de destino representa la conexión con el destino donde aterrizan los datos introducidos. Para crear una conexión de destino, debe proporcionar el ID de especificación de conexión fija asociado al lago de datos. Este id. de especificación de conexión es: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Ahora tiene los identificadores únicos de un esquema de destino, un conjunto de datos de destino y el ID de especificación de conexión al lago de datos. Con estos identificadores, puede crear una conexión de destino utilizando la API Flow Service para especificar el conjunto de datos que contendrá los datos de origen entrantes.
Formato de API
POST /targetConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
del esquema XDM de destino.data.schema.version
application/vnd.adobe.xed-full+json;version=1
, que devuelve la última versión secundaria del esquema.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Respuesta
Una respuesta correcta devuelve el identificador único (id
) de la nueva conexión de destino. Este ID es necesario en pasos posteriores.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Creación de una asignación mapping
Para que los datos de origen se incorporen en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiere el conjunto de datos de destino.
Para crear un conjunto de asignaciones, realice una solicitud de POST al extremo mappingSets
de la Data Prep API y proporcione el esquema XDM de destino $id
y los detalles de los conjuntos de asignaciones que desee crear.
Formato de API
POST /conversion/mappingSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
Respuesta
Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id
). Este valor es necesario en un paso posterior para crear un flujo de datos.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Recuperar especificaciones de flujo de datos specs
Un flujo de datos es responsable de recopilar datos de fuentes y llevarlos a Platform. Para crear un flujo de datos, primero debe obtener las especificaciones del flujo de datos responsables de recopilar datos de almacenamiento en la nube.
Formato de API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Solicitud
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Respuesta
Una respuesta correcta devuelve los detalles de la especificación de flujo de datos responsable de traer datos de su origen a Platform. La respuesta incluye la especificación de flujo única id
necesaria para crear un nuevo flujo de datos.
code language-json |
---|
|
Creación de un flujo de datos
El último paso para recopilar datos de almacenamiento en la nube es crear un flujo de datos. Por ahora, tiene preparados los siguientes valores obligatorios:
Un flujo de datos es responsable de programar y recopilar datos de una fuente. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores mencionados anteriormente dentro de la carga útil.
Para programar una ingesta, primero debe establecer el valor de la hora de inicio en un tiempo récord en segundos. A continuación, debe establecer el valor de frecuencia en una de las cinco opciones: once
, minute
, hour
, day
o week
. El valor de intervalo designa el periodo entre dos ingestas consecutivas y la creación de una ingesta única no requiere que se establezca un intervalo. Para todas las demás frecuencias, el valor del intervalo debe establecerse en igual o mayor que 15
.
Formato de API
POST /flows
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
o week
.scheduleParams.interval
El intervalo designa el período entre dos ejecuciones de flujo consecutivas. El valor del intervalo debe ser un entero distinto de cero. El valor mínimo del intervalo aceptado para cada frecuencia es el siguiente:
- Una vez: n/a
- Minuto: 15
- Hora: 1
- Día: 1
- Semana: 1
Respuesta
Una respuesta correcta devuelve el identificador (id
) del flujo de datos recién creado.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Monitorización del flujo de datos
Una vez creado el flujo de datos, puede monitorizar los datos que se están introduciendo a través de él para ver información sobre las ejecuciones de flujo, el estado de finalización y los errores. Para obtener más información sobre cómo supervisar flujos de datos, consulte el tutorial sobre supervisión de flujos de datos en la API
Pasos siguientes
Al seguir este tutorial, ha creado un conector de origen para recopilar datos del almacenamiento en la nube de forma programada. Ahora, los servicios de la plataforma descendente como Real-Time Customer Profile y Data Science Workspace pueden usar los datos entrantes. Consulte los siguientes documentos para obtener más información:
Apéndice appendix
En la siguiente sección se enumeran los diferentes conectores de origen de almacenamiento en la nube y sus especificaciones de conexiones.
Especificación de conexión
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094