本教程介绍了以下步骤:从云存储源检索数据,并使用将这些数据引入平台 Flow Service API.
要创建数据流,您必须已拥有一个与云存储源的有效基本连接ID。 如果您没有此ID,请查看 源概述 以获取可创建基本连接的云存储源列表。
本教程要求您实际了解Adobe Experience Platform的以下组件:
{TENANT_ID}
、“容器”的概念以及发出请求所需的标头(请特别注意“接受”标头及其可能的值)。有关如何成功调用Platform API的信息,请参阅 Platform API快速入门.
您可以通过对以下对象发出POST请求来创建源连接: sourceConnections
端点 Flow Service 提供基本连接ID时的API、要摄取的源文件的路径以及源对应的连接规范ID。
创建源连接时,还必须为数据格式属性定义一个枚举值。
为基于文件的源使用以下枚举值:
数据格式 | 枚举值 |
---|---|
已分隔 | delimited |
JSON | json |
Parquet | parquet |
对于所有基于表的源,将值设置为 tabular
.
API格式
POST /sourceConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
属性 | 描述 |
---|---|
baseConnectionId |
云存储源的基本连接ID。 |
data.format |
您要带到Platform的数据的格式。 支持的值包括: delimited , JSON 、和 parquet . |
data.properties |
(可选)可在创建源连接时应用于数据的一组属性。 |
data.properties.columnDelimiter |
(可选)收集平面文件时可指定的单个字符列分隔符。 任何单个字符值都是允许的列分隔符。 如果未提供,请使用逗号(, )用作默认值。 注释:此 columnDelimiter 属性只能在摄取分隔文件时使用。 |
data.properties.encoding |
(可选)一个属性,定义将数据摄取到Platform时使用的编码类型。 支持的编码类型包括: UTF-8 和 ISO-8859-1 . 注释:此 encoding 参数仅在摄取分隔的CSV文件时可用。 其他文件类型将使用默认编码摄取。 UTF-8 . |
data.properties.compressionType |
(可选)一个属性,用于定义摄取的压缩文件类型。 支持的压缩文件类型为: bzip2 , gzip , deflate , zipDeflate , tarGzip 、和 tar . 注释:此 compressionType 属性只能在引入分隔文件或JSON文件时使用。 |
params.path |
您正在访问的源文件的路径。 此参数指向单个文件或整个文件夹。 注释:您可以使用星号代替文件名来指定整个文件夹的摄取。 例如: /acme/summerCampaign/*.csv 将摄取整个 /acme/summerCampaign/ 文件夹。 |
params.type |
要摄取的源数据文件的文件类型。 使用类型 file 摄取单个文件并使用类型 folder 摄取整个文件夹。 |
connectionSpec.id |
与特定云存储源关联的连接规范ID。 请参阅 附录 以获取连接规范ID的列表。 |
响应
成功响应将返回唯一标识符(id
)。 此ID在后续步骤中是创建数据流所必需的。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
创建源连接时,可以使用正则表达式将源中的一组特定文件摄取到Platform。
API格式
POST /sourceConnections
请求
在以下示例中,在文件路径中使用正则表达式,以指定摄取具有 premium
以他们的名义。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
创建源连接时,您可以使用 recursive
用于从深度嵌套文件夹摄取数据的参数。
API格式
POST /sourceConnections
请求
在以下示例中, recursive: true
参数信息 Flow Service 用于在摄取过程中递归读取所有子文件夹。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
为了在Platform中使用源数据,必须创建一个目标架构,以根据您的需求构建源数据。 然后,使用目标架构创建包含源数据的Platform数据集。
可以通过向以下对象执行POST请求来创建目标XDM架构 架构注册表API.
有关如何创建目标XDM架构的详细步骤,请参阅关于的教程 使用API创建架构.
可以通过向执行POST请求来创建目标数据集 目录服务API,在有效负载中提供目标架构的ID。
有关如何创建目标数据集的详细步骤,请参阅关于的教程 使用API创建数据集.
目标连接表示与所摄取数据所登陆的目标之间的连接。 要创建目标连接,您必须提供与Data Lake关联的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
现在,您拥有目标架构、目标数据集和到数据湖的连接规范ID的唯一标识符。 使用这些标识符,您可以使用 Flow Service 用于指定将包含入站源数据的数据集的API。
API格式
POST /targetConnections
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
属性 | 描述 |
---|---|
data.schema.id |
此 $id 目标XDM架构的。 |
data.schema.version |
架构的版本。 必须设置此值 application/vnd.adobe.xed-full+json;version=1 ,返回架构的最新次要版本。 |
params.dataSetId |
目标数据集的ID。 |
connectionSpec.id |
到数据湖的固定连接规范ID。 此ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c . |
响应
成功响应将返回新目标连接的唯一标识符(id
)。 此ID在后续步骤中是必需的。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
为了将源数据引入目标数据集,必须首先将其映射到目标数据集所遵循的目标架构。
POST要创建映射集,请向 mappingSets
的端点 Data Prep API 提供目标XDM架构时 $id
以及要创建映射集的详细信息。
您可以使用云存储源连接器映射复杂数据类型,例如JSON文件中的阵列。
API格式
POST /conversion/mappingSets
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
属性 | 描述 |
---|---|
xdmSchema |
目标XDM架构的ID。 |
响应
成功响应将返回新创建映射的详细信息,包括其唯一标识符(id
)。 在后续步骤中需要使用此值来创建数据流。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
数据流负责从源中收集数据,并将这些数据导入Platform。 要创建数据流,您必须首先获取负责收集云存储数据的数据流规范。
API格式
GET /flowSpecs?property=name=="CloudStorageToAEP"
请求
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
为简短起见,以下JSON响应有效负载已隐藏。 选择“有效负载”可查看响应有效负载。
响应
成功的响应将返回负责将数据从源引入平台的数据流规范的详细信息。 响应中包含唯一的流规范 id
需要才能创建新数据流。
{
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"name": "CloudStorageToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"b3ba5556-48be-44b7-8b85-ff2b69b46dc4",
"ecadc60c-7455-4d87-84dc-2a0e293d997b",
"b7829c2f-2eb0-4f49-a6ee-55e33008b629",
"4c10e202-c428-4796-9208-5f1f5732b1cf",
"fb2e94c9-c031-467d-8103-6bd6e0a432f2",
"32e8f412-cdf7-464c-9885-78184cb113fd",
"b7bf2577-4520-42c9-bae9-cad01560f7bc",
"998b8ae3-cec0-43b7-8abe-40b1eb4ee069",
"be5ec48c-5b78-49d5-b8fa-7c89ec4569b8",
"54e221aa-d342-4707-bcff-7a4bceef0001",
"c85f9425-fb21-426c-ad0b-405e9bd8a46c",
"26f526f2-58f4-4712-961d-e41bf1ccc0e8"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime"
]
}
}
}
收集云存储数据的最后一步是创建数据流。 现在,您已准备以下必需值:
数据流负责从源中计划和收集数据。 您可以通过在有效负载中提供上述值时执行POST请求来创建数据流。
对于批量摄取,每个后续数据流都会根据文件本身的 上次修改时间 时间戳。 这意味着批处理数据流从源中选择自上次数据流运行以来新增或经过修改的文件。
要计划摄取,您必须先将开始时间值设置为纪元时间(以秒为单位)。 然后,必须将频率值设置为五个选项之一: once
, minute
, hour
, day
,或 week
. 间隔值用于指定两次连续摄取之间的时间段,创建一次性摄取不需要设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于 15
.
强烈建议在使用时安排数据流的一次性引入 FTP连接器.
API格式
POST /flows
请求
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
属性 | 描述 |
---|---|
flowSpec.id |
此 流量规范ID 在上一步中检索。 |
sourceConnectionIds |
此 源连接ID 在之前的步骤中检索。 |
targetConnectionIds |
此 目标连接Id 在之前的步骤中检索。 |
transformations.params.mappingId |
此 映射Id 在之前的步骤中检索。 |
scheduleParams.startTime |
数据流的开始时间(以Epoch时间表示)。 |
scheduleParams.frequency |
数据流收集数据的频率。 可接受的值包括: once , minute , hour , day ,或 week . |
scheduleParams.interval |
间隔指定两次连续流运行之间的周期。 间隔值应为非零整数。 当频率设置为时,不需要间隔 once 和应大于或等于 15 其他频率值。 |
响应
成功的响应会返回ID (id
)。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
创建数据流后,您可以监视通过它摄取的数据,以查看有关流运行、完成状态和错误的信息。 有关如何监视数据流的更多信息,请参阅关于的教程 监测API中的数据流
在本教程之后,您已创建一个源连接器,以按计划从云存储中收集数据。 传入数据现在可供下游平台服务使用,例如 Real-Time Customer Profile 和 Data Science Workspace. 有关更多详细信息,请参阅以下文档:
以下部分列出了不同的云存储源连接器及其连接规范。
连接器名称 | 连接规范 |
---|---|
Amazon S3 (S3) | ecadc60c-7455-4d87-84dc-2a0e293d997b |
Amazon Kinesis (Kinesis) | 86043421-563b-46ec-8e6c-e23184711bf6 |
Azure Blob (Blob) | 4c10e202-c428-4796-9208-5f1f5732b1cf |
Azure Data Lake Storage Gen2 (ADLS Gen2) | b3ba5556-48be-44b7-8b85-ff2b69b46dc4 |
Azure Event Hubs (事件中心) | bf9f5905-92b7-48bf-bf20-455bc6b60a4e |
Azure File Storage | be5ec48c-5b78-49d5-b8fa-7c89ec4569b8 |
Google Cloud Storage | 32e8f412-cdf7-464c-9885-78184cb113fd |
HDFS | 54e221aa-d342-4707-bcff-7a4bceef0001 |
Oracle Object Storage | c85f9425-fb21-426c-ad0b-405e9bd8a46c |
SFTP | bf367b0d-3d9b-4060-b67b-0d3d9bd06094 |