Create a streaming dataflow for raw data using the Flow Service API

This tutorial covers the steps for retrieving raw data from a streaming source connector and bringing them to Experience Platform using the Flow Service API.

Getting started

This tutorial requires you to have a working understanding of the following components of Adobe Experience Platform:

  • Experience Data Model (XDM) System: The standardized framework by which Experience Platform organizes customer experience data.

    • Basics of schema composition: Learn about the basic building blocks of XDM schemas, including key principles and best practices in schema composition.
    • Schema Registry developer guide: Includes important information that you need to know in order to successfully perform calls to the Schema Registry API. This includes your {TENANT_ID}, the concept of “containers”, and the required headers for making requests (with special attention to the Accept header and its possible values).
  • Catalog Service: Catalog is the system of record for data location and lineage within Experience Platform.

  • Streaming ingestion: Streaming ingestion for Platform provides users a method to send data from client and server-side devices to Experience Platform in real time…

  • Sandboxes: Experience Platform provides virtual sandboxes which partition a single Platform instance into separate virtual environments to help develop and evolve digital experience applications.

Using Platform APIs

For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.

Create a source connection source

This tutorial also requires you to have a valid source connection ID for a streaming connector. If you do not have this information, see the following tutorials on create a streaming source connection before attempting this tutorial:

Create a target XDM schema target-schema

In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained. This target XDM schema also extends the XDM Individual Profile class.

To create a target XDM schema, make a POST request to the /schemas endpoint of the Schema Registry API.

API format

POST /tenant/schemas

Request

The following example request creates an XDM schema that extends the XDM Individual Profile class.

curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Sample schema for a streaming connector",
        "description": "Sample schema for a streaming connector",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

Response

A successful response returns details of the newly created schema including its unique identifier ($id). This ID is required in later steps to create a target dataset, mapping, and dataflow.

{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Sample schema for a streaming connector",
    "type": "object",
    "description": "Sample schema for a streaming connector",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{ORG_ID}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1604960074752,
        "repo:lastModifiedDate": 1604960074752,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
        "eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
        "meta:globalLibVersion": "1.16.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:sandboxId": "{SANDBOX_ID}",
    "meta:sandboxType": "production",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

Create a target dataset

With a target XDM schema created and its unique $id you can now create a target dataset to contain your source data. To create a target dataset, make a POST request to the dataSets endpoint of the Catalog Service API, while providing the ID of the target schema within the payload.

API format

POST /catalog/dataSets

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Test streaming dataset",
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        },
        "tags": {
            "identity": [
            "enabled:true"
            ],
            "profile": [
            "enabled:true"
            ]
        }
    }'
Property
Description
name
The name of the dataset to be created.
schemaRef.id
The URI $id for the XDM schema the dataset will be based on.
schemaRef.contentType
The version of the schema. This value must be set to application/vnd.adobe.xed-full-notext+json;version=1, which returns the latest minor version of the schema. See the section on schema versioning in the XDM API guide for more information.

Response

A successful response returns an array containing the ID of the newly created dataset in the format "@/datasets/{DATASET_ID}". The dataset ID is a read-only, system-generated string that is used to reference the dataset in API calls. The target dataset ID is required in later steps to create a target connection and a dataflow.

[
    "@/dataSets/5f7187bac6d00f194fb937c0"
]

Create a target connection target-connection

Target connections create and manage a destination connection to Platform or any location where the transferred data will land. Target connections contain information regarding data destination, data format, and the target connection ID required to create a dataflow. Target connection instances are specific to a tenant and organization.

To create a target connection, make a POST request to the /targetConnections endpoint of the Flow Service API. As part of the request, you must provide the data format, the dataSetId retrieved in the previous step, and the fixed connection specification ID tied to Data Lake. This ID is c604ff05-7f1a-43c0-8e18-33bf874cb11c.

API format

POST /targetConnections

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming target connection",
        "description": "Streaming target connection",
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        },
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5f7187bac6d00f194fb937c0"
        }
    }'
Property
Description
data.format
The specified format of the data you are bringing to data lake.
params.dataSetId
The ID of the target dataset generated in the previous step. Note: You must provide a valid dataset ID when creating a target connection. An invalid dataset ID will result in an error.
connectionSpec.id
The connection spec ID used to connect to the data lake. This ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Response

A successful response returns the new target connection’s unique identifier (id). This ID is required in later steps.

{
    "id": "d9300194-6a82-4163-b001-946a821163b8",
    "etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}

Create a mapping mapping

In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.

To create a mapping set, make a POST request to the mappingSets endpoint of the Data Prep API while providing your target XDM schema $id and the details of the mapping sets you want to create.

API format

POST /mappingSets

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
        "xdmVersion": "1.0",
        "mappings": [
            {
                "destinationXdmPath": "person.name.firstName",
                "sourceAttribute": "firstName",
                "identity": false,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.lastName",
                "sourceAttribute": "lastName",
                "identity": false,
                "version": 0
            }
        ]
    }'
Property
Description
xdmSchema
The $id of the target XDM schema.

Response

A successful response returns details of the newly created mapping including its unique identifier (id). This ID is required in a later step to create a dataflow.

{
    "id": "380b032b445a46008e77585e046efe5e",
    "version": 0,
    "createdDate": 1604960750613,
    "modifiedDate": 1604960750613,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Retrieve a list of dataflow specifications specs

A dataflow is responsible for collecting data from sources and bringing them into Platform. In order to create a dataflow, you must first obtain the dataflow specifications by performing a GET request to the Flow Service API.

API format

GET /flowSpecs

Request

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Response

A successful response returns a list of dataflow specifications. The dataflow specification ID that you need to retrieve to create a dataflow using any of Amazon Kinesis, Azure Event Hubs, or Google PubSub, is d69717ba-71b4-4313-b654-49f9cf126d7a.

{
    "items": [
        {
            "id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
            "name": "Stream data with optional transformation",
            "providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
            "version": "1.0",
            "sourceConnectionSpecIds": [
                "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
                "bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
                "86043421-563b-46ec-8e6c-e23184711bf6",
                "70116022-a743-464a-bbfe-e226a7f8210c"
            ],
            "targetConnectionSpecIds": [
                "bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
                "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
                "db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
            ],
            "transformationSpecs": [
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "attributes": {
                "uiAttributes": {
                    "apiFeatures": {
                        "flowRunsSupported": false
                    }
                }
            },
            "permissionsInfo": {
                "view": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "read"
                        ]
                    }
                ],
                "manage": [
                    {
                        "@type": "lowLevel",
                        "name": "StreamingSource",
                        "permissions": [
                            "write"
                        ]
                    }
                ]
            }
        },
    ]
}

Create a dataflow

The last step towards collecting streaming data is to create a dataflow. By now, you have the following required values prepared:

A dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request while providing the previously mentioned values within the payload.

API format

POST /flows

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Streaming dataflow",
        "description": "Streaming dataflow",
        "flowSpec": {
            "id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "e96d6135-4b50-446e-922c-6dd66672b6b2"
        ],
        "targetConnectionIds": [
            "723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
        ],
        "transformations": [
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "380b032b445a46008e77585e046efe5e",
                    "mappingVersion": 0
                }
            }
        ]
    }'
Property
Description
flowSpec.id
The flow spec ID retrieved in the previous step.
sourceConnectionIds
The source connection ID retrieved in an earlier step.
targetConnectionIds
The target connection ID retrieved in an earlier step.
transformations.params.mappingId
The mapping ID retrieved in an earlier step.

Response

A successful response returns the ID (id) of the newly created dataflow.

{
    "id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
    "etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}

Post data for ingestion

View the sample payload below for examples of raw or XDM-compliant json that you can send for ingestion.

NOTE
You must add a delay of at least ~5 minutes between creation of dataflow and ingesting any streaming data. This allows the dataflow to be fully enabled, before any data is ingested.

The following examples apply to all of:

Raw data
code language-json
'{
      "name": "Johnson Smith",
      "location": {
          "city": "Seattle",
          "country": "United State of America",
          "address": "3692 Main Street"
      },
      "gender": "Male",
      "birthday": {
          "year": 1984,
          "month": 6,
          "day": 9
      }
  }'
XDM data
code language-json
{
  "header": {
    "schemaRef": {
      "id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
      "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
    }
  },
  "body": {
    "xdmMeta": {
      "schemaRef": {
        "id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
        "contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
      }
    },
    "xdmEntity": {
      "_id": "acme",
      "workEmail": {
        "address": "mike@acme.com",
        "primary": true,
        "type": "work",
        "status": "active"
      },
      "person": {
        "gender": "male",
        "name": {
          "firstName": "Mike",
          "lastName": "Wazowski"
        },
        "birthDate": "1985-01-01"
      },
      "identityMap": {
        "ecid": [
          {
            "id": "01262118050522051420082102000000000000"
          }
        ]
      }
    }
  }
}

Next steps

By following this tutorial, you have created a dataflow to collect streaming data from your streaming connector. Incoming data can now be used by downstream Platform services such as Real-Time Customer Profile and Data Science Workspace. See the following documents for more details:

recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089