批量摄取API概述
Adobe Experience Platform批量摄取API允许您将数据作为批处理文件接入Platform。 所摄取的数据可以是平面文件(如Parquet文件)的配置文件数据,也可以是与Experience Data Model (XDM)注册表中的已知架构相符的数据。
批次摄取API引用提供了有关这些API调用的其他信息。
下图概述了批量摄取流程:
快速入门
本指南中使用的API端点是批次摄取API的一部分。 在继续之前,请查看快速入门指南,以获取相关文档的链接、阅读本文档中示例API调用的指南,以及有关成功调用任何Experience PlatformAPI所需的所需标头的重要信息。
Data Ingestion先决条件
- 要上传的数据必须为Parquet或JSON格式。
- 在Catalog services中创建了一个数据集。
- Parquet文件的内容必须与要上传到的数据集的架构子集匹配。
- 在身份验证后具有您的唯一访问令牌。
批量摄取最佳实践
- 建议的批处理大小在256 MB和100 GB之间。
- 每个批次应最多包含1500个文件。
批量摄取约束
批量数据摄取有一些限制:
- 每批次的最大文件数:1500
- 最大批次大小:100 GB
- 每行属性或字段的最大数量:10000
- 每个用户每分钟数据湖的最大批次数:2000
类型
摄取数据时,了解Experience Data Model (XDM)架构的工作方式很重要。 有关XDM字段类型如何映射到不同格式的详细信息,请阅读架构注册开发人员指南。
在摄取数据时有一定的灵活性 — 如果类型与目标架构中的类型不匹配,数据将转换为表达的目标类型。 如果失败,它将失败TypeCompatibilityException
的批处理。
例如,JSON和CSV都不具有date
或date-time
类型。 因此,这些值使用ISO 8601格式字符串("2018-07-10T15:05:59.000-08:00")或以毫秒(1531263959000)为单位的Unix时间格式表示,并在摄取时转换为目标XDM类型。
下表显示了摄取数据时支持的转化。
使用 API
Data Ingestion API允许您通过三个基本步骤将数据作为批次(由一个或多个要作为单个单元摄取的文件组成的数据单元)摄取到Experience Platform中:
- 创建新批次。
- 将文件上传到与数据的XDM架构匹配的指定数据集。
- 表示批次的结束。
创建批次
将数据添加到数据集之前,必须将其链接到批次,稍后将这些批次上传到指定的数据集。
POST /batches
请求
curl -X POST "https://platform.adobe.io/data/foundation/import/batches" \
-H "Content-Type: application/json" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-api-key: {API_KEY}"
-d '{
"datasetId": "{DATASET_ID}"
}'
datasetId
响应
{
"id": "{BATCH_ID}",
"imsOrg": "{ORG_ID}",
"updated": 0,
"status": "loading",
"created": 0,
"relatedObjects": [
{
"type": "dataSet",
"id": "{DATASET_ID}"
}
],
"version": "1.0.0",
"tags": {},
"createdUser": "{USER_ID}",
"updatedUser": "{USER_ID}"
}
id
relatedObjects.id
文件上传
成功创建新的上传批次后,可以将文件上传到特定数据集。
您可以使用小文件上传API上传文件。 但是,如果文件过大,并且超过了网关限制(例如扩展超时、请求超出正文大小以及其他限制),则可以切换到大文件上传API。 此API以块形式上传文件,并使用大文件上传完成API调用将数据拼合在一起。
小文件上传
创建批次后,可将数据上传到预先存在的数据集。 要上传的文件必须与其引用的XDM架构匹配。
PUT /batches/{BATCH_ID}/datasets/{DATASET_ID}/files/{FILE_NAME}
{BATCH_ID}
{DATASET_ID}
{FILE_NAME}
请求
curl -X PUT "https://platform.adobe.io/data/foundation/import/batches/{BATCH_ID}/datasets/{DATASET_ID}/files/{FILE_NAME}.parquet" \
-H "content-type: application/octet-stream" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-api-key: {API_KEY}" \
--data-binary "@{FILE_PATH_AND_NAME}.parquet"
{FILE_PATH_AND_NAME}
响应
#Status 200 OK, with empty response body
大文件上传 — 创建文件
要上载大文件,必须将文件拆分为较小的块,并一次上载一个。
POST /batches/{BATCH_ID}/datasets/{DATASET_ID}/files/{FILE_NAME}?action=initialize
{BATCH_ID}
{DATASET_ID}
{FILE_NAME}
请求
curl -X POST "https://platform.adobe.io/data/foundation/import/batches/{BATCH_ID}/datasets/{DATASET_ID}/files/part1=a/part2=b/{FILE_NAME}.parquet?action=initialize" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-api-key: {API_KEY}"
响应
#Status 201 CREATED, with empty response body
大文件上传 — 上传后续部分
创建文件后,可以通过重复的PATCH请求(文件的每个部分各一个)上传所有后续的块。
PATCH /batches/{BATCH_ID}/datasets/{DATASET_ID}/files/{FILE_NAME}
{BATCH_ID}
{DATASET_ID}
{FILE_NAME}
请求
curl -X PATCH "https://platform.adobe.io/data/foundation/import/batches/{BATCH_ID}/datasets/{DATASET_ID}/files/part1=a/part2=b/{FILE_NAME}.parquet" \
-H "content-type: application/octet-stream" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-api-key: {API_KEY}" \
-H "Content-Range: bytes {CONTENT_RANGE}" \
--data-binary "@{FILE_PATH_AND_NAME}.parquet"
{FILE_PATH_AND_NAME}
响应
#Status 200 OK, with empty response
信号批次完成
将所有文件都上载到批处理之后,可以向批处理发出完成信号。 通过这样做,将为已完成的文件创建Catalog个DataSetFile条目,并与上面生成的批次关联。 然后,Catalog批次被标记为成功,这会触发下游流摄取可用数据。
请求
POST /batches/{BATCH_ID}?action=COMPLETE
{BATCH_ID}
curl -X POST "https://platform.adobe.io/data/foundation/import/batches/{BATCH_ID}?action=COMPLETE" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-api-key: {API_KEY}"
响应
#Status 200 OK, with empty response
检查批次状态
在等待将文件上载到批次时,可以检查批次的状态以查看其进度。
API格式
GET /batch/{BATCH_ID}
{BATCH_ID}
请求
curl GET "https://platform.adobe.io/data/foundation/catalog/batch/{BATCH_ID}" \
-H "Authorization: Bearer {ACCESS_TOKEN}" \
-H "x-gw-ims-org-id: {ORG_ID}" \
-H "x-sandbox-name: {SANDBOX_NAME}" \
-H "x-api-key: {API_KEY}"
响应
{
"{BATCH_ID}": {
"imsOrg": "{ORG_ID}",
"created": 1494349962314,
"createdClient": "MCDPCatalogService",
"createdUser": "{USER_ID}",
"updatedUser": "{USER_ID}",
"updated": 1494349963467,
"externalId": "{EXTERNAL_ID}",
"status": "success",
"errors": [
{
"code": "err-1494349963436"
}
],
"version": "1.0.3",
"availableDates": {
"startDate": 1337,
"endDate": 4000
},
"relatedObjects": [
{
"type": "batch",
"id": "foo_batch"
},
{
"type": "connection",
"id": "foo_connection"
},
{
"type": "connector",
"id": "foo_connector"
},
{
"type": "dataSet",
"id": "foo_dataSet"
},
{
"type": "dataSetView",
"id": "foo_dataSetView"
},
{
"type": "dataSetFile",
"id": "foo_dataSetFile"
},
{
"type": "expressionBlock",
"id": "foo_expressionBlock"
},
{
"type": "service",
"id": "foo_service"
},
{
"type": "serviceDefinition",
"id": "foo_serviceDefinition"
}
],
"metrics": {
"foo": 1337
},
"tags": {
"foo_bar": [
"stuff"
],
"bar_foo": [
"woo",
"baz"
],
"foo/bar/foo-bar": [
"weehaw",
"wee:haw"
]
},
"inputFormat": {
"format": "parquet",
"delimiter": ".",
"quote": "`",
"escape": "\\",
"nullMarker": "",
"header": "true",
"charset": "UTF-8"
}
}
}
{USER_ID}
"status"
字段显示所请求批次的当前状态。 批次可以具有以下状态之一: