Este tutorial aborda as etapas para recuperar dados de uma fonte de armazenamento em nuvem e trazê-los para a Platform usando Flow Service API.
Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma fonte de armazenamento na nuvem. Se você não tiver essa ID, consulte a visão geral das fontes para obter uma lista de fontes de armazenamento em nuvem com as quais você pode criar uma conexão básica.
Este tutorial requer uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:
{TENANT_ID}
, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com especial atenção ao cabeçalho Accept e seus possíveis valores).Para obter informações sobre como fazer chamadas para APIs da plataforma com êxito, consulte o guia em introdução às APIs do Platform.
Você pode criar uma conexão de origem fazendo uma solicitação de POST para o sourceConnections
ponto final de Flow Service API ao fornecer a ID de conexão básica, o caminho para o arquivo de origem que você deseja assimilar e a ID de especificação de conexão correspondente da fonte.
Ao criar uma conexão de origem, você também deve definir um valor enum para o atributo de formato de dados.
Use os seguintes valores enum para fontes baseadas em arquivo:
Formato dos dados | Valor de enumeração |
---|---|
Delimitado | delimited |
JSON | json |
Parquet | parquet |
Para todas as fontes baseadas em tabela, defina o valor como tabular
.
Formato da API
POST /sourceConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
baseConnectionId |
A ID de conexão básica da fonte de armazenamento em nuvem. |
data.format |
O formato dos dados que você deseja trazer para a plataforma. Os valores compatíveis são: delimited , JSON e parquet . |
data.properties |
(Opcional) Um conjunto de propriedades que você pode aplicar aos seus dados ao criar uma conexão de origem. |
data.properties.columnDelimiter |
(Opcional) Um delimitador de coluna de caractere único que pode ser especificado ao coletar arquivos simples. Qualquer valor de caractere único é um delimitador de coluna permitido. Se não for fornecida, uma vírgula (, ) é usada como o valor padrão. Observação: O columnDelimiter só pode ser usada ao assimilar arquivos delimitados. |
data.properties.encoding |
(Opcional) Uma propriedade que define o tipo de codificação a ser usado ao assimilar seus dados na Platform. Os tipos de codificação compatíveis são: UTF-8 e ISO-8859-1 . Observação: O encoding só está disponível ao assimilar arquivos CSV delimitados. Outros tipos de arquivos serão assimilados com a codificação padrão, UTF-8 . |
data.properties.compressionType |
(Opcional) Uma propriedade que define o tipo de arquivo compactado para assimilação. Os tipos de arquivos compactados compatíveis são: bzip2 , gzip , deflate , zipDeflate , tarGzip e tar . Observação: O compressionType A propriedade só pode ser usada ao assimilar arquivos delimitados ou JSON. |
params.path |
O caminho do arquivo de origem que você está acessando. Esse parâmetro indica um arquivo individual ou uma pasta inteira. Observação: Você pode usar um asterisco no lugar do nome do arquivo para especificar a assimilação de uma pasta inteira. Por exemplo: /acme/summerCampaign/*.csv ingerirá todo o /acme/summerCampaign/ pasta. |
params.type |
O tipo de arquivo do arquivo de dados de origem que você está assimilando. Tipo de uso file para assimilar um arquivo individual e usar o tipo folder para assimilar uma pasta inteira. |
connectionSpec.id |
A ID da especificação de conexão associada à fonte de armazenamento em nuvem específica. Consulte a apêndice para obter uma lista de IDs de especificação de conexão. |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo (id
) da conexão de origem recém-criada. Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Você pode usar expressões regulares para assimilar um conjunto específico de arquivos da origem para a Plataforma ao criar uma conexão de origem.
Formato da API
POST /sourceConnections
Solicitação
No exemplo abaixo, a expressão regular é usada no caminho do arquivo para especificar a assimilação de todos os arquivos CSV que têm premium
em seu nome.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Ao criar uma conexão de origem, você pode usar a variável recursive
para assimilar dados de pastas profundamente aninhadas.
Formato da API
POST /sourceConnections
Solicitação
No exemplo abaixo, a variável recursive: true
informa o parâmetro Flow Service para ler todas as subpastas recursivamente durante o processo de ingestão.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Para que os dados de origem sejam usados na Platform, um schema de target deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O schema de destino é usado para criar um conjunto de dados da plataforma no qual os dados de origem estão contidos.
Um esquema XDM de destino pode ser criado executando-se uma solicitação de POST para a API do Registro de Schema.
Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial em criação de um schema usando a API.
Um conjunto de dados de destino pode ser criado executando uma solicitação de POST para a API do Serviço de catálogo, fornecendo a ID do schema do target no payload.
Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial em criação de um conjunto de dados usando a API.
Uma conexão target representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação de conexão fixa associada ao Data Lake. Essa ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Agora você tem os identificadores exclusivos em um esquema de destino em um conjunto de dados de destino e a ID de especificação de conexão no Data Lake. Usando esses identificadores, você pode criar uma conexão de target usando o Flow Service API para especificar o conjunto de dados que conterá os dados de origem de entrada.
Formato da API
POST /targetConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
data.schema.id |
O $id do esquema XDM de destino. |
data.schema.version |
A versão do schema. Este valor deve ser definido application/vnd.adobe.xed-full+json;version=1 , que retorna a versão secundária mais recente do esquema. |
params.dataSetId |
A ID do conjunto de dados de destino. |
connectionSpec.id |
A ID de especificação de conexão fixa para o Data Lake. Essa ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo da nova conexão de destino (id
). Essa ID é necessária em etapas posteriores.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o schema de destino ao qual o conjunto de dados de destino adere.
Para criar um conjunto de mapeamento, faça uma solicitação de POST para a função mappingSets
endpoint da variável Data Prep API ao fornecer seu esquema XDM de destino $id
e os detalhes dos conjuntos de mapeamento que deseja criar.
Você pode mapear tipos de dados complexos, como matrizes em arquivos JSON, usando um conector de origem de armazenamento na nuvem.
Formato da API
POST /conversion/mappingSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Propriedade | Descrição |
---|---|
xdmSchema |
A ID do esquema XDM de destino. |
Resposta
Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id
). Esse valor é necessário em uma etapa posterior para criar um fluxo de dados.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a plataforma. Para criar um fluxo de dados, primeiro você deve obter as especificações do fluxo de dados responsáveis pela coleta de dados do armazenamento em nuvem.
Formato da API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Solicitação
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
A carga de resposta JSON abaixo está oculta para brevidade. Selecione "payload" para ver a carga da resposta.
Resposta
Uma resposta bem-sucedida retorna os detalhes da especificação de fluxo de dados responsável por trazer os dados da fonte para a Plataforma. A resposta inclui as especificações exclusivas de fluxo id
necessário para criar um novo fluxo de dados.
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
A última etapa para coletar dados de armazenamento em nuvem é criar um fluxo de dados. Por enquanto, você terá os seguintes valores obrigatórios preparados:
Um fluxo de dados é responsável por agendar e coletar dados de uma fonte. Você pode criar um fluxo de dados executando uma solicitação de POST e, ao mesmo tempo, fornecendo os valores mencionados anteriormente dentro da carga útil.
Para assimilação em lote, cada fluxo de dados subsequente seleciona arquivos a serem assimilados da sua origem com base em seus última modificação timestamp. Isso significa que os fluxos de dados em lote selecionam arquivos da origem que são novos ou foram modificados desde a última execução do fluxo de dados.
Para agendar uma assimilação, primeiro defina o valor de hora de início como época em segundos. Em seguida, você deve definir o valor de frequência para uma das cinco opções: once
, minute
, hour
, day
ou week
. O valor de intervalo designa o período entre duas ingestões consecutivas e a criação de uma ingestão única não requer a definição de um intervalo. Para todas as outras frequências, o valor do intervalo deve ser definido como igual ou superior a 15
.
É altamente recomendável agendar seu fluxo de dados para uma ingestão única ao usar a variável Conector FTP.
Formato da API
POST /flows
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
Propriedade | Descrição |
---|---|
flowSpec.id |
O ID de especificação de fluxo recuperada na etapa anterior. |
sourceConnectionIds |
O ID de conexão de origem recuperada em uma etapa anterior. |
targetConnectionIds |
O target connection ID recuperada em uma etapa anterior. |
transformations.params.mappingId |
O ID de mapeamento recuperada em uma etapa anterior. |
scheduleParams.startTime |
A hora de início do fluxo de dados em época. |
scheduleParams.frequency |
A frequência com que o fluxo de dados coletará dados. Os valores aceitáveis incluem: once , minute , hour , day ou week . |
scheduleParams.interval |
O intervalo designa o período entre duas execuções consecutivas de fluxo. O valor do intervalo deve ser um número inteiro diferente de zero. O intervalo não é necessário quando a frequência é definida como once e deve ser maior que ou igual a 15 para outros valores de frequência. |
Resposta
Uma resposta bem-sucedida retorna a ID (id
) do fluxo de dados recém-criado.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Depois que o fluxo de dados tiver sido criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial em monitoramento de fluxos de dados na API
Ao seguir este tutorial, você criou um conector de origem para coletar dados do armazenamento em nuvem de forma agendada. Os dados recebidos agora podem ser usados por serviços downstream da plataforma, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes:
A seção a seguir lista os diferentes conectores de origem de armazenamento em nuvem e suas especificações de conexões.
Nome do conector | Especificação de conexão |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (Hubs de eventos) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |