Create a dataflow for database sources using the Flow Service API

This tutorial covers the steps for retrieving data from a database source and bringing them to Platform using Flow Service API.

NOTE
In order to create a dataflow, you must already have a valid base connection ID with a database source. If you do not have this ID, then see the sources overview for a list of database sources that you can create a base connection with.

Getting started

This tutorial requires you to have a working understanding of the following components of Adobe Experience Platform:

  • Experience Data Model (XDM) System: The standardized framework by which Experience Platform organizes customer experience data.

    • Basics of schema composition: Learn about the basic building blocks of XDM schemas, including key principles and best practices in schema composition.
    • Schema Registry developer guide: Includes important information that you need to know in order to successfully perform calls to the Schema Registry API. This includes your {TENANT_ID}, the concept of “containers”, and the required headers for making requests (with special attention to the Accept header and its possible values).
  • Catalog Service: Catalog is the system of record for data location and lineage within Experience Platform.

  • Batch ingestion: The Batch Ingestion API allows you to ingest data into Experience Platform as batch files.

  • Sandboxes: Experience Platform provides virtual sandboxes which partition a single Platform instance into separate virtual environments to help develop and evolve digital experience applications.

Using Platform APIs

For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.

Create a source connection source

You can create a source connection by making a POST request to the Flow Service API. A source connection consists of a connection ID, a path to the source data file, and a connection spec ID.

To create a source connection, you must also define an enum value for the data format attribute.

Use the following enum values for file-based connectors:

Data format
Enum value
Delimited
delimited
JSON
json
Parquet
parquet

For all table-based connectors, set the value to tabular.

API format

POST /sourceConnections

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database source connection",
        "baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
        "description": "Database source connection",
        "data": {
            "format": "tabular"
        },
        "params": {
            "tableName": "test1.Mytable",
            "columns": [
                {
                    "name": "TestID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Name",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "Datefield",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
            "version": "1.0"
        }
    }'
Property
Description
baseConnectionId
The connection ID of your database source.
params.path
The path of the source file.
connectionSpec.id
The connection specification ID of your database source. See the Appendix for a list of database spec IDs.

Response

A successful response returns the unique identifier (id) of the newly created source connection. This ID is required in later steps to create a target connection.

{
    "id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
    "etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}

Create a target XDM schema target-schema

In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained.

A target XDM schema can be created by performing a POST request to the Schema Registry API.

For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.

Create a target dataset target-dataset

A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.

For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.

Create a target connection target-connection

A target connection represents the connection to the destination where the ingested data lands in. To create a target connection, you must provide the fixed connection spec ID associated to the Data Lake… This connection spec ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

You now have the unique identifiers a target schema a target dataset and the connection spec ID to data lake. Using the Flow Service API, you can create a target connection by specifying these identifiers along with the dataset that will contain the inbound source data.

API format

POST /targetConnections

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database target connection",
        "description": "Database target connection",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
                "version": "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "6019e0e7c5dcf718db5ebc71"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
Property
Description
data.schema.id
The $id of the target XDM schema.
data.schema.version
The version of the schema. This value must be set application/vnd.adobe.xed-full+json;version=1, which returns the latest minor version of the schema.
params.dataSetId
The ID of the target dataset generated in the previous step. Note: You must provide a valid dataset ID when creating a target connection. An invalid dataset ID will result in an error.
connectionSpec.id
The connection specification ID used to connect to the data lake. This ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

Response

A successful response returns the new target connection’s unique identifier (id). This value is required in a later step to create a dataflow.

{
    "id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
    "etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}

Create a mapping mapping

In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.

To create a mapping set, make a POST request to the mappingSets endpoint of the Data Prep API while providing your target XDM schema $id and the details of the mapping sets you want to create.

API format

POST /mappingSets

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "TestID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "Name",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "person.birthDate",
                "sourceAttribute": "Datefield",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
Property
Description
xdmSchema
The $id of the target XDM schema.

Response

A successful response returns details of the newly created mapping including its unique identifier (id). This ID is required in a later step to create a dataflow.

{
    "id": "0b090130b58b4819afc78b6dc98b484d",
    "version": 0,
    "createdDate": 1612309018666,
    "modifiedDate": 1612309018666,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

Retrieve dataflow specifications specs

A dataflow is responsible for collecting data from sources and bringing them into Platform. In order to create a dataflow, you must first obtain the dataflow specifications by performing a GET request to the Flow Service API. Dataflow specifications are responsible for collecting data from an external database or NoSQL system.

API format

GET /flowSpecs?property=name=="CRMToAEP"

Request

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

Response

A successful response returns the details of the dataflow specification responsible for bringing data from your source into Platform. The response includes the unique flow spec id required to create a new dataflow.

NOTE
The JSON response payload below is hidden for brevity. Select “payload” to see the response payload.
View payload
code language-json
{
  "id": "14518937-270c-4525-bdec-c2ba7cce3860",
  "name": "CRMToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "3416976c-a9ca-4bba-901a-1f08f66978ff",
    "38ad80fe-8b06-4938-94f4-d4ee80266b07",
    "d771e9c1-4f26-40dc-8617-ce58c4b53702",
    "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
    "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
    "3000eb99-cd47-43f3-827c-43caf170f015",
    "26d738e0-8963-47ea-aadf-c60de735468a",
    "74a1c565-4e59-48d7-9d67-7c03b8a13137",
    "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
    "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
    "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "eb13cb25-47ab-407f-ba89-c0125281c563",
    "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
    "37b6bf40-d318-4655-90be-5cd6f65d334b",
    "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
    "221c7626-58f6-4eec-8ee2-042b0226f03b",
    "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
    "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
    "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
    "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
    "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
    "102706fb-a5cd-42ee-afe0-bc42f017ff43",
    "09182899-b429-40c9-a15a-bf3ddbc8ced7",
    "0479cc14-7651-4354-b233-7480606c2ac3",
    "d6b52d86-f0f8-475f-89d4-ce54c8527328",
    "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
    "1fe283f6-9bec-11ea-bb37-0242ac130002",
    "fcad62f3-09b0-41d3-be11-449d5a621b69",
    "ea1c2a08-b722-11eb-8529-0242ac130003",
    "35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
    "ff4274f2-c9a9-11eb-b8bc-0242ac130003",
    "ba5126ec-c9ac-11eb-b8bc-0242ac130003",
    "b2e08744-4f1a-40ce-af30-7abac3e23cf3",
    "929e4450-0237-4ed2-9404-b7e1e0a00309",
    "2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
    "2fa8af9c-2d1a-43ea-a253-f00a00c74412"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Copy",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "deltaColumn": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "deltaColumn"
        ]
      }
    },
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "deltaColumn": {
            "type": "object",
            "description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime",
          "deltaColumn"
        ]
      }
    }
}

Create a dataflow

The last step towards collecting data is to create a dataflow. At this point, you should have the following required values prepared:

A dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request while providing the previously mentioned values within the request payload.

To schedule an ingestion, you must first set the start time value to epoch time in seconds. Then, you must set the frequency value to one of the five options: once, minute, hour, day, or week. The interval value designates the period between two consecutive ingestions and creating a one-time ingestion does not require an interval to be set. For all other frequencies, the interval value must be set to equal or greater than 15.

API format

POST /flows

Request

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database dataflow using BigQuery",
        "description": "collecting test1.Mytable",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "b7581b59-c603-4df1-a689-d23d7ac440f3"
        ],
        "targetConnectionIds": [
            "320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "Datefield",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "0b090130b58b4819afc78b6dc98b484d",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1612310466",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'

+++

Property
Description
flowSpec.id
The flow spec ID retrieved in the previous step.
sourceConnectionIds
The source connection ID retrieved in an earlier step.
targetConnectionIds
The target connection ID retrieved in an earlier step.
transformations.params.mappingId
The mapping ID retrieved in an earlier step.
transformations.params.deltaColum
The designated column used to differentiate between new and existing data. Incremental data will be ingested based on the timestamp of selected column. The supported date format for deltaColumn is yyyy-MM-dd HH:mm:ss. If you are using Azure Table Storage, the supported format for deltaColumn is yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId
The mapping ID associated with your database.
scheduleParams.startTime
The start time for the dataflow in epoch time.
scheduleParams.frequency
The frequency at which the dataflow will collect data. Acceptable values include: once, minute, hour, day, or week.
scheduleParams.interval
The interval designates the period between two consecutive flow runs. The interval’s value should be a non-zero integer. Interval is not required when frequency is set as once and should be greater than or equal to 15 for other frequency values.

Response

A successful response returns the ID (id) of the newly created dataflow.

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

Monitor your dataflow

Once your dataflow has been created, you can monitor the data that is being ingested through it to see information on flow runs, completion status, and errors. For more information on how to monitor dataflows, see the tutorial on monitoring dataflows in the API

Next steps

By following this tutorial, you have created a source connector to collect data from a database on a scheduled basis. Incoming data can now be used by downstream Platform services such as Real-Time Customer Profile and Data Science Workspace. See the following documents for more details:

Appendix

The following section lists the different cloud storage source connectors and their connections specifications.

Connection specification

Connector name
Connection spec ID
Amazon Redshift
3416976c-a9ca-4bba-901a-1f08f66978ff
Apache Hive on Azure HDInsights
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Apache Spark on Azure HDInsights
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer
0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage
ecde33f2-c56f-46cc-bdea-ad151c16cd69
Couchbase
1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery
3c9b37f8-13a6-43d8-bad3-b863b941fedd
Greenplum
37b6bf40-d318-4655-90be-5cd6f65d334b
IBM DB2
09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB
000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL
26d738e0-8963-47ea-aadf-c60de735468a
Oracle
d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix
102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL
74a1c565-4e59-48d7-9d67-7c03b8a13137
recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089