Crie um fluxo de dados para fontes de sucesso do cliente usando a API Flow Service

Este tutorial aborda as etapas para recuperar dados de uma fonte de sucesso do cliente e trazê-los para a plataforma usando a Flow Service API.

NOTE
  • Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma fonte de sucesso do cliente. Se você não tiver essa ID, consulte a visão geral das fontes para obter uma lista de fontes de sucesso do cliente com as quais você pode criar uma conexão base.
  • Para Experience Platform assimilar dados, os fusos horários de todas as fontes de lote baseadas em tabela devem ser configurados como UTC.

Introdução

Este tutorial requer que você tenha uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:

  • Experience Data Model (XDM) System: a estrutura padronizada pela qual o Experience Platform organiza os dados de experiência do cliente.

    • Noções básicas sobre a composição de esquema: saiba mais sobre os blocos de construção básicos de esquemas XDM, incluindo princípios-chave e práticas recomendadas na composição de esquema.
    • Guia do desenvolvedor do Registro de Esquema: inclui informações importantes que você precisa saber para executar com êxito chamadas para a API do Registro de Esquema. Isso inclui o {TENANT_ID}, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com atenção especial ao cabeçalho Aceitar e seus valores possíveis).
  • Catalog Service: Catálogo é o sistema de registro para localização e linhagem de dados em Experience Platform.

  • Batch ingestion: a API de assimilação em lote permite assimilar dados em Experience Platform como arquivos em lote.

  • Sandboxes: Experience Platform fornece sandboxes virtuais que particionam uma única instância do Platform em ambientes virtuais separados para ajudar a desenvolver aplicativos de experiência digital.

Uso de APIs da plataforma

Para obter informações sobre como fazer chamadas para APIs da Platform com êxito, consulte o manual sobre introdução às APIs da Platform.

Criar uma conexão de origem source

Você pode criar uma conexão de origem fazendo uma solicitação POST para a API Flow Service. Uma conexão de origem consiste em uma ID de conexão, um caminho para o arquivo de dados de origem e uma ID de especificação de conexão.

Para criar uma conexão de origem, você também deve definir um valor de enumeração para o atributo de formato de dados.

Use os seguintes valores de enumeração para conectores baseados em arquivo:

Formato dos dados
Valor de enumeração
Delimitado
delimited
JSON
json
Parquet
parquet

Para todos os conectores baseados em tabela, defina o valor tabular.

Formato da API

POST /sourceConnections

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Source connection for Customer Success",
        "baseConnectionId": "f1da3694-38a9-403d-9a36-9438a9203d42",
        "description": "Source connection for a Customer Success connector",
        "data": {
            "format": "tabular",
        },
        "params": {
            "tableName": "Account",
            "columns": [
                {
                    "name": "Id",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Phone",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "CreatedDate",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "cb66ab34-8619-49cb-96d1-39b37ede86ea",
            "version": "1.0"
        }
    }'
Propriedade
Descrição
baseConnectionId
O identificador de conexão exclusivo do sistema de sucesso do cliente de terceiros que você está acessando.
params.path
O caminho do arquivo de origem.
connectionSpec.id
A ID de especificação de conexão associada ao seu sistema de sucesso de cliente de terceiros específico. Consulte o apêndice para obter uma lista de IDs de especificação de conexão.

Resposta

Uma resposta bem-sucedida retorna o identificador exclusivo (id) da conexão de origem recém-criada. Essa ID é necessária nas etapas posteriores para criar uma conexão de destino.

{
    "id": "17faf955-2cf8-4b15-baf9-552cf88b1540",
    "etag": "\"2900a761-0000-0200-0000-5ed18cea0000\""
}

Criar um esquema XDM de destino target-schema

Para que os dados de origem sejam usados na Platform, um esquema de destino deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O esquema de destino é usado para criar um conjunto de dados da Platform no qual os dados de origem estão contidos.

Um esquema XDM de destino pode ser criado executando uma solicitação POST para a API do Registro de Esquema.

Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial sobre criação de um esquema usando a API.

Criar um conjunto de dados de destino target-dataset

Um conjunto de dados de destino pode ser criado por meio de uma solicitação POST para a API de Serviço de Catálogo, fornecendo a ID do esquema de destino na carga.

Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial sobre criação de um conjunto de dados usando a API.

Criar uma conexão de destino target-connection

Uma conexão de destino representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação da conexão fixa associada ao Data Lake. Esta ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Agora você tem os identificadores exclusivos, um esquema de destino, um conjunto de dados de destino e a ID de especificação da conexão para o data lake. Usando a API Flow Service, você pode criar uma conexão de destino especificando esses identificadores junto com o conjunto de dados que conterá os dados de origem de entrada.

Formato da API

POST /targetConnections
curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a customer success connector",
        "description": "Target Connection for a customer success connector",
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/deb3e1096c35d8311b5d80868c4bd5b3cdfd4b3150e7345f",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5e543e8a60b15218ad44b95f"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Propriedade
Descrição
data.schema.id
O $id do esquema XDM de destino.
data.schema.version
A versão do esquema. Este valor deve ser definido como application/vnd.adobe.xed-full+json;version=1, que retorna a versão secundária mais recente do esquema.
params.dataSetId
A ID do conjunto de dados de destino gerado na etapa anterior. Observação: você deve fornecer uma ID de conjunto de dados válida ao criar uma conexão de destino. Uma ID de conjunto de dados inválida resultará em um erro.
connectionSpec.id
A ID de especificação da conexão usada para se conectar ao data lake. Esta ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Resposta

Uma resposta bem-sucedida retorna o identificador exclusivo (id) da nova conexão de destino. Esse valor é necessário em uma etapa posterior para criar um fluxo de dados.

{
    "id": "1f5af99c-f1ef-4076-9af9-9cf1ef507678",
    "etag": "\"530013e2-0000-0200-0000-5ebc4c110000\""
}

Criar um mapeamento mapping

Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o esquema de destino ao qual o conjunto de dados de destino adere.

Para criar um conjunto de mapeamento, faça uma solicitação POST para o ponto de extremidade mappingSets da Data Prep API enquanto fornece o esquema XDM de destino $id e os detalhes dos conjuntos de mapeamento que deseja criar.

Formato da API

POST /mappingSets

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/b750bd161fef405bc324d0c8809b02c494d73e60e7ae9b3e",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "Id",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "_repo.createDate",
                "sourceAttribute": "CreatedDate",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Propriedade
Descrição
xdmSchema
O $id do esquema XDM de destino.

Resposta

Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id). Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.

{
    "id": "7c3547d3cfc14f568a51c32b4c0ed739",
    "version": 0,
    "createdDate": 1590792069173,
    "modifiedDate": 1590792069173,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Recuperar especificações de fluxo de dados specs

Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a Platform. Para criar um fluxo de dados, primeiro obtenha as especificações do fluxo de dados executando uma solicitação GET para a API do Serviço de Fluxo. As especificações de fluxo de dados são responsáveis por coletar dados de um sistema de sucesso do cliente de terceiros.

Formato da API

GET /flowSpecs?property=name=="CRMToAEP"

Solicitação

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Resposta

Uma resposta bem-sucedida retorna os detalhes da especificação do fluxo de dados responsáveis por trazer os dados de sua origem para a Platform. A resposta inclui a especificação de fluxo exclusiva id necessária para criar um novo fluxo de dados.

NOTE
A carga de resposta JSON abaixo está oculta por brevidade. Selecione "carga" para ver a carga de resposta.
Exibir conteúdo
code language-json
{
  "id": "14518937-270c-4525-bdec-c2ba7cce3860",
  "name": "CRMToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "3416976c-a9ca-4bba-901a-1f08f66978ff",
    "38ad80fe-8b06-4938-94f4-d4ee80266b07",
    "d771e9c1-4f26-40dc-8617-ce58c4b53702",
    "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
    "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
    "3000eb99-cd47-43f3-827c-43caf170f015",
    "26d738e0-8963-47ea-aadf-c60de735468a",
    "74a1c565-4e59-48d7-9d67-7c03b8a13137",
    "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
    "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
    "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "eb13cb25-47ab-407f-ba89-c0125281c563",
    "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
    "37b6bf40-d318-4655-90be-5cd6f65d334b",
    "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
    "221c7626-58f6-4eec-8ee2-042b0226f03b",
    "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
    "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
    "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
    "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
    "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
    "102706fb-a5cd-42ee-afe0-bc42f017ff43",
    "09182899-b429-40c9-a15a-bf3ddbc8ced7",
    "0479cc14-7651-4354-b233-7480606c2ac3",
    "d6b52d86-f0f8-475f-89d4-ce54c8527328",
    "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
    "1fe283f6-9bec-11ea-bb37-0242ac130002",
    "fcad62f3-09b0-41d3-be11-449d5a621b69",
    "ea1c2a08-b722-11eb-8529-0242ac130003",
    "35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
    "ff4274f2-c9a9-11eb-b8bc-0242ac130003",
    "ba5126ec-c9ac-11eb-b8bc-0242ac130003",
    "b2e08744-4f1a-40ce-af30-7abac3e23cf3",
    "929e4450-0237-4ed2-9404-b7e1e0a00309",
    "2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
    "2fa8af9c-2d1a-43ea-a253-f00a00c74412"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Copy",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "deltaColumn": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "deltaColumn"
        ]
      }
    },
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "deltaColumn": {
            "type": "object",
            "description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime",
          "deltaColumn"
        ]
      }
    }
}
Propriedade
Descrição
flowSpec.id
A ID de especificação de fluxo recuperou na etapa anterior.
sourceConnectionIds
A ID da conexão de origem recuperou em uma etapa anterior.
targetConnectionIds
A ID da conexão de destino recuperou em uma etapa anterior.
transformations.params.mappingId
A ID de mapeamento recuperou em uma etapa anterior.
transformations.params.deltaColum
A coluna designada usada para diferenciar entre dados novos e existentes. Os dados incrementais serão assimilados com base no carimbo de data e hora da coluna selecionada. O formato de data com suporte para deltaColumn é yyyy-MM-dd HH:mm:ss.
transformations.params.mappingId
A ID de mapeamento associada ao banco de dados.
scheduleParams.startTime
A hora de início do fluxo de dados em época.
scheduleParams.frequency
A frequência com que o fluxo de dados coletará dados. Os valores aceitáveis incluem: once, minute, hour, day ou week.
scheduleParams.interval

O intervalo designa o período entre duas execuções de fluxo consecutivas. O valor do intervalo deve ser um inteiro diferente de zero. O valor mínimo de intervalo aceito para cada frequência é o seguinte:

  • Uma vez: n/d
  • Minuto: 15
  • Hora: 1
  • Dia: 1
  • Semana: 1

Resposta

Uma resposta bem-sucedida retorna a ID id do fluxo de dados recém-criado.

{
    "id": "e0bd8463-0913-4ca1-bd84-6309134ca1f6",
    "etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}

Monitorar seu fluxo de dados

Depois que o fluxo de dados for criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial sobre monitoramento de fluxos de dados na API

Próximas etapas

Seguindo este tutorial, você criou um conector de origem para coletar dados de um sistema de sucesso do cliente de forma programada. Os dados de entrada agora podem ser usados por serviços Platform downstream, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes:

Apêndice

A seção a seguir lista os diferentes conectores de origem de armazenamento na nuvem e suas especificações de conexão.

Especificação de conexão

Nome do conector
Especificação da conexão
Salesforce Service Cloud
cb66ab34-8619-49cb-96d1-39b37ede86ea
ServiceNow
eb13cb25-47ab-407f-ba89-c0125281c563
recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089