Este tutorial trata los pasos para recuperar datos de un origen de almacenamiento en la nube y llevarlos a Platform mediante Flow Service API.
Para crear un flujo de datos, ya debe tener un ID de conexión base válido con un origen de almacenamiento en la nube. Si no tiene este ID, consulte la información general sobre fuentes para obtener una lista de los orígenes de almacenamiento en la nube con los que puede crear una conexión base.
Este tutorial requiere tener una comprensión práctica de los siguientes componentes de Adobe Experience Platform:
{TENANT_ID}
, el concepto de "contenedores" y los encabezados requeridos para realizar solicitudes (con especial atención al encabezado Accept y sus posibles valores).Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía de introducción a las API de Platform.
Puede crear una conexión de origen realizando una solicitud de POST al sourceConnections
punto final de Flow Service al proporcionar su ID de conexión base, la ruta al archivo de origen que desea introducir y el ID de especificación de conexión correspondiente de la fuente.
Al crear una conexión de origen, también debe definir un valor de enumeración para el atributo de formato de datos.
Utilice los siguientes valores de enumeración para orígenes basados en archivos:
Formato de datos | Valor de enumeración |
---|---|
Delimitado | delimited |
JSON | json |
Parqué | parquet |
Para todos los orígenes basados en tablas, establezca el valor en tabular
.
Formato de API
POST /sourceConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}"
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Propiedad | Descripción |
---|---|
baseConnectionId |
El ID de conexión base del origen de almacenamiento en la nube. |
data.format |
El formato de los datos que desea traer a Platform. Los valores admitidos son: delimited , JSON y parquet . |
data.properties |
(Opcional) Conjunto de propiedades que puede aplicar a los datos al crear una conexión de origen. |
data.properties.columnDelimiter |
(Opcional) Un delimitador de columna de un solo carácter que puede especificar al recopilar archivos planos. Cualquier valor de carácter único es un delimitador de columna permitido. Si no se proporciona, una coma (, ) se utiliza como valor predeterminado. Nota: La variable columnDelimiter solo se puede utilizar al ingerir archivos delimitados. |
data.properties.encoding |
(Opcional) Una propiedad que define el tipo de codificación que se utilizará al ingerir los datos en Platform. Los tipos de codificación admitidos son: UTF-8 y ISO-8859-1 . Nota: La variable encoding solo está disponible al introducir archivos CSV delimitados. Otros tipos de archivo se incorporarán con la codificación predeterminada, UTF-8 . |
data.properties.compressionType |
(Opcional) Una propiedad que define el tipo de archivo comprimido que se va a introducir. Los tipos de archivo comprimido admitidos son: bzip2 , gzip , deflate , zipDeflate , tarGzip y tar . Nota: La variable compressionType solo se puede utilizar al introducir archivos delimitados o JSON. |
params.path |
Ruta del archivo de origen al que está accediendo. Este parámetro apunta a un archivo individual o a una carpeta entera. Nota: Puede utilizar un asterisco en lugar del nombre del archivo para especificar la ingesta de una carpeta entera. Por ejemplo: /acme/summerCampaign/*.csv ingerirá todo /acme/summerCampaign/ carpeta. |
params.type |
Tipo de archivo del archivo de datos de origen que está incorporando. Tipo de uso file para ingerir un archivo individual y utilizar el tipo folder para introducir una carpeta completa. |
connectionSpec.id |
El ID de especificación de conexión asociado al origen de almacenamiento en la nube específico. Consulte la apéndice para obtener una lista de ID de especificaciones de conexión. |
Respuesta
Una respuesta correcta devuelve el identificador único (id
) de la conexión de origen recién creada. Este ID es necesario en un paso posterior para crear un flujo de datos.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se contienen los datos de origen.
Se puede crear un esquema XDM de destino realizando una solicitud de POST al API del Registro de esquemas.
Para ver los pasos detallados sobre cómo crear un esquema XDM de destino, consulte el tutorial sobre creación de un esquema con la API.
Se puede crear un conjunto de datos de destino realizando una solicitud de POST al API del servicio de catálogo, que proporciona el ID del esquema de destino dentro de la carga útil.
Para ver los pasos detallados sobre cómo crear un conjunto de datos de destinatario, consulte el tutorial en creación de un conjunto de datos mediante la API.
Una conexión de destino representa la conexión con el destino en el que llegan los datos introducidos. Para crear una conexión de destino, debe proporcionar el ID de especificación de conexión fija asociado al lago de datos. Este ID de especificación de conexión es: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Ahora tiene los identificadores únicos, un esquema de destino, un conjunto de datos de destino y el ID de especificación de conexión al lago de datos. Con estos identificadores, puede crear una conexión de destino utilizando la variable Flow Service API para especificar el conjunto de datos que contendrá los datos de origen entrantes.
Formato de API
POST /targetConnections
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Propiedad | Descripción |
---|---|
data.schema.id |
La variable $id del esquema XDM de destino. |
data.schema.version |
Versión del esquema. Este valor debe establecerse application/vnd.adobe.xed-full+json;version=1 , que devuelve la última versión secundaria del esquema. |
params.dataSetId |
El ID del conjunto de datos de destino. |
connectionSpec.id |
El ID de especificación de conexión fija al lago de datos. Este ID es: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Respuesta
Una respuesta correcta devuelve el identificador único de la nueva conexión de destino (id
). Este ID se requiere en pasos posteriores.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Para que los datos de origen se introduzcan en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiera el conjunto de datos de destino.
Para crear un conjunto de asignaciones, realice una solicitud de POST al mappingSets
punto final del Data Prep API al proporcionar el esquema XDM de destino $id
y los detalles de los conjuntos de asignación que desea crear.
Puede asignar tipos de datos complejos, como matrices en archivos JSON, mediante un conector de origen de almacenamiento en la nube.
Formato de API
POST /conversion/mappingSets
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Propiedad | Descripción |
---|---|
xdmSchema |
El ID del esquema XDM de destino. |
Respuesta
Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id
). Este valor es necesario en un paso posterior para crear un flujo de datos.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Un flujo de datos es responsable de recopilar datos de las fuentes y traerlos a Platform. Para crear un flujo de datos, primero debe obtener las especificaciones de flujo de datos responsables de recopilar datos de almacenamiento en la nube.
Formato de API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Solicitud
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Respuesta
Una respuesta correcta devuelve los detalles de la especificación de flujo de datos responsable de traer los datos de su origen a Platform. La respuesta incluye la especificación de flujo único id
necesario para crear un nuevo flujo de datos.
{
"items": [
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"endTime": {
"description": "epoch time",
"type": "integer"
},
"interval": {
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"minute",
"hour",
"day",
"week"
]
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency",
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
}
}
]
}
El último paso para recopilar datos de almacenamiento en la nube es crear un flujo de datos. A partir de ahora, se han preparado los siguientes valores obligatorios:
Un flujo de datos es responsable de programar y recopilar datos de un origen. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores mencionados anteriormente dentro de la carga útil.
Para la ingesta por lotes, cada flujo de datos resultante selecciona los archivos que se van a ingerir de su origen en función de sus última modificación marca de tiempo. Esto significa que los flujos de datos por lotes seleccionan los archivos del origen que son nuevos o que se han modificado desde la última ejecución del flujo de datos.
Para programar una ingesta, primero debe establecer el valor de la hora de inicio en hora de inicio en segundos. A continuación, debe establecer el valor de frecuencia en una de las cinco opciones: once
, minute
, hour
, day
o week
. El valor de intervalo designa el periodo entre dos ingestas consecutivas y la creación de una ingesta única no requiere que se establezca un intervalo. Para las demás frecuencias, el valor del intervalo debe establecerse en igual o bueno que 15
.
Se recomienda programar el flujo de datos para una ingesta única al usar la variable Conector FTP.
Formato de API
POST /flows
Solicitud
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
Propiedad | Descripción |
---|---|
flowSpec.id |
La variable ID de especificación de flujo recuperado en el paso anterior. |
sourceConnectionIds |
La variable ID de conexión de origen recuperado en un paso anterior. |
targetConnectionIds |
La variable ID de conexión de target recuperado en un paso anterior. |
transformations.params.mappingId |
La variable ID de asignación recuperado en un paso anterior. |
scheduleParams.startTime |
Hora de inicio del flujo de datos en tiempo de época. |
scheduleParams.frequency |
Frecuencia con la que el flujo de datos recopilará datos. Los valores aceptables incluyen: once , minute , hour , day o week . |
scheduleParams.interval |
El intervalo designa el periodo entre dos ejecuciones de flujo consecutivas. El valor del intervalo debe ser un número entero distinto de cero. El intervalo no es necesario cuando la frecuencia está establecida como once y debe ser bueno o igual que 15 para otros valores de frecuencia. |
Respuesta
Una respuesta correcta devuelve el ID (id
) del flujo de datos recién creado.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Una vez creado el flujo de datos, puede supervisar los datos que se están incorporando a través de él para ver información sobre ejecuciones de flujo, estado de finalización y errores. Para obtener más información sobre cómo monitorizar los flujos de datos, consulte el tutorial en monitorización de flujos de datos en la API
Siguiendo este tutorial, ha creado un conector de origen para recopilar datos del almacenamiento en la nube de forma programada. Los datos entrantes ahora se pueden usar en servicios de Platform descendentes como Real-time Customer Profile y Data Science Workspace. Consulte los siguientes documentos para obtener más información:
La siguiente sección enumera los diferentes conectores de origen de almacenamiento en la nube y sus especificaciones de conexiones.
Nombre del conector | Especificación de conexión |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (Centros de eventos) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |