Este tutorial aborda as etapas para recuperar dados de uma fonte de automação de marketing e trazê-los para a Platform usando Flow Service API.
Para criar um fluxo de dados, você já deve ter uma ID de conexão base válida com uma fonte de automação de marketing. Se você não tiver essa ID, consulte a visão geral das fontes para obter uma lista de fontes de automação de marketing com as quais você pode criar uma conexão básica.
Este tutorial também requer uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:
{TENANT_ID}
, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com especial atenção ao cabeçalho Accept e seus possíveis valores).Para obter informações sobre como fazer chamadas para APIs da plataforma com êxito, consulte o guia em introdução às APIs do Platform.
Você pode criar uma conexão de origem fazendo uma solicitação de POST para o Flow Service API. Uma conexão de origem consiste em uma ID de conexão, um caminho para o arquivo de dados de origem e uma ID de especificação de conexão.
Para criar uma conexão de origem, você também deve definir um valor enum para o atributo de formato de dados.
Use os seguintes valores enum para conectores baseados em arquivo:
Formato dos dados | Valor de enumeração |
---|---|
Delimitado | delimited |
JSON | json |
Parquet | parquet |
Para todos os conectores baseados em tabela, defina o valor como tabular
.
Formato da API
POST /sourceConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "HubSpot source connection",
"baseConnectionId": "c6d4ee17-6752-4e83-94ee-1767522e83fa",
"description": "HubSpot source connection",
"data": {
"format": "tabular",
},
"params": {
"path": "Hubspot.Contacts"
},
"connectionSpec": {
"id": "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
baseConnectionId |
A ID de conexão exclusiva do sistema de automação de marketing de terceiros que você está acessando. |
params.path |
O caminho do arquivo de origem que você está acessando. |
connectionSpec.id |
A ID de especificação de conexão do seu sistema de automação de marketing. |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo (id
) da conexão de origem recém-criada. Armazene esse valor conforme for necessário em etapas posteriores para criar uma conexão de destino.
{
"id": "f44dbef2-a4f0-4978-8dbe-f2a4f0e978cf",
"etag": "\"5f00fba7-0000-0200-0000-5ed560520000\""
}
Para que os dados de origem sejam usados na Platform, um schema de target deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O schema de destino é usado para criar um conjunto de dados da plataforma no qual os dados de origem estão contidos.
Um esquema XDM de destino pode ser criado executando-se uma solicitação de POST para a API do Registro de Schema.
Para obter etapas detalhadas sobre como criar um esquema XDM de destino, consulte o tutorial em criação de um schema usando a API.
Um conjunto de dados de destino pode ser criado executando uma solicitação de POST para a API do Serviço de catálogo, fornecendo a ID do schema do target no payload.
Para obter etapas detalhadas sobre como criar um conjunto de dados de destino, consulte o tutorial em criação de um conjunto de dados usando a API.
Uma conexão target representa a conexão com o destino onde os dados assimilados chegam. Para criar uma conexão de destino, você deve fornecer a ID de especificação de conexão fixa associada ao Data Lake. Essa ID de especificação de conexão é: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Agora você tem os identificadores exclusivos em um esquema de destino em um conjunto de dados de destino e a ID de especificação de conexão em um lago de dados. Usar o Flow Service Você pode criar uma conexão de destino especificando esses identificadores junto com o conjunto de dados que conterá os dados de origem de entrada.
Formato da API
POST /targetConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "HubSpot target connection",
"description": "HubSpot target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/da411446eec78026c28d9fafd9e406e304b771d55b07b91b",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5ed5639d798a22191b6987b2"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Propriedade | Descrição |
---|---|
data.schema.id |
O $id do esquema XDM de destino. |
data.schema.version |
A versão do schema. Este valor deve ser definido application/vnd.adobe.xed-full+json;version=1 , que retorna a versão secundária mais recente do esquema. |
params.dataSetId |
A ID do conjunto de dados de destino. |
connectionSpec.id |
A ID de especificação de conexão usada para conexão com o Data Lake. Essa ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
{
"id": "4b3d05d8-b7aa-40de-bd05-d8b7aa80de65",
"etag": "\"dd00a1a2-0000-0200-0000-5ed564850000\""
}
Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o schema de destino ao qual o conjunto de dados de destino adere.
Para criar um conjunto de mapeamento, faça uma solicitação de POST para a função mappingSets
endpoint da variável Data Prep API ao fornecer seu esquema XDM de destino $id
e os detalhes dos conjuntos de mapeamento que deseja criar.
Formato da API
POST /conversion/mappingSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/da411446eec78026c28d9fafd9e406e304b771d55b07b91b",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Vid",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "Properties_Firstname_Value",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "_repo.createDate",
"sourceAttribute": "Added_At",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Propriedade | Descrição |
---|---|
xdmSchema |
A ID do esquema XDM de destino. |
Resposta
Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id
). Armazene esse valor conforme for necessário na etapa posterior para criar um fluxo de dados.
{
"id": "500a9b747fcf4908a21917d49bd61780",
"version": 0,
"createdDate": 1591043336298,
"modifiedDate": 1591043336298,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a plataforma. Para criar um fluxo de dados, primeiro você deve obter as especificações do fluxo de dados responsáveis pela coleta de dados de automação de marketing.
Formato da API
GET /flowSpecs?property=name=="CRMToAEP"
Solicitação
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CRMToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Resposta
Uma resposta bem-sucedida retorna os detalhes da especificação de fluxo de dados responsável por trazer os dados da fonte para a Plataforma. A resposta inclui as especificações exclusivas de fluxo id
necessário para criar um novo fluxo de dados.
A carga de resposta JSON abaixo está oculta para brevidade. Selecione "payload" para ver a carga da resposta.
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
A última etapa para coletar dados de automação de marketing é criar um fluxo de dados. Por enquanto, você terá os seguintes valores obrigatórios preparados:
Um fluxo de dados é responsável por agendar e coletar dados de uma fonte. Você pode criar um fluxo de dados executando uma solicitação de POST e, ao mesmo tempo, fornecendo os valores mencionados anteriormente dentro da carga útil.
Para agendar uma assimilação, primeiro defina o valor de hora de início como época em segundos. Em seguida, você deve definir o valor de frequência para uma das cinco opções: once
, minute
, hour
, day
ou week
. O valor de intervalo designa o período entre duas ingestões consecutivas e a criação de uma ingestão única não requer a definição de um intervalo. Para todas as outras frequências, o valor do intervalo deve ser definido como igual ou superior a 15
.
Formato da API
POST /flows
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "HubSpot dataflow",
"description": "collecting Hubspot.Contacts",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"f44dbef2-a4f0-4978-8dbe-f2a4f0e978cf"
],
"targetConnectionIds": [
"4b3d05d8-b7aa-40de-bd05-d8b7aa80de65"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "updatedAt",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "500a9b747fcf4908a21917d49bd61780",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1591043454",
"frequency":"once",
"interval":"15"
}
}'
Propriedade | Descrição |
---|---|
flowSpec.id |
O ID de especificação de fluxo recuperada na etapa anterior. |
sourceConnectionIds |
O ID de conexão de origem recuperada em uma etapa anterior. |
targetConnectionIds |
O target connection ID recuperada em uma etapa anterior. |
transformations.params.mappingId |
O ID de mapeamento recuperada em uma etapa anterior. |
transformations.params.deltaColum |
A coluna designada usada para diferenciar dados novos e existentes. Os dados incrementais serão assimilados com base no carimbo de data e hora da coluna selecionada. O formato de data compatível para deltaColumn é yyyy-MM-dd HH:mm:ss . |
transformations.params.mappingId |
A ID de mapeamento associada ao banco de dados. |
scheduleParams.startTime |
A hora de início do fluxo de dados em época. |
scheduleParams.frequency |
A frequência com que o fluxo de dados coletará dados. Os valores aceitáveis incluem: once , minute , hour , day ou week . |
scheduleParams.interval |
O intervalo designa o período entre duas execuções consecutivas de fluxo. O valor do intervalo deve ser um número inteiro diferente de zero. O intervalo não é necessário quando a frequência é definida como once e deve ser maior que ou igual a 15 para outros valores de frequência. |
Resposta
Uma resposta bem-sucedida retorna a ID (id
) do fluxo de dados recém-criado.
{
"id": "e0bd8463-0913-4ca1-bd84-6309134ca1f6",
"etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}
Depois que o fluxo de dados tiver sido criado, você poderá monitorar os dados que estão sendo assimilados por meio dele para ver informações sobre execuções de fluxo, status de conclusão e erros. Para obter mais informações sobre como monitorar fluxos de dados, consulte o tutorial em monitoramento de fluxos de dados na API
Ao seguir este tutorial, você criou um conector de origem para coletar dados de um sistema de automação de marketing de forma agendada. Os dados recebidos agora podem ser usados por serviços downstream da plataforma, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes: