Skapa ett dataflöde för datakällor med API:t Flow Service
Skapat för:
- Utvecklare
I den här självstudiekursen beskrivs stegen för hur du hämtar data från en datakälla och överför dem till Experience Platform med Flow Service API.
- För att kunna skapa ett dataflöde måste du redan ha ett giltigt ID för basanslutning med en datakälla. Om du inte har det här ID:t kan du se Källöversikt för en lista över databaskällor som du kan skapa en basanslutning med.
- För att Experience Platform ska kunna importera data måste tidszoner för alla tabellbaserade batchkällor konfigureras till UTC. Den enda tidsstämpeln som stöds för Snowflake source är TIMESTAMP_NTZ med UTC-tid.
Komma igång
Den här självstudiekursen kräver att du har en fungerande förståelse för följande komponenter i Adobe Experience Platform:
-
Experience Data Model (XDM) System: Det standardiserade ramverk som Experience Platform använder för att ordna kundupplevelsedata.
- Grundläggande om schemakomposition: Lär dig mer om grundstenarna i XDM-scheman, inklusive nyckelprinciper och bästa metoder för schemakomposition.
- Utvecklarhandbok för schemaregister: Innehåller viktig information som du behöver känna till för att kunna utföra anrop till API:t för schemaregister. Detta inkluderar din
{TENANT_ID}
, konceptet med behållare och de huvuden som krävs för att göra förfrågningar (med särskild uppmärksamhet på huvudet Godkänn och dess möjliga värden).
-
Catalog Service: Katalog är arkivsystemet för dataplatser och -länkar inom Experience Platform.
-
Batch ingestion: Med API:t för gruppinmatning kan du importera data till Experience Platform som gruppfiler.
-
Sandlådor: Experience Platform tillhandahåller virtuella sandlådor som partitionerar en enda Experience Platform-instans till separata virtuella miljöer för att utveckla och utveckla program för digitala upplevelser.
Använda Experience Platform API:er
Information om hur du kan anropa Experience Platform API:er finns i guiden Komma igång med Experience Platform API:er.
Skapa en källanslutning
Du kan skapa en källanslutning genom att göra en POST-begäran till API:t Flow Service. En källanslutning består av ett anslutnings-ID, en sökväg till källdatafilen och ett anslutnings-spec-ID.
Om du vill skapa en källanslutning måste du också definiera ett uppräkningsvärde för dataformatattributet.
Använd följande uppräkningsvärden för filbaserade anslutningar:
Dataformat | Uppräkningsvärde |
---|---|
Avgränsad | delimited |
JSON | json |
Parquet | parquet |
Ange värdet tabular
för alla tabellbaserade anslutningar.
API-format
POST /sourceConnections
Begäran
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database source connection",
"baseConnectionId": "6990abad-977d-41b9-a85d-17ea8cf1c0e4",
"description": "Database source connection",
"data": {
"format": "tabular"
},
"params": {
"tableName": "test1.Mytable",
"columns": [
{
"name": "TestID",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Datefield",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"version": "1.0"
}
}'
baseConnectionId
params.path
connectionSpec.id
Svar
Ett lyckat svar returnerar den unika identifieraren (id
) för den nyligen skapade källanslutningen. Detta ID krävs i senare steg för att skapa en målanslutning.
{
"id": "b7581b59-c603-4df1-a689-d23d7ac440f3",
"etag": "\"ef05d265-0000-0200-0000-6019e0080000\""
}
Skapa ett mål-XDM-schema
För att källdata ska kunna användas i Experience Platform måste ett målschema skapas för att strukturera källdata efter dina behov. Målschemat används sedan för att skapa en Experience Platform-datauppsättning där källdata finns.
Ett mål-XDM-schema kan skapas genom att en POST-begäran till schemats register-APIutförs.
Detaljerade steg om hur du skapar ett mål-XDM-schema finns i självstudiekursen Skapa ett schema med API:t.
Skapa en måldatauppsättning
En måldatauppsättning kan skapas genom att en POST-begäran till katalogtjänstens API utförs, med ID:t för målschemat i nyttolasten.
Detaljerade steg om hur du skapar en måldatauppsättning finns i självstudiekursen Skapa en datauppsättning med API:t.
Skapa en målanslutning
En målanslutning representerar anslutningen till målet där inkapslade data kommer in. Om du vill skapa en målanslutning måste du ange det fasta anslutnings-spec-ID som är associerat med datasjön. Anslutningens spec-ID är: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Nu har du de unika identifierarna ett målschema, en måldatamängd och ett anslutningsspec-ID till datasjön. Med API:t Flow Service kan du skapa en målanslutning genom att ange dessa identifierare tillsammans med den datauppsättning som kommer att innehålla inkommande källdata.
API-format
POST /targetConnections
Begäran
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database target connection",
"description": "Database target connection",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "6019e0e7c5dcf718db5ebc71"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
för mål-XDM-schemat.data.schema.version
application/vnd.adobe.xed-full+json;version=1
, vilket returnerar den senaste delversionen av schemat.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Svar
Ett svar returnerar den nya målanslutningens unika identifierare (id
). Detta värde krävs i ett senare steg för att skapa ett dataflöde.
{
"id": "320f119a-5ac1-4ab1-88ea-eb19e674ea2e",
"etag": "\"c0038936-0000-0200-0000-6019e1190000\""
}
Skapa en mappning
För att källdata ska kunna hämtas till en måldatamängd måste den först mappas till målschemat som måldatamängden följer.
Om du vill skapa en mappningsuppsättning skickar du en POST-begäran till mappingSets
-slutpunkten för Data Prep API samtidigt som du anger ditt mål-XDM-schema $id
och information om de mappningsuppsättningar du vill skapa.
API-format
POST /mappingSets
Begäran
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/52b59140414aa6a370ef5e21155fd7a686744b8739ecc168",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "TestID",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.birthDate",
"sourceAttribute": "Datefield",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
$id
för mål-XDM-schemat.Svar
Ett lyckat svar returnerar information om den nyligen skapade mappningen inklusive dess unika identifierare (id
). Detta ID krävs i ett senare steg för att skapa ett dataflöde.
{
"id": "0b090130b58b4819afc78b6dc98b484d",
"version": 0,
"createdDate": 1612309018666,
"modifiedDate": 1612309018666,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Hämta dataflödesspecifikationer
Ett dataflöde används för att samla in data från källor och föra in dem i Experience Platform. Om du vill skapa ett dataflöde måste du först hämta dataflödesspecifikationerna genom att utföra en GET-begäran till Flow Service API. Dataflödesspecifikationer används för att samla in data från en extern databas eller ett NoSQL-system.
API-format
GET /flowSpecs?property=name=="CRMToAEP"
Begäran
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Svar
Ett lyckat svar returnerar information om dataflödesspecifikationen som ansvarar för att hämta data från källan till Experience Platform. Svaret innehåller den unika flödesspecifikation id
som krävs för att skapa ett nytt dataflöde.
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
Skapa ett dataflöde
Det sista steget mot att samla in data är att skapa ett dataflöde. Nu bör du ha förberett följande obligatoriska värden:
Ett dataflöde ansvarar för att schemalägga och samla in data från en källa. Du kan skapa ett dataflöde genom att utföra en POST-begäran samtidigt som du anger de tidigare angivna värdena i nyttolasten för begäran.
Om du vill schemalägga ett intag måste du först ange starttidsvärdet till epok time i sekunder. Sedan måste du ange frekvensvärdet till ett av de fem alternativen: once
, minute
, hour
, day
eller week
. Intervallvärdet anger perioden mellan två på varandra följande inmatningar och att skapa en engångsinmatning kräver inget intervall. Intervallvärdet måste vara lika med eller större än 15
för alla andra frekvenser.
API-format
POST /flows
Begäran
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Database dataflow using BigQuery",
"description": "collecting test1.Mytable",
"flowSpec": {
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"version": "1.0"
},
"sourceConnectionIds": [
"b7581b59-c603-4df1-a689-d23d7ac440f3"
],
"targetConnectionIds": [
"320f119a-5ac1-4ab1-88ea-eb19e674ea2e"
],
"transformations": [
{
"name": "Copy",
"params": {
"deltaColumn": {
"name": "Datefield",
"dateFormat": "YYYY-MM-DD",
"timezone": "UTC"
}
}
},
{
"name": "Mapping",
"params": {
"mappingId": "0b090130b58b4819afc78b6dc98b484d",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1612310466",
"frequency":"minute",
"interval":"15",
"backfill": "true"
}
}'
+++
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
transformations.params.deltaColum
deltaColumn
är yyyy-MM-dd HH:mm:ss
. Om du använder Azure Table Storage är det format som stöds för deltaColumn
yyyy-MM-ddTHH:mm:ssZ
.transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
eller week
.scheduleParams.interval
Intervallet anger perioden mellan två på varandra följande flödeskörningar. Intervallets värde ska vara ett heltal som inte är noll. Det minsta tillåtna intervallvärdet för varje frekvens är följande:
- En gång: ingen/a
- Minut: 15
- Timme: 1
- Dag: 1
- Vecka: 1
Svar
Ett lyckat svar returnerar ID:t (id
) för det nyskapade dataflödet.
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
Övervaka dataflödet
När dataflödet har skapats kan du övervaka de data som importeras genom det för att se information om flödeskörningar, slutförandestatus och fel. Mer information om hur du övervakar dataflöden finns i självstudiekursen Övervaka dataflöden i API.
Nästa steg
Genom att följa den här självstudiekursen har du skapat en källanslutning för att samla in data från en databas på schemalagd basis. Inkommande data kan nu användas av Experience Platform-tjänster längre fram i kedjan som Real-Time Customer Profile och Data Science Workspace. Mer information finns i följande dokument:
Bilaga
I följande avsnitt visas de olika anslutningarna till molnlagringskällan och deras anslutningsspecifikationer.
Anslutningsspecifikation
3416976c-a9ca-4bba-901a-1f08f66978ff
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
0479cc14-7651-4354-b233-7480606c2ac3
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
ecde33f2-c56f-46cc-bdea-ad151c16cd69
1fe283f6-9bec-11ea-bb37-0242ac130002
3c9b37f8-13a6-43d8-bad3-b863b941fedd
37b6bf40-d318-4655-90be-5cd6f65d334b
09182899-b429-40c9-a15a-bf3ddbc8ced7
000eb99-cd47-43f3-827c-43caf170f015
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
26d738e0-8963-47ea-aadf-c60de735468a
d6b52d86-f0f8-475f-89d4-ce54c8527328
102706fb-a5cd-42ee-afe0-bc42f017ff43
74a1c565-4e59-48d7-9d67-7c03b8a13137