Este tutorial aborda as etapas para recuperar dados de uma fonte de armazenamento na nuvem e trazê-los para a Platform usando Flow Service API.
Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma fonte de armazenamento na nuvem. Se você não tiver essa ID, consulte a visão geral das origens para obter uma lista de fontes de armazenamento na nuvem com as quais você pode criar uma conexão básica.
Este tutorial requer que você tenha uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:
{TENANT_ID}
, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com atenção especial ao cabeçalho Aceitar e seus valores possíveis).Para obter informações sobre como fazer chamadas para APIs da Platform com êxito, consulte o manual em introdução às APIs da Platform.
Você pode criar uma conexão de origem fazendo uma solicitação POST para o sourceConnections
endpoint de Flow Service ao fornecer a ID de conexão básica, o caminho para o arquivo de origem que você deseja assimilar e a ID de especificação de conexão correspondente à origem.
Ao criar uma conexão de origem, você também deve definir um valor de enumeração para o atributo de formato de dados.
Use os seguintes valores de enumeração para fontes baseadas em arquivo:
Formato dos dados | Valor de enumeração |
---|---|
Delimitado | delimited |
JSON | json |
Parquet | parquet |
Para todas as fontes baseadas em tabela, defina o valor como tabular
.
Formato da API
POST /sourceConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
baseConnectionId |
A ID de conexão básica da sua fonte de armazenamento na nuvem. |
data.format |
O formato dos dados que você deseja trazer para a Platform. Os valores compatíveis são: delimited , JSON , e parquet . |
data.properties |
(Opcional) Um conjunto de propriedades que você pode aplicar aos seus dados ao criar uma conexão de origem. |
data.properties.columnDelimiter |
(Opcional) Um delimitador de coluna de caractere único que você pode especificar ao coletar arquivos simples. Qualquer valor de caractere único é um delimitador de coluna permitido. Se não for fornecido, uma vírgula (, ) é usado como valor padrão. Nota: A variável columnDelimiter A propriedade só pode ser usada ao assimilar arquivos delimitados. |
data.properties.encoding |
(Opcional) Uma propriedade que define o tipo de codificação a ser usado ao assimilar seus dados na Platform. Os tipos de codificação compatíveis são: UTF-8 e ISO-8859-1 . Nota: A variável encoding O parâmetro só está disponível ao assimilar arquivos CSV delimitados. Outros tipos de arquivo serão assimilados com a codificação padrão, UTF-8 . |
data.properties.compressionType |
(Opcional) Uma propriedade que define o tipo de arquivo compactado para assimilação. Os tipos de arquivos compactados compatíveis são: bzip2 , gzip , deflate , zipDeflate , tarGzip , e tar . Nota: A variável compressionType A propriedade só pode ser usada ao assimilar arquivos delimitados ou JSON. |
params.path |
O caminho do arquivo de origem que você está acessando. Esse parâmetro aponta para um arquivo individual ou uma pasta inteira. Nota: Você pode usar um asterisco no lugar do nome do arquivo para especificar a assimilação de uma pasta inteira. Por exemplo: /acme/summerCampaign/*.csv assimilará todo o /acme/summerCampaign/ pasta. |
params.type |
O tipo do arquivo de dados de origem que você está assimilando. Tipo de uso file para assimilar um arquivo individual e usar o tipo folder para assimilar uma pasta inteira. |
connectionSpec.id |
A ID de especificação de conexão associada à sua fonte de armazenamento em nuvem específica. Consulte a apêndice para obter uma lista de IDs de especificação de conexão. |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo (id
) da conexão de origem recém-criada. Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Você pode usar expressões regulares para assimilar um conjunto específico de arquivos da origem para a Platform ao criar uma conexão de origem.
Formato da API
POST /sourceConnections
Solicitação
No exemplo abaixo, a expressão regular é usada no caminho do arquivo para especificar a assimilação de todos os arquivos CSV que têm premium
em seu nome.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Ao criar uma conexão de origem, você pode usar o recursive
para assimilar dados de pastas profundamente aninhadas.
Formato da API
POST /sourceConnections
Solicitação
No exemplo abaixo, a variável recursive: true
informações do parâmetro Flow Service para ler todas as subpastas recursivamente durante o processo de assimilação.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Para que os dados de origem sejam usados na Platform, um esquema de destino deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O esquema de destino é usado para criar um conjunto de dados da Platform no qual os dados de origem estão contidos.
Um schema XDM de destino pode ser criado executando uma solicitação POST para o API do registro de esquema.
Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial sobre criação de um schema usando a API.
Um conjunto de dados de destino pode ser criado executando uma solicitação POST para o API do serviço de catálogo, fornecendo a ID do schema de destino na carga útil.
Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial sobre criação de um conjunto de dados usando a API.
Uma conexão de destino representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação da conexão fixa associada ao Data Lake. Esta ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Agora você tem os identificadores exclusivos, um esquema de destino, um conjunto de dados de destino e a ID de especificação da conexão para o Data Lake. Usando esses identificadores, você pode criar uma conexão de destino usando o Flow Service API para especificar o conjunto de dados que conterá os dados de origem de entrada.
Formato da API
POST /targetConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
data.schema.id |
A variável $id do esquema XDM do público-alvo. |
data.schema.version |
A versão do esquema. Este valor deve ser definido application/vnd.adobe.xed-full+json;version=1 , que retorna a versão secundária mais recente do esquema. |
params.dataSetId |
A ID do conjunto de dados de destino. |
connectionSpec.id |
A ID de especificação da conexão fixa para o Data Lake. Essa ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo da nova conexão de destino (id
). Essa ID é necessária nas etapas posteriores.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o esquema de destino ao qual o conjunto de dados de destino adere.
Para criar um conjunto de mapeamento, faça uma solicitação POST ao mappingSets
endpoint do Data Prep API ao fornecer o esquema XDM do público-alvo $id
e os detalhes dos conjuntos de mapeamento que deseja criar.
Você pode mapear tipos de dados complexos, como matrizes em arquivos JSON, usando um conector de origem de armazenamento em nuvem.
Formato da API
POST /conversion/mappingSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Propriedade | Descrição |
---|---|
xdmSchema |
A ID do esquema XDM do público-alvo. |
Resposta
Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id
). Esse valor é necessário em uma etapa posterior para criar um fluxo de dados.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a Platform. Para criar um fluxo de dados, primeiro obtenha as especificações do fluxo de dados responsáveis pela coleta dos dados de armazenamento na nuvem.
Formato da API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Solicitação
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
A carga de resposta JSON abaixo está oculta por brevidade. Selecione "carga" para ver a carga de resposta.
Resposta
Uma resposta bem-sucedida retorna os detalhes da especificação do fluxo de dados responsáveis por trazer os dados de sua origem para a Platform. A resposta inclui a especificação de fluxo exclusiva id
necessário para criar um novo fluxo de dados.
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
A última etapa para coletar dados de armazenamento na nuvem é criar um fluxo de dados. Até agora, você tem os seguintes valores necessários preparados:
Um fluxo de dados é responsável por agendar e coletar dados de uma origem. Você pode criar um fluxo de dados executando uma solicitação POST enquanto fornece os valores mencionados anteriormente na carga.
Para assimilação em lote, cada fluxo de dados subsequente seleciona arquivos que serão assimilados de sua origem com base em seus última modificação carimbo de data e hora. Isso significa que os fluxos de dados em lote selecionam arquivos da origem que são novos ou que foram modificados desde a última execução do fluxo de dados.
Para agendar uma assimilação, primeiro defina o valor da hora inicial como a época em segundos. Em seguida, defina o valor de frequência como uma das cinco opções: once
, minute
, hour
, day
ou week
. O valor do intervalo designa o período entre duas assimilações consecutivas e a criação de uma assimilação única não requer que um intervalo seja definido. Para todas as outras frequências, o valor do intervalo deve ser definido como igual ou maior que 15
.
É altamente recomendável agendar seu fluxo de dados para assimilação única ao usar o Conector FTP.
Formato da API
POST /flows
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
Propriedade | Descrição |
---|---|
flowSpec.id |
A variável ID de especificação de fluxo recuperado na etapa anterior. |
sourceConnectionIds |
A variável ID da conexão de origem recuperado em uma etapa anterior. |
targetConnectionIds |
A variável ID da conexão de destino recuperado em uma etapa anterior. |
transformations.params.mappingId |
A variável ID do mapeamento recuperado em uma etapa anterior. |
scheduleParams.startTime |
A hora de início do fluxo de dados em época. |
scheduleParams.frequency |
A frequência com que o fluxo de dados coletará dados. Os valores aceitáveis incluem: once , minute , hour , day ou week . |
scheduleParams.interval |
O intervalo designa o período entre duas execuções de fluxo consecutivas. O valor do intervalo deve ser um inteiro diferente de zero. O intervalo não é necessário quando a frequência está definida como once e deve ser maior ou igual a 15 para outros valores de frequência. |
Resposta
Uma resposta bem-sucedida retorna a ID (id
) do fluxo de dados recém-criado.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Depois que o fluxo de dados for criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial sobre monitoramento de fluxos de dados na API
Seguindo este tutorial, você criou um conector de origem para coletar dados do armazenamento na nuvem de forma programada. Os dados de entrada agora podem ser usados por serviços downstream da plataforma, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes:
A seção a seguir lista os diferentes conectores de origem de armazenamento na nuvem e suas especificações de conexão.
Nome do conector | Especificação da conexão |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (Hubs de Eventos) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |