Criar um fluxo de dados para fontes de banco de dados usando a API Flow Service
Este tutorial aborda as etapas para recuperar dados de uma fonte de banco de dados e trazê-los para a Platform usando a Flow Service API.
- Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma origem de banco de dados. Se você não tiver essa ID, consulte a visão geral das fontes para obter uma lista de fontes de banco de dados com as quais você pode criar uma conexão base.
- Para Experience Platform assimilar dados, os fusos horários de todas as fontes de lote baseadas em tabela devem ser configurados como UTC. O único carimbo de data/hora com suporte para Snowflake origem é TIMESTAMP_NTZ com hora UTC.
Introdução
Este tutorial requer que você tenha uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:
-
Experience Data Model (XDM) System: a estrutura padronizada pela qual o Experience Platform organiza os dados de experiência do cliente.
- Noções básicas sobre a composição de esquema: saiba mais sobre os blocos de construção básicos de esquemas XDM, incluindo princípios-chave e práticas recomendadas na composição de esquema.
- Guia do desenvolvedor do Registro de Esquema: inclui informações importantes que você precisa saber para executar com êxito chamadas para a API do Registro de Esquema. Isso inclui o
{TENANT_ID}
, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com atenção especial ao cabeçalho Aceitar e seus valores possíveis).
-
Catalog Service: Catálogo é o sistema de registro para localização e linhagem de dados no Experience Platform.
-
Batch ingestion: a API de assimilação em lote permite assimilar dados no Experience Platform como arquivos em lote.
-
Sandboxes: o Experience Platform fornece sandboxes virtuais que particionam uma única instância da Platform em ambientes virtuais separados para ajudar a desenvolver aplicativos de experiência digital.
Uso de APIs da plataforma
Para obter informações sobre como fazer chamadas para APIs da Platform com êxito, consulte o manual sobre introdução às APIs da Platform.
Criar uma conexão de origem source
Você pode criar uma conexão de origem fazendo uma solicitação POST para a API Flow Service. Uma conexão de origem consiste em uma ID de conexão, um caminho para o arquivo de dados de origem e uma ID de especificação de conexão.
Para criar uma conexão de origem, você também deve definir um valor de enumeração para o atributo de formato de dados.
Use os seguintes valores de enumeração para conectores baseados em arquivo:
delimited
json
parquet
Para todos os conectores baseados em tabela, defina o valor como tabular
.
Formato da API
POST /sourceConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo (id
) da conexão de origem recém-criada. Essa ID é necessária nas etapas posteriores para criar uma conexão de destino.
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
Criar um esquema XDM de destino target-schema
Para que os dados de origem sejam usados na Platform, um esquema de destino deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O esquema de destino é usado para criar um conjunto de dados da Platform no qual os dados de origem estão contidos.
Um esquema XDM de destino pode ser criado executando uma solicitação POST para a API do Registro de Esquema.
Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial sobre criação de um esquema usando a API.
Criar um conjunto de dados de destino target-dataset
Um conjunto de dados de destino pode ser criado por meio de uma solicitação POST para a API de Serviço de Catálogo, fornecendo a ID do esquema de destino na carga.
Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial sobre criação de um conjunto de dados usando a API.
Criar uma conexão de destino target-connection
Uma conexão de destino representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação da conexão fixa associada ao Data Lake. Esta ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Agora você tem os identificadores exclusivos, um esquema de destino, um conjunto de dados de destino e a ID de especificação da conexão para o data lake. Usando a API Flow Service, você pode criar uma conexão de destino especificando esses identificadores junto com o conjunto de dados que conterá os dados de origem de entrada.
Formato da API
POST /targetConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
do esquema XDM de destino.data.schema.version
application/vnd.adobe.xed-full+json;version=1
, que retorna a versão secundária mais recente do esquema.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo (id
) da nova conexão de destino. Esse valor é necessário em uma etapa posterior para criar um fluxo de dados.
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
Criar um mapeamento mapping
Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o esquema de destino ao qual o conjunto de dados de destino adere.
Para criar um conjunto de mapeamento, faça uma solicitação POST para o ponto de extremidade mappingSets
da Data Prep API enquanto fornece o esquema XDM de destino $id
e os detalhes dos conjuntos de mapeamento que deseja criar.
Formato da API
POST /mappingSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
do esquema XDM de destino.Resposta
Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id
). Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Recuperar especificações de fluxo de dados specs
Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a Platform. Para criar um fluxo de dados, você deve primeiro obter as especificações do fluxo de dados executando uma solicitação GET para a API Flow Service. As especificações de fluxo de dados são responsáveis pela coleta de dados de um banco de dados externo ou sistema NoSQL.
Formato da API
GET /flowSpecs?property=name=="CRMToAEP"
Solicitação
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Resposta
Uma resposta bem-sucedida retorna os detalhes da especificação do fluxo de dados responsáveis por trazer os dados de sua origem para a Platform. A resposta inclui a especificação de fluxo exclusiva id
necessária para criar um novo fluxo de dados.
code language-json |
---|
|
Criar um fluxo de dados
A última etapa para coletar dados é criar um fluxo de dados. Nesse ponto, os seguintes valores obrigatórios devem ser preparados:
Um fluxo de dados é responsável por agendar e coletar dados de uma origem. Você pode criar um fluxo de dados executando uma solicitação POST enquanto fornece os valores mencionados anteriormente na carga da solicitação.
Para agendar uma assimilação, primeiro defina o valor da hora inicial como a época em segundos. Em seguida, defina o valor de frequência para uma das cinco opções: once
, minute
, hour
, day
ou week
. O valor do intervalo designa o período entre duas assimilações consecutivas e a criação de uma assimilação única não requer que um intervalo seja definido. Para todas as outras frequências, o valor do intervalo deve ser definido como igual ou maior que 15
.
Formato da API
POST /flows
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
+++
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
é yyyy-MM-dd HH:mm:ss
. Se você estiver usando o Armazenamento de Tabelas do Azure, o formato com suporte para deltaColumn
é yyyy-MM-ddTHH:mm:ssZ
.transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
ou week
.scheduleParams.interval
O intervalo designa o período entre duas execuções de fluxo consecutivas. O valor do intervalo deve ser um inteiro diferente de zero. O valor mínimo de intervalo aceito para cada frequência é o seguinte:
- Uma vez: n/d
- Minuto: 15
- Hora: 1
- Dia: 1
- Semana: 1
Resposta
Uma resposta bem-sucedida retorna a ID (id
) do fluxo de dados recém-criado.
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
Monitorar seu fluxo de dados
Depois que o fluxo de dados for criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial sobre monitoramento de fluxos de dados na API
Próximas etapas
Seguindo este tutorial, você criou um conector de origem para coletar dados de um banco de dados de forma programada. Os dados de entrada agora podem ser usados por serviços downstream da Platform, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes:
Apêndice
A seção a seguir lista os diferentes conectores de origem de armazenamento na nuvem e suas especificações de conexão.
Especificação de conexão
3416976c-a9ca-4bba-901a-1f08f66978ff
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
0479cc14-7651-4354-b233-7480606c2ac3
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
ecde33f2-c56f-46cc-bdea-ad151c16cd69
1fe283f6-9bec-11ea-bb37-0242ac130002
3c9b37f8-13a6-43d8-bad3-b863b941fedd
37b6bf40-d318-4655-90be-5cd6f65d334b
09182899-b429-40c9-a15a-bf3ddbc8ced7
000eb99-cd47-43f3-827c-43caf170f015
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
26d738e0-8963-47ea-aadf-c60de735468a
d6b52d86-f0f8-475f-89d4-ce54c8527328
102706fb-a5cd-42ee-afe0-bc42f017ff43
74a1c565-4e59-48d7-9d67-7c03b8a13137