Crie um fluxo de dados para fontes de armazenamento em nuvem usando o Flow Service API

Este tutorial aborda as etapas para recuperar dados de uma fonte de armazenamento em nuvem e trazê-los para a Platform usando Flow Service API.

OBSERVAÇÃO

Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma fonte de armazenamento na nuvem. Se você não tiver essa ID, consulte a visão geral das fontes para obter uma lista de fontes de armazenamento em nuvem com as quais você pode criar uma conexão básica.

Introdução

Este tutorial requer uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:

  • Experience Data Model (XDM) System: A estrutura padronizada pela qual o Experience Platform organiza os dados de experiência do cliente.
    • Noções básicas da composição do schema: Saiba mais sobre os elementos básicos dos esquemas XDM, incluindo princípios-chave e práticas recomendadas na composição do schema.
    • Guia do desenvolvedor do Registro de Schema: Inclui informações importantes que você precisa saber para executar com sucesso chamadas para a API do Registro de Esquema. Isso inclui as {TENANT_ID}, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com especial atenção ao cabeçalho Accept e seus possíveis valores).
  • Catalog Service: Catálogo é o sistema de registro para localização e linhagem de dados no Experience Platform.
  • Batch ingestion: A API de assimilação em lote permite assimilar dados no Experience Platform como arquivos em lote.
  • Sandboxes: O Experience Platform fornece sandboxes virtuais que particionam uma única instância da Platform em ambientes virtuais separados para ajudar a desenvolver aplicativos de experiência digital.

Uso de APIs da plataforma

Para obter informações sobre como fazer chamadas para APIs da plataforma com êxito, consulte o guia em introdução às APIs do Platform.

Criar uma conexão de origem

Você pode criar uma conexão de origem fazendo uma solicitação de POST para o sourceConnections ponto final de Flow Service API ao fornecer a ID de conexão básica, o caminho para o arquivo de origem que você deseja assimilar e a ID de especificação de conexão correspondente da fonte.

Ao criar uma conexão de origem, você também deve definir um valor enum para o atributo de formato de dados.

Use os seguintes valores enum para fontes baseadas em arquivo:

Formato dos dados Valor de enumeração
Delimitado delimited
JSON json
Parquet parquet

Para todas as fontes baseadas em tabela, defina o valor como tabular.

Formato da API

POST /sourceConnections

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage source connection",
        "description: "Source connection for a cloud storage source",
        "baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
        "data": {
            "format": "delimited",
            "properties": {
                "columnDelimiter": "{COLUMN_DELIMITER}",
                "encoding": "{ENCODING}"
                "compressionType": "{COMPRESSION_TYPE}"
            }
        },
        "params": {
            "path": "/acme/summerCampaign/account.csv",
            "type": "file"
        },
        "connectionSpec": {
            "id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
            "version": "1.0"
        }
    }'
Propriedade Descrição
baseConnectionId A ID de conexão básica da fonte de armazenamento em nuvem.
data.format O formato dos dados que você deseja trazer para a plataforma. Os valores compatíveis são: delimited, JSONe parquet.
data.properties (Opcional) Um conjunto de propriedades que você pode aplicar aos seus dados ao criar uma conexão de origem.
data.properties.columnDelimiter (Opcional) Um delimitador de coluna de caractere único que pode ser especificado ao coletar arquivos simples. Qualquer valor de caractere único é um delimitador de coluna permitido. Se não for fornecida, uma vírgula (,) é usada como o valor padrão. Observação: O columnDelimiter só pode ser usada ao assimilar arquivos delimitados.
data.properties.encoding (Opcional) Uma propriedade que define o tipo de codificação a ser usado ao assimilar seus dados na Platform. Os tipos de codificação compatíveis são: UTF-8 e ISO-8859-1. Observação: O encoding só está disponível ao assimilar arquivos CSV delimitados. Outros tipos de arquivos serão assimilados com a codificação padrão, UTF-8.
data.properties.compressionType (Opcional) Uma propriedade que define o tipo de arquivo compactado para assimilação. Os tipos de arquivos compactados compatíveis são: bzip2, gzip, deflate, zipDeflate, tarGzipe tar. Observação: O compressionType A propriedade só pode ser usada ao assimilar arquivos delimitados ou JSON.
params.path O caminho do arquivo de origem que você está acessando.
connectionSpec.id A ID da especificação de conexão associada à fonte de armazenamento em nuvem específica. Consulte a apêndice para obter uma lista de IDs de especificação de conexão.

Resposta

Uma resposta bem-sucedida retorna o identificador exclusivo (id) da conexão de origem recém-criada. Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.

{
    "id": "26b53912-1005-49f0-b539-12100559f0e2",
    "etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}

Criar um esquema XDM de destino

Para que os dados de origem sejam usados na Platform, um schema de target deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O schema de destino é usado para criar um conjunto de dados da plataforma no qual os dados de origem estão contidos.

Um esquema XDM de destino pode ser criado executando-se uma solicitação de POST para a API do Registro de Schema.

Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial em criação de um schema usando a API.

Criar um conjunto de dados de destino

Um conjunto de dados de destino pode ser criado executando uma solicitação de POST para a API do Serviço de catálogo, fornecendo a ID do schema do target no payload.

Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial em criação de um conjunto de dados usando a API.

Criar uma conexão de destino

Uma conexão target representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação de conexão fixa associada ao Data Lake. Essa ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Agora você tem os identificadores exclusivos em um esquema de destino em um conjunto de dados de destino e a ID de especificação de conexão no Data Lake. Usando esses identificadores, você pode criar uma conexão de target usando o Flow Service API para especificar o conjunto de dados que conterá os dados de origem de entrada.

Formato da API

POST /targetConnections

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a Cloud Storage connector",
        "description": "Target Connection for a Cloud Storage connector",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5f3c3cedb2805c194ff0b69a"
        },
            "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Propriedade Descrição
data.schema.id O $id do esquema XDM de destino.
data.schema.version A versão do schema. Este valor deve ser definido application/vnd.adobe.xed-full+json;version=1, que retorna a versão secundária mais recente do esquema.
params.dataSetId A ID do conjunto de dados de destino.
connectionSpec.id A ID de especificação de conexão fixa para o Data Lake. Essa ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Resposta

Uma resposta bem-sucedida retorna o identificador exclusivo da nova conexão de destino (id). Essa ID é necessária em etapas posteriores.

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

Criar um mapeamento

Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o schema de destino ao qual o conjunto de dados de destino adere.

Para criar um conjunto de mapeamento, faça uma solicitação de POST para a função mappingSets endpoint da variável Data Prep API ao fornecer seu esquema XDM de destino $id e os detalhes dos conjuntos de mapeamento que deseja criar.

DICA

Você pode mapear tipos de dados complexos, como matrizes em arquivos JSON, usando um conector de origem de armazenamento na nuvem.

Formato da API

POST /conversion/mappingSets

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "Id",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "FirstName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "LastName",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Propriedade Descrição
xdmSchema A ID do esquema XDM de destino.

Resposta

Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id). Esse valor é necessário em uma etapa posterior para criar um fluxo de dados.

{
    "id": "bf5286a9c1ad4266baca76ba3adc9366",
    "version": 0,
    "createdDate": 1597784069368,
    "modifiedDate": 1597784069368,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Recuperar especificações do fluxo de dados

Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a plataforma. Para criar um fluxo de dados, primeiro você deve obter as especificações do fluxo de dados responsáveis pela coleta de dados do armazenamento em nuvem.

Formato da API

GET /flowSpecs?property=name=="CloudStorageToAEP"

Solicitação

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Resposta

Uma resposta bem-sucedida retorna os detalhes da especificação de fluxo de dados responsável por trazer os dados da fonte para a Plataforma. A resposta inclui as especificações exclusivas de fluxo id necessário para criar um novo fluxo de dados.

{
    "items": [
        {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "name": "CloudStorageToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
                "ecadc60c-7455-4d87-84dc-2a0e293d997b",
                "b7829c2f-2eb0-4f49-a6ee-55e33008b629",
                "4c10e202-c428-4796-9208-5f1f5732b1cf",
                "fb2e94c9-c031-467d-8103-6bd6e0a432f2",
                "32e8f412-cdf7-464c-9885-78184cb113fd",
                "b7bf2577-4520-42c9-bae9-cad01560f7bc",
                "998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
                "be5ec48c-5b78-49d5-b8fa-7c89ec4569b8"
            ],
            "targetConnectionSpecIds": [
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "endTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency",
                        "interval"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "minute"
                            }
                        }
                    },
                    "then": {
                        "properties": {
                            "interval": {
                                "minimum": 15
                            }
                        }
                    },
                    "else": {
                        "properties": {
                            "interval": {
                                "minimum": 1
                            }
                        }
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "EnterpriseSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        }
    ]
}

Criar um fluxo de dados

A última etapa para coletar dados de armazenamento em nuvem é criar um fluxo de dados. Por enquanto, você terá os seguintes valores obrigatórios preparados:

Um fluxo de dados é responsável por agendar e coletar dados de uma fonte. Você pode criar um fluxo de dados executando uma solicitação de POST e, ao mesmo tempo, fornecendo os valores mencionados anteriormente dentro da carga útil.

OBSERVAÇÃO

Para assimilação em lote, cada fluxo de dados subsequente seleciona arquivos a serem assimilados da sua origem com base em seus última modificação timestamp. Isso significa que os fluxos de dados em lote selecionam arquivos da origem que são novos ou foram modificados desde a última execução do fluxo de dados.

Para agendar uma assimilação, primeiro defina o valor de hora de início como época em segundos. Em seguida, você deve definir o valor de frequência para uma das cinco opções: once, minute, hour, dayou week. O valor de intervalo designa o período entre duas ingestões consecutivas e a criação de uma ingestão única não requer a definição de um intervalo. Para todas as outras frequências, o valor do intervalo deve ser definido como igual ou superior a 15.

IMPORTANTE

É altamente recomendável agendar seu fluxo de dados para uma ingestão única ao usar a variável Conector FTP.

Formato da API

POST /flows

Solicitação

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Cloud Storage flow to Platform",
        "description": "Cloud Storage flow to Platform",
        "flowSpec": {
            "id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "26b53912-1005-49f0-b539-12100559f0e2"
        ],
        "targetConnectionIds": [
            "f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1597784298",
            "frequency":"minute",
            "interval":"30"
        }
    }'
Propriedade Descrição
flowSpec.id O ID de especificação de fluxo recuperada na etapa anterior.
sourceConnectionIds O ID de conexão de origem recuperada em uma etapa anterior.
targetConnectionIds O target connection ID recuperada em uma etapa anterior.
transformations.params.mappingId O ID de mapeamento recuperada em uma etapa anterior.
scheduleParams.startTime A hora de início do fluxo de dados em época.
scheduleParams.frequency A frequência com que o fluxo de dados coletará dados. Os valores aceitáveis incluem: once, minute, hour, dayou week.
scheduleParams.interval O intervalo designa o período entre duas execuções consecutivas de fluxo. O valor do intervalo deve ser um número inteiro diferente de zero. O intervalo não é necessário quando a frequência é definida como once e deve ser maior que ou igual a 15 para outros valores de frequência.

Resposta

Uma resposta bem-sucedida retorna a ID (id) do fluxo de dados recém-criado.

{
    "id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

Monitorar o fluxo de dados

Depois que o fluxo de dados tiver sido criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial em monitoramento de fluxos de dados na API

Próximas etapas

Ao seguir este tutorial, você criou um conector de origem para coletar dados do armazenamento em nuvem de forma agendada. Os dados recebidos agora podem ser usados por serviços downstream da plataforma, como Real-time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes:

Apêndice

A seção a seguir lista os diferentes conectores de origem de armazenamento em nuvem e suas especificações de conexões.

Especificação de conexão

Nome do conector Especificação de conexão
Amazon S3 (S3) ecadc60c-7455-4d87-84dc-2a0e293d997b
Amazon Kinesis (Kinesis) 86043421-563b-46ec-8e6c-e23184711bf6
Azure Blob (Blob) 4c10e202-c428-4796-9208-5f1f5732b1cf
Azure Data Lake Storage Gen2 (ADLS Gen2) b3ba5556-48be-44b7-8b85-ff2b69b46dc4
Azure Event Hubs (Hubs de eventos) bf9f5905-92b7-48bf-bf20-455bc6b60a4e
Azure File Storage be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
Google Cloud Storage 32e8f412-cdf7-464c-9885-78184cb113fd
HDFS 54e221aa-d342-4707-bcff-7a4bceef0001
Oracle Object Storage c85f9425-fb21-426c-ad0b-405e9bd8a46c
SFTP bf367b0d-3d9b-4060-b67b-0d3d9bd06094

Nesta página