[Ultimate]{class="badge positive"}

使用Flow Service API流式传输Snowflake数据以Experience Platform

IMPORTANT
已购买Real-time Customer Data Platform Ultimate的用户可在API中使用Snowflake流源。

本教程提供了有关如何使用Flow Service API将数据从Snowflake帐户连接并流式传输到Adobe Experience Platform的步骤。

快速入门

本指南要求您对 Adobe Experience Platform 的以下组件有一定了解:

  • : Experience Platform允许从各种源摄取数据,同时允许您使用Platform服务来构建、标记和增强传入数据。
  • 沙盒: Experience Platform提供将单个Platform实例划分为单独虚拟环境的虚拟沙盒,以帮助开发和改进数字体验应用程序。

有关Snowflake流源的先决条件设置和信息。 请阅读Snowflake 流源概述

使用平台API

有关如何成功调用平台API的信息,请参阅平台API快速入门指南。

创建基本连接 create-a-base-connection

基本连接会保留您的源和平台之间的信息,包括源的身份验证凭据、连接的当前状态以及唯一的基本连接ID。 基本连接ID允许您浏览和浏览源中的文件,并标识要摄取的特定项目,包括有关其数据类型和格式的信息。

要创建基本连接ID,请在将Snowflake身份验证凭据作为请求正文的一部分提供时,向/connections端点发出POST请求。

API格式

POST /connections

请求

以下请求为Snowflake创建基本连接:

TIP
必须完全按照以下示例输入auth.specName值,包括空格。
curl -X POST \
  'https://platform.adobe.io/data/foundation/flowservice/connections' \
  -H 'Authorization: Bearer {ACCESS_TOKEN}' \
  -H 'x-api-key: {API_KEY}' \
  -H 'x-gw-ims-org-id: {ORG_ID}' \
  -H 'x-sandbox-name: {SANDBOX_NAME}' \
  -H 'Content-Type: application/json' \
  -d '{
      "name": "Snowflake base connection",
      "description": "Snowflake base connection",
      "auth": {
          "specName": "Basic Authentication for Snowflake",
          "params": {
              "account": "wixnnnd-ui60793.snowflakecomputing.com",
              "database": "ACME_DB",
              "warehouse": "ACME_WH",
              "username": "nikola15",
              "schema": "PUBLIC",
              "password": "xxxx",
              "role": "ACCOUNTADMIN"
          }
      },
      "connectionSpec": {
          "id": "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
          "version": "1.0"
      }
  }'
属性
描述
auth.params.account
您的Snowflake流帐户的名称。
auth.params.database
将从其中提取数据的Snowflake数据库的名称。
auth.params.warehouse
Snowflake仓库的名称。 Snowflake仓库管理应用程序的查询执行过程。 每个仓库彼此独立,在将数据传送到Platform时必须单独访问。
auth.params.username
您的Snowflake流帐户的用户名。
auth.params.schema
(可选)与您的Snowflake流帐户关联的数据库架构。
auth.params.password
您的Snowflake流帐户的密码。
auth.params.role
(可选)此Snowflake连接的用户角色。 如果未提供,此值默认为public
connectionSpec.id
Snowflake连接规范ID: 51ae16c2-bdad-42fd-9fce-8d5dfddaf140

响应

成功的响应将返回新创建的基本连接及其对应的电子标记。

{
    "id": "1b614dc0-b76e-41e1-b25f-09f4a9d3f111",
    "etag": "\"d300cf4e-0000-0200-0000-6447a7750000\""
}

浏览您的数据表 explore-your-data-tables

接下来,使用基本连接ID来浏览和导航源的数据表,方法是在提供基本连接ID作为参数的同时,向/connections/{BASE_CONNECTION_ID}/explore?objectType=root端点发出GET请求。

API格式

GET /connections/{BASE_CONNECTION_ID}/explore?objectType=root
参数
描述
{BASE_CONNECTION_ID}
Snowflake流源的基本连接ID。

请求

以下请求检索您的Snowflake流帐户的结构和内容。

curl -X GET \
  'https://platform.adobe.io/data/foundation/flowservice/connections/1b614dc0-b76e-41e1-b25f-09f4a9d3f111/explore?objectType=root' \
  -H 'Authorization: Bearer {ACCESS_TOKEN}' \
  -H 'x-api-key: {API_KEY}' \
  -H 'x-gw-ims-org-id: {ORG_ID}' \
  -H 'x-sandbox-name: {SANDBOX_NAME}'

响应

成功的响应将在根级别返回源数据的结构和内容。

{
    "items": [
        {
            "type": "table",
            "name": "ACME"
        }
    ]
}
属性
描述
items.type
表的类型。
items.names
表的名称。

创建源连接 create-a-source-connection

源连接创建和管理与摄取数据的外部源的连接。

要创建源连接,请向Flow Service API的/sourceConnections端点发出POST请求。

API格式

POST /sourceConnections

请求

curl -X POST \
  'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
  -H 'authorization: Bearer {ACCESS_TOKEN}' \
  -H 'content-type: application/json' \
  -H 'x-api-key: {API_KEY}' \
  -H 'x-gw-ims-org-id: {ORG_ID}' \
  -H 'x-sandbox-name: {SANDBOX_NAME}' \
  -d '{
      "name": "Snowflake Streaming Source Connection",
      "description": "A source connection for Snowflake Streaming data",
      "baseConnectionId": "1b614dc0-b76e-41e1-b25f-09f4a9d3f111",
      "connectionSpec": {
          "id": "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
          "version": "1.0"
      },
      "params": {
          "tableName": "ACME",
          "timestampColumn": "dOb",
          "backfill": "true",
          "timezoneValue": "PST"
      }
  }'
属性
描述
baseConnectionId
Snowflake流源的已验证基本连接ID。 此ID是在前面的步骤中生成的。
connectionSpec.id
Snowflake流源的连接规范ID。
params.tableName
Snowflake数据库中要带到Platform的表名称。
params.timestampColumn
用于获取增量值的时间戳列的名称。
params.backfill
一个布尔标志,用于确定是从开始(0纪元时间)还是从启动源时获取数据。 有关此值的详细信息,请阅读Snowflake 流源概述
params.timezoneValue
时区值指示在查询Snowflake数据库时应获取哪个时区的当前时间。 如果配置中的时间戳列设置为TIMESTAMP_NTZ,则应提供此参数。 如果未提供,timezoneValue将默认为UTC。

响应

成功的响应将返回您的源连接ID及其对应的电子标记。 源连接ID将在后面的步骤中用于创建数据流。

{
    "id": "61c0c5f1-bfe5-40f7-8f8c-a4dc175ddac6",
    "etag": "\"d300cf4e-0000-0200-0000-6447a7750000\""
}

创建数据流

要创建数据流以将数据从导览Snowflake帐户流式传输到Platform,您必须在提供以下值时向/flows端点发出POST请求:

TIP
有关如何检索以下ID的分步指南,请访问下面的链接。

API格式

POST /flows

请求

以下请求为您的Snowflake帐户创建流式数据流。

curl -X POST \
  'https://platform.adobe.io/data/foundation/flowservice/flows' \
  -H 'x-api-key: {API_KEY}' \
  -H 'x-gw-ims-org-id: {ORG_ID}' \
  -H 'x-sandbox-name: {SANDBOX_NAME}' \
  -H 'Content-Type: application/json' \
  -d '{
      "name": "Snowflake Streaming Dataflow",
      "description": "A dataflow for Snowflake streaming data",
      "sourceConnectionIds": [
        "61c0c5f1-bfe5-40f7-8f8c-a4dc175ddac6"
      ],
      "targetConnectionIds": [
        "78f41c31-3652-4a5e-b264-74331226dcf3"
      ],
      "flowSpec": {
        "id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
        "version": "1.0"
      },
      "transformations": [
        {
          "name": "Mapping",
          "params": {
            "mappingId": "44d42ed27c46499a80eb0c0705c38cbd",
            "mappingVersion": 0
          }
        }
      ]
    }'
属性
描述
sourceConnectionIds
您的Snowflake流源的源连接ID。
targetConnectionIds
Snowflake流源的目标连接ID。
flowSpec.id
用于为Snowflake流源创建数据流的流规范ID。 此流规范ID允许您通过映射转换创建流数据流。 此ID是固定的,为: c1a19761-d2c7-4702-b9fa-fe91f0613e81
transformations.params.mappingId
数据流的映射ID。

响应

成功的响应将返回您的流ID及其对应的电子标记。

{
    "id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
    "etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}

后续步骤

通过完成本教程,您已使用Flow Service API为Snowflake数据创建流式数据流。 有关Adobe Experience Platform源的其他信息,请访问以下文档:

recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089