Create an HTTP API streaming connection using the Flow Service API
Flow Service is used to collect and centralize customer data from different sources within Adobe Experience Platform. The service provides a user interface and a RESTful API through which you can connect all supported sources.
This tutorial walks you through the steps to create a streaming connection using the Flow Service API.
Getting started
This guide requires a working understanding of the following components of Adobe Experience Platform:
- Experience Data Model (XDM): The standardized framework by which Platform organizes experience data.
- Real-Time Customer Profile: Provides a unified consumer profile in real time based on aggregated data from multiple sources.
Additionally, creating a streaming connection requires you to have a target XDM schema and a dataset. To learn how to create these, please read the tutorial on streaming record data or the tutorial on streaming time-series data.
Using Platform APIs
For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.
Create a base connection
A base connection specifies the source and contains the information required to make the flow compatible with streaming ingestion APIs. When creating a base connection, you can create either a non-authenticated or an authenticated connection.
Non-authenticated connection
A non-authenticated connection is the standard streaming connection that you can create when you want to stream data into Platform.
To create a non-authenticated base connection, make a POST request to the /connections endpoint while providing a name for your connection, the data type, and the HTTP API connection specification ID. This ID is bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb.
API format
POST /flowservice/connections
Request
The following request creates a base connection for HTTP API.
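As a rough illustration, the request body for a non-authenticated base connection can be sketched in Python. The payload shape is an assumption inferred from the properties documented for this request and from the auth object that Flow Service returns when the connection is retrieved. The function name build_base_connection_payload is hypothetical, and the name and description values are placeholders.

```python
import json

# HTTP API connection specification ID, as given in this tutorial.
HTTP_API_CONNECTION_SPEC_ID = "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"


def build_base_connection_payload(name, description, data_type="xdm"):
    """Build a body for POST /flowservice/connections (assumed shape)."""
    if data_type not in ("xdm", "raw"):
        raise ValueError("data_type must be 'xdm' or 'raw'")
    return {
        "name": name,
        "description": description,
        "connectionSpec": {"id": HTTP_API_CONNECTION_SPEC_ID, "version": "1.0"},
        "auth": {
            # Assumption: specName mirrors the value shown in the
            # GET /connections sample response.
            "specName": "Streaming Connection",
            "params": {"dataType": data_type, "name": name},
        },
    }


payload = build_base_connection_payload(
    "ACME Streaming Connection XDM Data",
    "ACME streaming connection for customer data",
)
print(json.dumps(payload, indent=2))
```

The printed JSON could then be sent as the body of the POST request, together with the same headers used by the other requests in this tutorial.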
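- name: The name of your base connection.
- description: A description of your base connection.
- connectionSpec.id: The connection specification ID for HTTP API. This ID is bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb.
- auth.params.dataType: The data type of the streaming connection. Supported values are xdm and raw.
- auth.params.name: The name of the streaming connection.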
Response
A successful response returns HTTP status 201 with details of the newly created connection, including its unique identifier (id).
{
"id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9",
"etag": "\"f50185ed-0000-0200-0000-637e8fad0000\""
}
- id: The id of your newly created base connection.
- etag: An identifier for the version of the base connection.
Authenticated connection
Authenticated connections should be used when you need to differentiate between records coming from trusted and untrusted sources. Users who want to send data that contains Personally Identifiable Information (PII) should create an authenticated connection when streaming information to Platform.
To create an authenticated base connection, you must include the authenticationRequired parameter in your request and specify its value as true. During this step, you can also provide a source ID for your authenticated base connection. This parameter is optional; if it is not provided, it defaults to the value of the name attribute.
API format
POST /flowservice/connections
Request
The following request creates an authenticated base connection for HTTP API.
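A hedged Python sketch of such a request body follows. It assumes that authenticationRequired and the optional sourceId sit under auth.params, as the property names for this request suggest. The function name build_authenticated_payload and all sample values are hypothetical.

```python
import json

# HTTP API connection specification ID, as given in this tutorial.
HTTP_API_CONNECTION_SPEC_ID = "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb"


def build_authenticated_payload(name, description, data_type="xdm", source_id=None):
    """Build a body for an authenticated base connection (assumed shape)."""
    params = {
        "dataType": data_type,
        "name": name,
        "authenticationRequired": True,
    }
    # sourceId is optional; when omitted, the service falls back to `name`.
    if source_id is not None:
        params["sourceId"] = source_id
    return {
        "name": name,
        "description": description,
        "connectionSpec": {"id": HTTP_API_CONNECTION_SPEC_ID, "version": "1.0"},
        "auth": {
            # Assumption: specName mirrors the GET /connections sample response.
            "specName": "Streaming Connection",
            "params": params,
        },
    }


auth_payload = build_authenticated_payload(
    "ACME Streaming Connection XDM Data Authenticated",
    "ACME authenticated streaming connection",
    source_id="ACME-source",
)
print(json.dumps(auth_payload, indent=2))
```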
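- auth.params.sourceId: The source ID of your authenticated base connection. This parameter is optional; if it is not provided, it defaults to the value of the name attribute.
- auth.params.authenticationRequired: Specifies whether authentication is required for the streaming connection. If authenticationRequired is set to true, then authentication must be provided for the streaming connection. If authenticationRequired is set to false, then authentication is not required.
Response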
A successful response returns HTTP status 201 with details of the newly created connection, including its unique identifier (id).
{
"id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9",
"etag": "\"f50185ed-0000-0200-0000-637e8fad0000\""
}
Get streaming endpoint URL
With the base connection created, you can now retrieve your streaming endpoint URL.
API format
GET /flowservice/connections/{BASE_CONNECTION_ID}
- {BASE_CONNECTION_ID}: The id value of the connection you previously created.
Request
curl -X GET https://platform.adobe.io/data/foundation/flowservice/connections/{BASE_CONNECTION_ID} \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Response
A successful response returns HTTP status 200 with detailed information about the requested connection. The streaming endpoint URL is automatically created with the connection, and can be retrieved using the inletUrl value.
{
"items": [
{
"id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9",
"createdAt": 1669238699119,
"updatedAt": 1669238699119,
"createdBy": "acme@AdobeID",
"updatedBy": "acme@AdobeID",
"createdClient": "{CREATED_CLIENT}",
"updatedClient": "{UPDATED_CLIENT}",
"sandboxId": "{SANDBOX_ID}",
"sandboxName": "{SANDBOX_NAME}",
"imsOrgId": "{ORG_ID}",
"name": "ACME Streaming Connection XDM Data",
"description": "ACME streaming connection for customer data",
"connectionSpec": {
"id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"version": "1.0"
},
"state": "enabled",
"auth": {
"specName": "Streaming Connection",
"params": {
"sourceId": "ACME Streaming Connection XDM Data",
"inletUrl": "https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec",
"authenticationRequired": false,
"inletId": "667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec",
"dataType": "xdm",
"name": "ACME Streaming Connection XDM Data"
}
},
"version": "\"f50185ed-0000-0200-0000-637e8fad0000\"",
"etag": "\"f50185ed-0000-0200-0000-637e8fad0000\""
}
]
}
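Reading inletUrl out of this response can be sketched in a few lines of Python. The helper name extract_inlet_url is hypothetical, and the sample dictionary is a trimmed copy of the response shown above.

```python
def extract_inlet_url(connection_response):
    """Return the streaming endpoint URL from a GET /connections/{id} response."""
    items = connection_response.get("items", [])
    if not items:
        raise ValueError("response contains no connections")
    # The URL lives under auth.params of the first (and only) item.
    return items[0]["auth"]["params"]["inletUrl"]


# Trimmed copy of the sample response from this tutorial.
sample_response = {
    "items": [
        {
            "id": "a59d368a-1152-4673-a46e-bd52e8cdb9a9",
            "auth": {
                "specName": "Streaming Connection",
                "params": {
                    "inletUrl": "https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec",
                    "inletId": "667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec",
                    "dataType": "xdm",
                },
            },
        }
    ]
}

inlet_url = extract_inlet_url(sample_response)
print(inlet_url)
```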
Create a source connection
To create a source connection, make a POST request to the /sourceConnections endpoint while providing your base connection ID.
API format
POST /flowservice/sourceConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"name": "ACME Streaming Source Connection for Customer Data",
"description": "A streaming source connection for ACME XDM Customer Data",
"baseConnectionId": "a59d368a-1152-4673-a46e-bd52e8cdb9a9",
"connectionSpec": {
"id": "bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"version": "1.0"
}
}'
Response
A successful response returns HTTP status 201 with details of the newly created source connection, including its unique identifier (id).
{
"id": "34ece231-294d-416c-ad2a-5a5dfb2bc69f",
"etag": "\"d505125b-0000-0200-0000-637eb7790000\""
}
Create a target XDM schema
In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained.
A target XDM schema can be created by performing a POST request to the Schema Registry API.
For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.
Create a target dataset
A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.
For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.
Create a target connection
A target connection represents the connection to the destination where the ingested data lands. To create a target connection, make a POST request to the /targetConnections endpoint while providing the IDs of your target dataset and target XDM schema. During this step, you must also provide the data lake connection specification ID. This ID is c604ff05-7f1a-43c0-8e18-33bf874cb11c.
API format
POST /flowservice/targetConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'Content-Type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"name": "ACME Streaming Target Connection",
"description": "ACME Streaming Target Connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1",
"version": "application/vnd.adobe.xed-full+json;version=1.0"
}
},
"params": {
"dataSetId": "637eb7fadc8a211b6312b65b"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Response
A successful response returns HTTP status 201 with details of the newly created target connection, including its unique identifier (id).
{
"id": "07f2f6ff-1da5-4704-916a-c615b873cba9",
"etag": "\"340680f7-0000-0200-0000-637eb8730000\""
}
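If you script this step, the target connection body can be assembled from the schema and dataset IDs. The Python sketch below mirrors the structure of the curl request shown earlier in this section; the function name build_target_connection_payload is hypothetical, and the schema and dataset IDs are the sample values from this tutorial.

```python
import json

# Data lake connection specification ID, as given in this tutorial.
DATA_LAKE_CONNECTION_SPEC_ID = "c604ff05-7f1a-43c0-8e18-33bf874cb11c"


def build_target_connection_payload(name, schema_id, dataset_id, description=""):
    """Build a body for POST /flowservice/targetConnections."""
    return {
        "name": name,
        "description": description,
        "data": {
            "schema": {
                "id": schema_id,
                "version": "application/vnd.adobe.xed-full+json;version=1.0",
            }
        },
        "params": {"dataSetId": dataset_id},
        "connectionSpec": {"id": DATA_LAKE_CONNECTION_SPEC_ID, "version": "1.0"},
    }


target_payload = build_target_connection_payload(
    "ACME Streaming Target Connection",
    "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1",
    "637eb7fadc8a211b6312b65b",
    description="ACME Streaming Target Connection",
)
print(json.dumps(target_payload, indent=2))
```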
Create a mapping
In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the /mappingSets endpoint of the Data Prep API while providing your target XDM schema $id and the details of the mapping sets you want to create.
API format
POST /mappingSets
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
- xdmSchema: The $id of the target XDM schema.
Response
A successful response returns details of the newly created mapping, including its unique identifier (id). This ID is required in a later step to create a dataflow.
{
"id": "79a623960d3f4969835c9e00dc90c8df",
"version": 0,
"createdDate": 1669249214031,
"modifiedDate": 1669249214031,
"createdBy": "acme@AdobeID",
"modifiedBy": "acme@AdobeID"
}
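When there are many fields to map, the mappings array can be generated rather than written by hand. The following Python sketch builds the same body as the curl request in this section from a dictionary of source attributes to XDM paths. The function name build_mapping_set is hypothetical, and identity and version are fixed to the values used in the sample request.

```python
import json


def build_mapping_set(schema_id, field_map):
    """Build a body for POST /mappingSets from {source_attribute: xdm_path}."""
    mappings = [
        {
            "destinationXdmPath": xdm_path,
            "sourceAttribute": source_attribute,
            "identity": False,
            "version": 0,
        }
        for source_attribute, xdm_path in field_map.items()
    ]
    return {
        "version": 0,
        "xdmSchema": schema_id,
        "xdmVersion": "1.0",
        "mappings": mappings,
    }


mapping_set = build_mapping_set(
    "https://ns.adobe.com/{TENANT}/schemas/7f682c29f887512a897791e7161b90a1ae7ed3dd07a177b1",
    {
        "firstName": "person.name.firstName",
        "lastName": "person.name.lastName",
    },
)
print(json.dumps(mapping_set, indent=2))
```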
Create a dataflow
With your source and target connections created, you can now create a dataflow. The dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request to the /flows endpoint.
API format
POST /flows
Request
You can create a streaming dataflow for HTTP API either without data transformations or with mapping transformations applied to your data.
When creating a dataflow with transformations, the name parameter of the transformation cannot be changed. This value must always be set to Mapping.
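- name: The name of your dataflow.
- description: A description of your dataflow.
- flowSpec.id: The flow specification ID for HTTP API. To create a dataflow with transformations, use c1a19761-d2c7-4702-b9fa-fe91f0613e81. To create a dataflow without transformations, use d8a6f005-7eaf-4153-983e-e8574508b877.
- sourceConnectionIds: The source connection ID created in an earlier step.
- targetConnectionIds: The target connection ID created in an earlier step.
- transformations.params.mappingId: The mapping ID created in an earlier step.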
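The choice between the two flow specification IDs can be sketched in Python as follows. The payload shape is an assumption inferred from the property names in this section: sourceConnectionIds and targetConnectionIds are treated as lists, and transformations as a list of objects with name and params. The real request may require additional fields. The function name build_flow_payload is hypothetical.

```python
import json

# Flow specification IDs, as given in this tutorial.
FLOW_SPEC_WITH_TRANSFORMATIONS = "c1a19761-d2c7-4702-b9fa-fe91f0613e81"
FLOW_SPEC_WITHOUT_TRANSFORMATIONS = "d8a6f005-7eaf-4153-983e-e8574508b877"


def build_flow_payload(name, description, source_connection_id,
                       target_connection_id, mapping_id=None):
    """Build a body for POST /flows (assumed shape)."""
    use_mapping = mapping_id is not None
    payload = {
        "name": name,
        "description": description,
        "flowSpec": {
            "id": FLOW_SPEC_WITH_TRANSFORMATIONS if use_mapping
            else FLOW_SPEC_WITHOUT_TRANSFORMATIONS,
            "version": "1.0",
        },
        "sourceConnectionIds": [source_connection_id],
        "targetConnectionIds": [target_connection_id],
    }
    if use_mapping:
        # The transformation name must always be "Mapping".
        payload["transformations"] = [
            {"name": "Mapping", "params": {"mappingId": mapping_id}}
        ]
    return payload


flow_with_mapping = build_flow_payload(
    "ACME Streaming Dataflow",
    "Streaming dataflow with mapping",
    "34ece231-294d-416c-ad2a-5a5dfb2bc69f",
    "07f2f6ff-1da5-4704-916a-c615b873cba9",
    mapping_id="79a623960d3f4969835c9e00dc90c8df",
)
print(json.dumps(flow_with_mapping, indent=2))
```

Calling the same function without mapping_id produces a body that uses the flow specification ID for dataflows without transformations and omits the transformations array.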
Response
A successful response returns HTTP status 201 with details of your newly created dataflow, including its unique identifier (id).
{
"id": "f2ae0194-8bd8-4a40-a4d9-f07bdc3e6ce2",
"etag": "\"dc0459ae-0000-0200-0000-637ebaec0000\""
}
Post data to be ingested to Platform
Now that you’ve created your flow, you can send your JSON message to the streaming endpoint you previously created.
API format
POST /collection/{INLET_URL}
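- {INLET_URL}: Your streaming endpoint URL. You can retrieve this URL by making a GET request to the /connections endpoint while providing your base connection ID.
- {FLOW_ID}: The ID of your streaming dataflow.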
Request
When sending raw data, you can specify your flow ID either as a query parameter or as an HTTP header.
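Both ways of passing the flow ID for raw data can be sketched in Python. The header and query parameter name x-adobe-flow-id is an assumption here, since this section does not state it, so confirm it against the streaming ingestion reference before use. The function name build_raw_ingest_request is hypothetical, and the code only prepares the URL and headers without sending anything.

```python
from urllib.parse import urlencode

# Assumption: the flow ID is passed under this name, both as a header
# and as a query parameter.
FLOW_ID_PARAM = "x-adobe-flow-id"


def build_raw_ingest_request(inlet_url, flow_id, flow_id_in="header"):
    """Return (url, headers) for posting raw JSON to the streaming endpoint."""
    headers = {"Content-Type": "application/json"}
    url = inlet_url
    if flow_id_in == "header":
        headers[FLOW_ID_PARAM] = flow_id
    elif flow_id_in == "query":
        url = f"{inlet_url}?{urlencode({FLOW_ID_PARAM: flow_id})}"
    else:
        raise ValueError("flow_id_in must be 'header' or 'query'")
    return url, headers


INLET_URL = "https://dcs.adobedc.net/collection/667b41cf2dbf3509927da1ebf7e93c20afa727cc8d8373e51da18b62e1b985ec"
FLOW_ID = "f2ae0194-8bd8-4a40-a4d9-f07bdc3e6ce2"

header_url, header_headers = build_raw_ingest_request(INLET_URL, FLOW_ID, "header")
query_url, query_headers = build_raw_ingest_request(INLET_URL, FLOW_ID, "query")
print(header_url, header_headers)
print(query_url, query_headers)
```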
Response
A successful response returns HTTP status 200 with details of the newly ingested information.
{
"inletId": "{BASE_CONNECTION_ID}",
"xactionId": "1584479347507:2153:240",
"receivedTimeMs": 1584479347507
}
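- {BASE_CONNECTION_ID}: The ID of the streaming connection you previously created.
- xactionId: An identifier returned for the request you sent.
- receivedTimeMs: The time at which the request was received, as an epoch timestamp in milliseconds.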
Next steps
By following this tutorial, you have created a streaming HTTP connection, enabling you to use the streaming endpoint to ingest data into Platform. For instructions to create a streaming connection in the UI, please read the creating a streaming connection tutorial.
To learn how to stream data to Platform, please read either the tutorial on streaming time series data or the tutorial on streaming record data.
Appendix
This section provides supplemental information on creating streaming connections using the API.
Sending messages to an authenticated streaming connection
If a streaming connection has authentication enabled, the client must add the Authorization header to its requests.
If the Authorization header is not present, or if an invalid or expired access token is sent, an HTTP 401 Unauthorized response is returned, similar to the following:
Response
{
"type": "https://ns.adobe.com/adobecloud/problem/data-collection-service-authorization",
"status": "401",
"title": "Authorization",
"report": {
"message": "[id] Ims service token is empty"
}
}
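On the client side, handling an authenticated connection comes down to adding the header and recognizing the 401 problem response. The Python sketch below assumes the same Bearer token format used by the other requests in this tutorial. The helper names with_authorization and is_unauthorized are hypothetical.

```python
def with_authorization(headers, access_token):
    """Return a copy of headers with a bearer Authorization header added."""
    if not access_token:
        raise ValueError("an access token is required for authenticated connections")
    return {**headers, "Authorization": f"Bearer {access_token}"}


def is_unauthorized(problem):
    """Check whether a parsed error body reports HTTP 401."""
    # The sample response carries the status as a string ("401"),
    # so compare as text to accept both "401" and 401.
    return str(problem.get("status")) == "401"


base_headers = {"Content-Type": "application/json"}
# "example-access-token" is a placeholder, not a real token.
auth_headers = with_authorization(base_headers, "example-access-token")

# Trimmed copy of the sample 401 response from this appendix.
sample_problem = {
    "type": "https://ns.adobe.com/adobecloud/problem/data-collection-service-authorization",
    "status": "401",
    "title": "Authorization",
}
print(auth_headers)
print(is_unauthorized(sample_problem))
```

A client could use is_unauthorized as the trigger to refresh its access token and retry the request.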