Questa esercitazione descrive i passaggi per recuperare i dati da un’origine di archiviazione cloud e portarli a Platform tramite Flow Service API.
Per creare un flusso di dati, è necessario disporre già di un ID connessione di base valido con un’origine di archiviazione cloud. Se non disponi di questo ID, consulta panoramica sulle origini per un elenco delle origini di archiviazione cloud con cui è possibile creare una connessione di base.
Questo tutorial richiede una buona conoscenza dei seguenti componenti di Adobe Experience Platform:
{TENANT_ID}
, il concetto di "contenitori" e le intestazioni necessarie per effettuare le richieste (con particolare attenzione all’intestazione Accept e ai suoi possibili valori).Per informazioni su come effettuare correttamente chiamate alle API di Platform, consulta la guida su introduzione alle API di Platform.
Puoi creare una connessione sorgente effettuando una richiesta POST al sourceConnections
endpoint di Flow Service fornendo l’ID della connessione di base, il percorso del file sorgente che desideri acquisire e l’ID della specifica di connessione corrispondente della sorgente.
Quando si crea una connessione di origine, è necessario definire anche un valore enum per l'attributo del formato dati.
Utilizzare i seguenti valori enum per le origini basate su file:
Formato dati | Valore enum |
---|---|
Delimitato | delimited |
JSON | json |
Parquet | parquet |
Per tutte le origini basate su tabelle, imposta il valore su tabular
.
Formato API
POST /sourceConnections
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Proprietà | Descrizione |
---|---|
baseConnectionId |
ID della connessione di base dell’origine di archiviazione cloud. |
data.format |
Il formato dei dati che desideri inserire in Platform. I valori supportati sono: delimited , JSON , e parquet . |
data.properties |
(Facoltativo) Insieme di proprietà che è possibile applicare ai dati durante la creazione di una connessione di origine. |
data.properties.columnDelimiter |
(Facoltativo) Un delimitatore di colonna a carattere singolo che è possibile specificare durante la raccolta di file flat. Qualsiasi valore di carattere singolo è un delimitatore di colonna consentito. Se non specificato, virgola (, ) viene utilizzato come valore predefinito. Nota: Il columnDelimiter Questa proprietà può essere utilizzata solo durante l’acquisizione di file delimitati. |
data.properties.encoding |
(Facoltativo) Proprietà che definisce il tipo di codifica da utilizzare per l’acquisizione dei dati in Platform. I tipi di codifica supportati sono: UTF-8 e ISO-8859-1 . Nota: Il encoding Il parametro è disponibile solo durante l’acquisizione di file CSV delimitati. Altri tipi di file verranno acquisiti con la codifica predefinita, UTF-8 . |
data.properties.compressionType |
(Facoltativo) Proprietà che definisce il tipo di file compresso da acquisire. I tipi di file compressi supportati sono: bzip2 , gzip , deflate , zipDeflate , tarGzip , e tar . Nota: Il compressionType Questa proprietà può essere utilizzata solo durante l’acquisizione di file delimitati o JSON. |
params.path |
Percorso del file di origine a cui si sta effettuando l'accesso. Questo parametro punta a un singolo file o a un'intera cartella. Nota: puoi utilizzare un asterisco al posto del nome del file per specificare l’acquisizione di un’intera cartella. Ad esempio: /acme/summerCampaign/*.csv acquisirà l’intero /acme/summerCampaign/ cartella. |
params.type |
Tipo di file del file di dati di origine che si sta acquisendo. Usa tipo file per acquisire un singolo file e utilizzare il tipo folder per acquisire un’intera cartella. |
connectionSpec.id |
ID della specifica di connessione associato all’origine di archiviazione cloud specifica. Consulta la appendice per un elenco degli ID delle specifiche di connessione. |
Risposta
In caso di esito positivo, la risposta restituisce l’identificatore univoco (id
) della connessione sorgente appena creata. Questo ID è necessario in un passaggio successivo per creare un flusso di dati.
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
È possibile utilizzare espressioni regolari per acquisire un particolare set di file dall’origine a Platform durante la creazione di una connessione sorgente.
Formato API
POST /sourceConnections
Richiesta
Nell’esempio seguente, nel percorso del file viene utilizzata l’espressione regolare per specificare l’acquisizione di tutti i file CSV che presentano premium
in loro nome.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Quando si crea una connessione sorgente, è possibile utilizzare recursive
parametro per acquisire dati da cartelle nidificate in profondità.
Formato API
POST /sourceConnections
Richiesta
Nell’esempio seguente, il recursive: true
informazioni sui parametri Flow Service per leggere tutte le sottocartelle in modo ricorsivo durante il processo di acquisizione.
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
Per utilizzare i dati sorgente in Platform, è necessario creare uno schema di destinazione che strutturi i dati sorgente in base alle tue esigenze. Lo schema di destinazione viene quindi utilizzato per creare un set di dati di Platform in cui sono contenuti i dati di origine.
È possibile creare uno schema XDM di destinazione eseguendo una richiesta POST al API del registro dello schema.
Per i passaggi dettagliati su come creare uno schema XDM di destinazione, consulta l’esercitazione su creazione di uno schema tramite l’API.
È possibile creare un set di dati di destinazione eseguendo una richiesta POST al API Catalog Service, che fornisce l’ID dello schema di destinazione all’interno del payload.
Per i passaggi dettagliati su come creare un set di dati di destinazione, consulta l’esercitazione su creazione di un set di dati tramite l’API.
Una connessione di destinazione rappresenta la connessione alla destinazione in cui arrivano i dati acquisiti. Per creare una connessione di destinazione, devi fornire l’ID della specifica di connessione fissa associato al Data Lake. L'ID della specifica di connessione è: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
Ora disponi degli identificatori univoci, di uno schema di destinazione, di un set di dati di destinazione e dell’ID della specifica di connessione al Data Lake. Utilizzando questi identificatori, puoi creare una connessione di destinazione utilizzando Flow Service API per specificare il set di dati che conterrà i dati di origine in entrata.
Formato API
POST /targetConnections
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Proprietà | Descrizione |
---|---|
data.schema.id |
Il $id dello schema XDM di destinazione. |
data.schema.version |
Versione dello schema. Questo valore deve essere impostato application/vnd.adobe.xed-full+json;version=1 , che restituisce la versione secondaria più recente dello schema. |
params.dataSetId |
ID del set di dati di destinazione. |
connectionSpec.id |
ID della specifica di connessione fissa al Data Lake. Questo ID è: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
Risposta
In caso di esito positivo, la risposta restituisce l’identificatore univoco della nuova connessione di destinazione (id
). Questo ID è richiesto nei passaggi successivi.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Per poter acquisire i dati di origine in un set di dati di destinazione, è necessario prima mapparli sullo schema di destinazione a cui il set di dati di destinazione aderisce.
Per creare un set di mappatura, effettua una richiesta POST al mappingSets
endpoint del Data Prep API mentre fornisci lo schema XDM di destinazione $id
e i dettagli dei set di mappatura da creare.
Puoi mappare tipi di dati complessi, ad esempio array, in file JSON utilizzando un connettore di origine dell’archiviazione cloud.
Formato API
POST /conversion/mappingSets
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Proprietà | Descrizione |
---|---|
xdmSchema |
ID dello schema XDM di destinazione. |
Risposta
In caso di esito positivo, la risposta restituisce i dettagli della mappatura appena creata, compreso l’identificatore univoco (id
). Questo valore è necessario in un passaggio successivo per creare un flusso di dati.
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
Un flusso di dati è responsabile della raccolta dei dati dalle origini e della loro introduzione in Platform. Per creare un flusso di dati, devi innanzitutto ottenere le specifiche del flusso di dati responsabili della raccolta dei dati dell’archiviazione cloud.
Formato API
GET /flowSpecs?property=name=="CloudStorageToAEP"
Richiesta
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Il payload di risposta JSON seguente è nascosto per brevità. Seleziona "payload" per visualizzare il payload di risposta.
Risposta
In caso di esito positivo, la risposta restituisce i dettagli della specifica del flusso di dati responsabili dell’importazione di dati dall’origine in Platform. La risposta include le specifiche di flusso univoche id
necessario per creare un nuovo flusso di dati.
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
L’ultimo passaggio per la raccolta dei dati di archiviazione cloud è la creazione di un flusso di dati. A questo punto sono stati preparati i seguenti valori obbligatori:
Un flusso di dati è responsabile della pianificazione e della raccolta di dati da un’origine. Puoi creare un flusso di dati eseguendo una richiesta POST e fornendo i valori precedentemente menzionati all’interno del payload.
Per l’acquisizione in batch, ogni flusso di dati successivo seleziona i file da acquisire dall’origine in base ai ultima modifica timestamp. Ciò significa che i flussi di dati batch selezionano dall’origine i file che sono nuovi o che sono stati modificati dall’ultima esecuzione del flusso di dati.
Per pianificare un’acquisizione, devi prima impostare il valore dell’ora di inizio su tempo epoca in secondi. Quindi, è necessario impostare il valore della frequenza su una delle cinque opzioni seguenti: once
, minute
, hour
, day
, o week
. Il valore di intervallo indica il periodo tra due acquisizioni consecutive e la creazione di un’acquisizione una tantum non richiede l’impostazione di un intervallo. Per tutte le altre frequenze, il valore dell'intervallo deve essere impostato su uguale o maggiore di 15
.
Si consiglia vivamente di pianificare il flusso di dati per l’acquisizione unica quando si utilizza Connettore FTP.
Formato API
POST /flows
Richiesta
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
Proprietà | Descrizione |
---|---|
flowSpec.id |
Il ID specifica di flusso recuperato nel passaggio precedente. |
sourceConnectionIds |
Il ID connessione sorgente recuperato in un passaggio precedente. |
targetConnectionIds |
Il ID connessione di destinazione recuperato in un passaggio precedente. |
transformations.params.mappingId |
Il ID mappatura recuperato in un passaggio precedente. |
scheduleParams.startTime |
L’ora di inizio del flusso di dati in tempo epoca. |
scheduleParams.frequency |
La frequenza con cui il flusso di dati raccoglierà i dati. I valori accettabili includono: once , minute , hour , day , o week . |
scheduleParams.interval |
L’intervallo indica il periodo tra due esecuzioni consecutive del flusso. Il valore dell'intervallo deve essere un numero intero diverso da zero. Intervallo non richiesto quando la frequenza è impostata come once e deve essere maggiore o uguale a 15 per altri valori di frequenza. |
Risposta
In caso di esito positivo, la risposta restituisce l’ID (id
) del flusso di dati appena creato.
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
Una volta creato il flusso di dati, puoi monitorare i dati che vengono acquisiti tramite di esso per visualizzare informazioni sulle esecuzioni del flusso, sullo stato di completamento e sugli errori. Per ulteriori informazioni su come monitorare i flussi di dati, consulta l’esercitazione su monitoraggio dei flussi di dati nell’API
Seguendo questa esercitazione, hai creato un connettore di origine per raccogliere i dati dall’archiviazione cloud in base a una pianificazione. I dati in arrivo possono ora essere utilizzati da servizi Platform a valle come Real-Time Customer Profile e Data Science Workspace. Per ulteriori informazioni, consulta i seguenti documenti:
Nella sezione seguente sono elencati i diversi connettori di origine dell’archiviazione cloud e le relative specifiche di connessione.
Nome connettore | Specifica di connessione |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (Hub eventi) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |