Crear un flujo de datos para orígenes de base de datos utilizando la variable Flow Service API

Este tutorial trata los pasos para recuperar datos de un origen de base de datos y llevarlos a Platform mediante Flow Service API.

NOTA

Para crear un flujo de datos, ya debe tener un ID de conexión base válido con un origen de base de datos. Si no tiene este ID, consulte la información general sobre fuentes para obtener una lista de orígenes de base de datos con los que puede crear una conexión base.

Primeros pasos

Este tutorial requiere tener una comprensión práctica de los siguientes componentes de Adobe Experience Platform:

  • Experience Data Model (XDM) System: El marco estandarizado mediante el cual el Experience Platform organiza los datos de experiencia del cliente.
    • Aspectos básicos de la composición del esquema: Obtenga información sobre los componentes básicos de los esquemas XDM, incluidos los principios clave y las prácticas recomendadas en la composición de esquemas.
    • Guía para desarrolladores de Schema Registry: Incluye información importante que debe conocer para realizar correctamente llamadas a la API del Registro de esquemas. Esto incluye el {TENANT_ID}, el concepto de "contenedores" y los encabezados requeridos para realizar solicitudes (con especial atención al encabezado Accept y sus posibles valores).
  • Catalog Service: Catálogo es el sistema de registro para la ubicación y linaje de los datos dentro del Experience Platform.
  • Batch ingestion: La API de ingesta de lotes permite introducir datos en el Experience Platform como archivos por lotes.
  • Sandboxes: Experience Platform proporciona entornos limitados virtuales que dividen una sola instancia de Platform en entornos virtuales independientes para ayudar a desarrollar y desarrollar aplicaciones de experiencia digital.

Uso de las API de plataforma

Para obtener información sobre cómo realizar llamadas correctamente a las API de Platform, consulte la guía de introducción a las API de Platform.

Crear una conexión de origen

Puede crear una conexión de origen realizando una solicitud de POST al Flow Service API. Una conexión de origen consiste en un ID de conexión, una ruta al archivo de datos de origen y un ID de especificación de conexión.

Para crear una conexión de origen, también debe definir un valor de enumeración para el atributo de formato de datos.

Utilice los siguientes valores de enumeración para conectores basados en archivos:

Formato de datos Valor de enumeración
Delimitado delimited
JSON json
Parqué parquet

Para todos los conectores basados en tablas, establezca el valor en tabular.

Formato de API

POST /sourceConnections

Solicitud

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database source connection",
        "baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
        "description": "Database source connection",
        "data": {
            "format": "tabular"
        },
        "params": {
            "tableName": "test1.Mytable",
            "columns": [
                {
                    "name": "TestID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Datefield",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
            "version": "1.0"
        }
    }'
Propiedad Descripción
baseConnectionId El ID de conexión del origen de la base de datos.
params.path Ruta del archivo de origen.
connectionSpec.id El ID de especificación de conexión del origen de la base de datos. Consulte la Apéndice para obtener una lista de ID de especificaciones de base de datos.

Respuesta

Una respuesta correcta devuelve el identificador único (id) de la conexión de origen recién creada. Este ID es necesario en pasos posteriores para crear una conexión de destino.

{
    "id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
    "etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}

Creación de un esquema XDM de destino

Para que los datos de origen se utilicen en Platform, se debe crear un esquema de destino para estructurar los datos de origen según sus necesidades. A continuación, el esquema de destino se utiliza para crear un conjunto de datos de Platform en el que se contienen los datos de origen.

Se puede crear un esquema XDM de destino realizando una solicitud de POST al API del Registro de esquemas.

Para ver los pasos detallados sobre cómo crear un esquema XDM de destino, consulte el tutorial sobre creación de un esquema con la API.

Creación de un conjunto de datos de destino

Se puede crear un conjunto de datos de destino realizando una solicitud de POST al API del servicio de catálogo, que proporciona el ID del esquema de destino dentro de la carga útil.

Para ver los pasos detallados sobre cómo crear un conjunto de datos de destinatario, consulte el tutorial en creación de un conjunto de datos mediante la API.

Creación de una conexión de destino

Una conexión de destino representa la conexión con el destino en el que llegan los datos introducidos. Para crear una conexión de destino, debe proporcionar el ID de especificación de conexión fija asociado al lago de datos. Este ID de especificación de conexión es: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Ahora tiene los identificadores únicos, un esquema de destino, un conjunto de datos de destino y el ID de especificación de conexión a un lago de datos. Al usar la variable Flow Service API, puede crear una conexión de destino especificando estos identificadores junto con el conjunto de datos que contendrán los datos de origen entrantes.

Formato de API

POST /targetConnections

Solicitud

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database target connection",
        "description": "Database target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "6019e0e7c5dcf718db5ebc71"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Propiedad Descripción
data.schema.id La variable $id del esquema XDM de destino.
data.schema.version Versión del esquema. Este valor debe establecerse application/vnd.adobe.xed-full+json;version=1, que devuelve la última versión secundaria del esquema.
params.dataSetId El ID del conjunto de datos de destino recopilado en el paso anterior.
connectionSpec.id El ID de especificación de conexión que se utiliza para conectar con el lago de datos. Este ID es: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Respuesta

Una respuesta correcta devuelve el identificador único de la nueva conexión de destino (id). Este valor es necesario en un paso posterior para crear un flujo de datos.

{
    "id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
    "etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}

Creación de una asignación

Para que los datos de origen se introduzcan en un conjunto de datos de destino, primero deben asignarse al esquema de destino al que se adhiera el conjunto de datos de destino.

Para crear un conjunto de asignaciones, realice una solicitud de POST al mappingSets punto final del Data Prep API al proporcionar el esquema XDM de destino $id y los detalles de los conjuntos de asignación que desea crear.

Formato de API

POST /mappingSets

Solicitud

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "TestID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.birthDate",
                "sourceAttribute": "Datefield",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Propiedad Descripción
xdmSchema La variable $id del esquema XDM de destino.

Respuesta

Una respuesta correcta devuelve detalles de la asignación recién creada, incluido su identificador único (id). Este ID es necesario en un paso posterior para crear un flujo de datos.

{
    "id": "0b090130b58b4819afc78b6dc98b484d",
    "version": 0,
    "createdDate": 1612309018666,
    "modifiedDate": 1612309018666,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Recuperar especificaciones de flujo de datos

Un flujo de datos es responsable de recopilar datos de las fuentes y traerlos a Platform. Para crear un flujo de datos, primero debe obtener las especificaciones del flujo de datos realizando una solicitud de GET al Flow Service API. Las especificaciones de flujo de datos son responsables de recopilar datos de una base de datos externa o sistema NoSQL.

Formato de API

GET /flowSpecs?property=name=="CRMToAEP"

Solicitud

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Respuesta

Una respuesta correcta devuelve los detalles de la especificación de flujo de datos responsable de traer los datos de su origen a Platform. La respuesta incluye la especificación de flujo único id necesario para crear un nuevo flujo de datos.

{
    "items": [
        {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "name": "CRMToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "3416976c-a9ca-4bba-901a-1f08f66978ff",
                "38ad80fe-8b06-4938-94f4-d4ee80266b07",
                "d771e9c1-4f26-40dc-8617-ce58c4b53702",
                "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
                "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
                "3000eb99-cd47-43f3-827c-43caf170f015",
                "26d738e0-8963-47ea-aadf-c60de735468a",
                "74a1c565-4e59-48d7-9d67-7c03b8a13137",
                "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
                "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
                "cb66ab34-8619-49cb-96d1-39b37ede86ea",
                "eb13cb25-47ab-407f-ba89-c0125281c563",
                "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
                "37b6bf40-d318-4655-90be-5cd6f65d334b",
                "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
                "221c7626-58f6-4eec-8ee2-042b0226f03b",
                "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
                "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
                "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
                "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
                "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
                "102706fb-a5cd-42ee-afe0-bc42f017ff43",
                "09182899-b429-40c9-a15a-bf3ddbc8ced7",
                "0479cc14-7651-4354-b233-7480606c2ac3",
                "d6b52d86-f0f8-475f-89d4-ce54c8527328",
                "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
                "1fe283f6-9bec-11ea-bb37-0242ac130002"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "optionSpec": {
                "name": "OptionSpec",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "errorDiagnosticsEnabled": {
                            "title": "Error diagnostics.",
                            "description": "Flag to enable detailed and sample error diagnostics summary.",
                            "type": "boolean",
                            "default": false
                        },
                        "partialIngestionPercent": {
                            "title": "Partial ingestion threshold.",
                            "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
                            "type": "number",
                            "exclusiveMinimum": 0
                        }
                    }
                }
            },
            "transformationSpecs": [
                {
                    "name": "Copy",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "properties": {
                            "deltaColumn": {
                                "type": "object",
                                "properties": {
                                    "name": {
                                        "type": "string"
                                    },
                                    "dateFormat": {
                                        "type": "string"
                                    },
                                    "timezone": {
                                        "type": "string"
                                    }
                                },
                                "required": [
                                    "name"
                                ]
                            }
                        },
                        "required": [
                            "deltaColumn"
                        ]
                    }
                },
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "once",
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "once"
                            }
                        }
                    },
                    "then": {
                        "allOf": [
                            {
                                "not": {
                                    "required": [
                                        "interval"
                                    ]
                                }
                            },
                            {
                                "not": {
                                    "required": [
                                        "backfill"
                                    ]
                                }
                            }
                        ]
                    },
                    "else": {
                        "required": [
                            "interval"
                        ],
                        "if": {
                            "properties": {
                                "frequency": {
                                    "const": "minute"
                                }
                            }
                        },
                        "then": {
                            "properties": {
                                "interval": {
                                    "minimum": 15
                                }
                            }
                        },
                        "else": {
                            "properties": {
                                "interval": {
                                    "minimum": 1
                                }
                            }
                        }
                    }
                }
            },
            "attributes": {
                "notification": {
                    "category": "sources",
                    "flowRun": {
                        "enabled": true
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

Crear un flujo de datos

El último paso para recopilar datos es crear un flujo de datos. En este punto, debe tener preparados los siguientes valores obligatorios:

Un flujo de datos es responsable de programar y recopilar datos de un origen. Puede crear un flujo de datos realizando una solicitud de POST mientras proporciona los valores antes mencionados dentro de la carga útil de la solicitud.

Para programar una ingesta, primero debe establecer el valor de la hora de inicio en hora de inicio en segundos. A continuación, debe establecer el valor de frecuencia en una de las cinco opciones: once, minute, hour, dayo week. El valor de intervalo designa el periodo entre dos ingestas consecutivas y la creación de una ingesta única no requiere que se establezca un intervalo. Para las demás frecuencias, el valor del intervalo debe establecerse en igual o bueno que 15.

Formato de API

POST /flows

Solicitud

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database dataflow using BigQuery",
        "description": "collecting test1.Mytable",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "b7581b59-c603-4df1-a689-d23d7ac440f3"
        ],
        "targetConnectionIds": [
            "320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "Datefield",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "0b090130b58b4819afc78b6dc98b484d",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1612310466",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'
Propiedad Descripción
flowSpec.id La variable ID de especificación de flujo recuperado en el paso anterior.
sourceConnectionIds La variable ID de conexión de origen recuperado en un paso anterior.
targetConnectionIds La variable ID de conexión de target recuperado en un paso anterior.
transformations.params.mappingId La variable ID de asignación recuperado en un paso anterior.
transformations.params.deltaColum La columna designada se utiliza para diferenciar entre datos nuevos y existentes. Los datos incrementales se incorporarán en función de la marca de tiempo de la columna seleccionada. El formato de fecha admitido para deltaColumn es yyyy-MM-dd HH:mm:ss. Si utiliza el almacenamiento de tablas de Azure, el formato admitido para deltaColumn es yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId ID de asignación asociado a la base de datos.
scheduleParams.startTime Hora de inicio del flujo de datos en tiempo de época.
scheduleParams.frequency Frecuencia con la que el flujo de datos recopilará datos. Los valores aceptables incluyen: once, minute, hour, dayo week.
scheduleParams.interval El intervalo designa el periodo entre dos ejecuciones de flujo consecutivas. El valor del intervalo debe ser un número entero distinto de cero. El intervalo no es necesario cuando la frecuencia está establecida como once y debe ser bueno o igual que 15 para otros valores de frecuencia.

Respuesta

Una respuesta correcta devuelve el ID (id) del flujo de datos recién creado.

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

Monitorizar el flujo de datos

Una vez creado el flujo de datos, puede supervisar los datos que se están incorporando a través de él para ver información sobre ejecuciones de flujo, estado de finalización y errores. Para obtener más información sobre cómo monitorizar los flujos de datos, consulte el tutorial en monitorización de flujos de datos en la API

Pasos siguientes

Siguiendo este tutorial, ha creado un conector de origen para recopilar datos de una base de datos de forma programada. Los datos entrantes ahora se pueden usar en servicios de Platform descendentes como Real-time Customer Profile y Data Science Workspace. Consulte los siguientes documentos para obtener más información:

Apéndice

La siguiente sección enumera los diferentes conectores de origen de almacenamiento en la nube y sus especificaciones de conexiones.

Especificación de conexión

Nombre del conector ID de especificación de conexión
Amazon Redshift 3416976c-a9ca-4bba-901a-1f08f66978ff
Apache Hive en Azure HDInsights aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Apache Spark en Azure HDInsights 6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer 0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage ecde33f2-c56f-46cc-bdea-ad151c16cd69
Couchbase 1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery 3c9b37f8-13a6-43d8-bad3-b863b941fedd
Greenplum 37b6bf40-d318-4655-90be-5cd6f65d334b
IBM DB2 09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB 000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server 1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL 26d738e0-8963-47ea-aadf-c60de735468a
Oracle d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix 102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL 74a1c565-4e59-48d7-9d67-7c03b8a13137

En esta página