Este tutorial aborda as etapas para recuperar dados brutos de um conector de fonte de transmissão e trazê-los para o Experience Platform usando a variável Flow Service API.
Este tutorial requer uma compreensão funcional dos seguintes componentes do Adobe Experience Platform:
{TENANT_ID}
, o conceito de "contêineres" e os cabeçalhos necessários para fazer solicitações (com especial atenção ao cabeçalho Accept e seus possíveis valores).Para obter informações sobre como fazer chamadas para APIs da plataforma com êxito, consulte o guia em introdução às APIs do Platform.
Este tutorial também exige que você tenha uma ID de conexão de origem válida para um conector de transmissão. Se não tiver essas informações, consulte os seguintes tutoriais sobre como criar uma conexão de origem de transmissão antes de tentar este tutorial:
Para que os dados de origem sejam usados na Platform, um schema de target deve ser criado para estruturar os dados de origem de acordo com suas necessidades. O schema de destino é usado para criar um conjunto de dados da plataforma no qual os dados de origem estão contidos. Esse esquema XDM de destino também estende o XDM Individual Profile classe .
Para criar um esquema XDM de destino, faça uma solicitação de POST para a variável /schemas
endpoint da variável Schema Registry API.
Formato da API
POST /tenant/schemas
Solicitação
A seguinte solicitação de exemplo cria um esquema XDM que estende o XDM Individual Profile classe .
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
Resposta
Uma resposta bem-sucedida retorna detalhes do schema recém-criado, incluindo seu identificador exclusivo ($id
). Essa ID é necessária em etapas posteriores para criar um conjunto de dados de destino, mapeamento e fluxo de dados.
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
Com um esquema XDM de destino criado e seu exclusivo $id
agora é possível criar um conjunto de dados de destino para conter seus dados de origem. Para criar um conjunto de dados de destino, faça uma solicitação de POST para a dataSets
endpoint da variável API do Serviço de catálogo, enquanto fornece a ID do schema do target no payload.
Formato da API
POST /catalog/dataSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
Propriedade | Descrição |
---|---|
name |
O nome do conjunto de dados a ser criado. |
schemaRef.id |
O URI $id para o esquema XDM, o conjunto de dados será baseado em. |
schemaRef.contentType |
A versão do schema. Esse valor deve ser definido como application/vnd.adobe.xed-full-notext+json;version=1 , que retorna a versão secundária mais recente do esquema. Consulte a seção sobre versão do schema no guia da API XDM para obter mais informações. |
Resposta
Uma resposta bem-sucedida retorna uma matriz contendo a ID do conjunto de dados recém-criado no formato "@/datasets/{DATASET_ID}"
. A ID do conjunto de dados é uma string gerada pelo sistema e somente leitura, usada para fazer referência ao conjunto de dados nas chamadas de API. A ID do conjunto de dados de destino é necessária em etapas posteriores para criar uma conexão de destino e um fluxo de dados.
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
As conexões do Target criam e gerenciam uma conexão de destino com a Platform ou qualquer local onde os dados transferidos chegarão. As conexões do Target contêm informações sobre o destino de dados, o formato de dados e a ID de conexão do Target necessárias para criar um fluxo de dados. As instâncias de conexão do Target são específicas de um locatário e da Organização IMS.
Para criar uma conexão do target, faça uma solicitação de POST para o /targetConnections
endpoint da variável Flow Service API. Como parte da solicitação, você deve fornecer o formato de dados, a variável dataSetId
recuperada na etapa anterior e a ID de especificação de conexão fixa vinculada ao Data Lake. Esta ID é c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Formato da API
POST /targetConnections
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
Propriedade | Descrição |
---|---|
connectionSpec.id |
A ID de especificação de conexão usada para se conectar ao Data Lake. Essa ID é: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
data.format |
O formato especificado dos dados que você está trazendo para o Data Lake. |
params.dataSetId |
A ID do conjunto de dados de destino recuperado na etapa anterior. |
Resposta
Uma resposta bem-sucedida retorna o identificador exclusivo da nova conexão de destino (id
). Essa ID é necessária em etapas posteriores.
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
Para que os dados de origem sejam assimilados em um conjunto de dados de destino, eles devem primeiro ser mapeados para o schema de destino ao qual o conjunto de dados de destino adere.
Para criar um conjunto de mapeamento, faça uma solicitação de POST para a função mappingSets
endpoint da variável Data Prep API ao fornecer seu esquema XDM de destino $id
e os detalhes dos conjuntos de mapeamento que deseja criar.
Formato da API
POST /mappingSets
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
Propriedade | Descrição |
---|---|
xdmSchema |
O $id do esquema XDM de destino. |
Resposta
Uma resposta bem-sucedida retorna detalhes do mapeamento recém-criado, incluindo seu identificador exclusivo (id
). Essa ID é necessária em uma etapa posterior para criar um fluxo de dados.
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Um fluxo de dados é responsável por coletar dados de fontes e trazê-los para a plataforma. Para criar um fluxo de dados, primeiro obtenha as especificações do fluxo de dados executando uma solicitação do GET para a Flow Service API.
Formato da API
GET /flowSpecs
Solicitação
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Resposta
Uma resposta bem-sucedida retorna uma lista de especificações de fluxo de dados. A ID de especificação do fluxo de dados que você precisa recuperar para criar um fluxo de dados usando qualquer um dos Amazon Kinesis, Azure Event Hubsou Google PubSub, é d69717ba-71b4-4313-b654-49f9cf126d7a
.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
},
]
}
A última etapa para coletar dados de transmissão é criar um fluxo de dados. Por enquanto, você terá os seguintes valores obrigatórios preparados:
Um fluxo de dados é responsável por agendar e coletar dados de uma fonte. Você pode criar um fluxo de dados executando uma solicitação de POST e, ao mesmo tempo, fornecendo os valores mencionados anteriormente dentro da carga útil.
Formato da API
POST /flows
Solicitação
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
Propriedade | Descrição |
---|---|
flowSpec.id |
O ID de especificação de fluxo recuperada na etapa anterior. |
sourceConnectionIds |
O ID de conexão de origem recuperada em uma etapa anterior. |
targetConnectionIds |
O target connection ID recuperada em uma etapa anterior. |
transformations.params.mappingId |
O ID de mapeamento recuperada em uma etapa anterior. |
Resposta
Uma resposta bem-sucedida retorna a ID (id
) do fluxo de dados recém-criado.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Ao seguir este tutorial, você criou um fluxo de dados para coletar dados de transmissão do seu conector de transmissão. Os dados recebidos agora podem ser usados por serviços downstream da plataforma, como Real-Time Customer Profile e Data Science Workspace. Consulte os seguintes documentos para obter mais detalhes: