Creare un flusso di dati per le origini di archiviazione cloud utilizzando l'API Flow Service
Creato per:
- Sviluppatore
Questo tutorial descrive i passaggi per recuperare i dati da un'origine di archiviazione cloud e portarli ad Experience Platform utilizzando Flow Service API.
Introduzione
Questo tutorial richiede una buona conoscenza dei seguenti componenti di Adobe Experience Platform:
-
Experience Data Model (XDM) System: framework standardizzato tramite il quale Experience Platform organizza i dati sull'esperienza del cliente.
- Nozioni di base sulla composizione dello schema: scopri i blocchi predefiniti di base degli schemi XDM, inclusi i principi chiave e le best practice nella composizione dello schema.
- Guida per gli sviluppatori del Registro di schema: include informazioni importanti che è necessario conoscere per eseguire correttamente le chiamate all'API del Registro di schema. Ciò include
{TENANT_ID}
, il concetto di "contenitori" e le intestazioni necessarie per effettuare le richieste (con particolare attenzione all'intestazione Accept e ai suoi possibili valori).
-
Catalog Service: il catalogo è il sistema di registrazione per la posizione e la derivazione dei dati in Experience Platform.
-
Batch ingestion: l'API di acquisizione batch consente di acquisire dati in Experience Platform come file batch.
-
Sandbox: Experience Platform fornisce sandbox virtuali che suddividono una singola istanza Experience Platform in ambienti virtuali separati, utili per le attività di sviluppo e aggiornamento delle applicazioni di esperienza digitale.
Utilizzo delle API di Experience Platform
Per informazioni su come effettuare correttamente chiamate alle API di Experience Platform, consulta la guida introduttiva alle API di Experience Platform.
Creare una connessione sorgente
È possibile creare una connessione di origine effettuando una richiesta POST all'endpoint sourceConnections
dell'API Flow Service e fornendo l'ID connessione di base, il percorso del file di origine che si desidera acquisire e l'ID della specifica di connessione corrispondente dell'origine.
Quando si crea una connessione di origine, è necessario definire anche un valore enum per l'attributo del formato dati.
Utilizzare i seguenti valori enum per le origini basate su file:
delimited
json
parquet
Per tutte le origini basate su tabelle, impostare il valore su tabular
.
Formato API
POST /sourceConnections
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
, JSON
e parquet
.data.properties
data.properties.columnDelimiter
,
) come valore predefinito. Nota: la proprietà columnDelimiter
può essere utilizzata solo durante l'acquisizione di file delimitati.data.properties.encoding
UTF-8
e ISO-8859-1
. Nota: il parametro encoding
è disponibile solo per l'acquisizione di file CSV delimitati. Altri tipi di file verranno acquisiti con la codifica predefinita, UTF-8
.data.properties.compressionType
bzip2
, gzip
, deflate
, zipDeflate
, tarGzip
e tar
. Nota: la proprietà compressionType
può essere utilizzata solo durante l'acquisizione di file delimitati o JSON.params.path
/acme/summerCampaign/*.csv
acquisirà l'intera cartella /acme/summerCampaign/
.params.type
file
per acquisire un singolo file e il tipo folder
per acquisire un'intera cartella.connectionSpec.id
Risposta
In caso di esito positivo, la risposta restituisce l'identificatore univoco (id
) della connessione di origine appena creata. Questo ID è necessario in un passaggio successivo per creare un flusso di dati.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
Utilizza espressioni regolari per selezionare un set specifico di file da acquisire
È possibile utilizzare espressioni regolari per acquisire un particolare set di file dall’origine ad Experience Platform durante la creazione di una connessione sorgente.
Formato API
POST /sourceConnections
Richiesta
Nell’esempio seguente, nel percorso del file viene utilizzata l’espressione regolare per specificare l’acquisizione di tutti i file CSV il cui nome contiene premium
.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Configurare una connessione di origine per acquisire i dati in modo ricorsivo
Durante la creazione di una connessione di origine, è possibile utilizzare il parametro recursive
per acquisire dati da cartelle nidificate in modo profondo.
Formato API
POST /sourceConnections
Richiesta
Nell'esempio seguente, il parametro recursive: true
informa Flow Service di leggere tutte le sottocartelle in modo ricorsivo durante il processo di acquisizione.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Creare uno schema XDM di destinazione
Affinché i dati sorgente possano essere utilizzati in Experience Platform, è necessario creare uno schema di destinazione per strutturare i dati sorgente in base alle tue esigenze. Lo schema di destinazione viene quindi utilizzato per creare un set di dati Experience Platform in cui sono contenuti i dati di origine.
È possibile creare uno schema XDM di destinazione eseguendo una richiesta POST all'API Schema Registry.
Per i passaggi dettagliati su come creare uno schema XDM di destinazione, consulta l'esercitazione su creazione di uno schema utilizzando l'API.
Creare un set di dati di destinazione
È possibile creare un set di dati di destinazione eseguendo una richiesta POST all'API Catalog Service, fornendo l'ID dello schema di destinazione all'interno del payload.
Per i passaggi dettagliati su come creare un set di dati di destinazione, consulta l'esercitazione su creazione di un set di dati utilizzando l'API.
Creare una connessione di destinazione
Una connessione di destinazione rappresenta la connessione alla destinazione in cui arrivano i dati acquisiti. Per creare una connessione di destinazione, devi fornire l’ID della specifica di connessione fissa associato al Data Lake. ID della specifica di connessione: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Ora disponi degli identificatori univoci, di uno schema di destinazione, di un set di dati di destinazione e dell’ID della specifica di connessione al Data Lake. Utilizzando questi identificatori, è possibile creare una connessione di destinazione utilizzando l'API Flow Service per specificare il set di dati che conterrà i dati di origine in entrata.
Formato API
POST /targetConnections
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
dello schema XDM di destinazione.data.schema.version
application/vnd.adobe.xed-full+json;version=1
, che restituisce la versione secondaria più recente dello schema.params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
.Risposta
Una risposta corretta restituisce l'identificatore univoco della nuova connessione di destinazione (id
). Questo ID è richiesto nei passaggi successivi.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Creare una mappatura
Per poter acquisire i dati di origine in un set di dati di destinazione, è necessario prima mapparli sullo schema di destinazione a cui il set di dati di destinazione aderisce.
Per creare un set di mappatura, effettua una richiesta POST all'endpoint mappingSets
dell'Data Prep API fornendo allo stesso tempo lo schema XDM di destinazione $id
e i dettagli dei set di mappatura che desideri creare.
Formato API
POST /conversion/mappingSets
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
Risposta
In caso di esito positivo, la risposta restituisce i dettagli della mappatura appena creata, incluso il relativo identificatore univoco (id
). Questo valore è necessario in un passaggio successivo per creare un flusso di dati.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Recuperare le specifiche del flusso di dati
Un flusso di dati è responsabile della raccolta dei dati dalle origini e della loro integrazione in Experience Platform. Per creare un flusso di dati, devi innanzitutto ottenere le specifiche del flusso di dati responsabili della raccolta dei dati dell’archiviazione cloud.
Formato API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Richiesta
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Risposta
In caso di esito positivo, la risposta restituisce i dettagli della specifica del flusso di dati responsabili dell’importazione di dati dall’origine in Experience Platform. La risposta include la specifica di flusso univoca id
necessaria per creare un nuovo flusso di dati.
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
Creare un flusso di dati
L’ultimo passaggio per la raccolta dei dati di archiviazione cloud è la creazione di un flusso di dati. A questo punto sono stati preparati i seguenti valori obbligatori:
Un flusso di dati è responsabile della pianificazione e della raccolta di dati da un’origine. Puoi creare un flusso di dati eseguendo una richiesta POST e fornendo i valori precedentemente menzionati all’interno del payload.
Per pianificare un’acquisizione, devi prima impostare il valore dell’ora di inizio su tempo epoca in secondi. Impostare quindi il valore della frequenza su una delle cinque opzioni seguenti: once
, minute
, hour
, day
o week
. Il valore di intervallo indica il periodo tra due acquisizioni consecutive e la creazione di un’acquisizione una tantum non richiede l’impostazione di un intervallo. Per tutte le altre frequenze, il valore dell'intervallo deve essere impostato su uguale o maggiore di 15
.
Formato API
POST /flows
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Experience Platform",
"description": "Cloud Storage flow to Experience Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
, minute
, hour
, day
o week
.scheduleParams.interval
L’intervallo indica il periodo tra due esecuzioni consecutive del flusso. Il valore dell'intervallo deve essere un numero intero diverso da zero. Il valore dell'intervallo minimo accettato per ciascuna frequenza è il seguente:
- Una volta: n/d
- Minuto: 15
- Ora: 1
- Giorno: 1
- Settimana: 1
Risposta
In caso di esito positivo, la risposta restituisce l'ID (id
) del flusso di dati appena creato.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Monitorare il flusso di dati
Una volta creato il flusso di dati, puoi monitorare i dati che vengono acquisiti tramite di esso per visualizzare informazioni sulle esecuzioni del flusso, sullo stato di completamento e sugli errori. Per ulteriori informazioni su come monitorare i flussi di dati, consulta l'esercitazione sul monitoraggio dei flussi di dati nell'API
Passaggi successivi
Seguendo questa esercitazione, hai creato un connettore di origine per raccogliere i dati dall’archiviazione cloud in base a una pianificazione. I dati in arrivo possono ora essere utilizzati dai servizi Experience Platform a valle come Real-Time Customer Profile e Data Science Workspace. Per ulteriori informazioni, consulta i seguenti documenti:
Appendice
Nella sezione seguente sono elencati i diversi connettori di origine dell’archiviazione cloud e le relative specifiche di connessione.
Specifica di connessione
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094