[ベータ版]{class="badge informative"} [Ultimate]{class="badge positive"}
Flow Service APISnowflake 使用したExperience Platformへのデータのストリーミング
- Snowflake ストリーミングソースはベータ版です。 ベータラベル付きソースの使用について詳しくは、 ソースの概要を参照してください。
- Real-time Customer Data Platform Ultimate を購入したユーザーは、API で Snowflake ストリーミングソースを利用できます。
このチュートリアルでは、[Flow Service API] (https://www.adobe.io/experience-platform-apis/references/flow-service/) を使用して、Snowflake アカウントからAdobe Experience Platformにデータを接続してストリーミングする手順について説明します。
はじめに
このガイドでは、Adobe Experience Platform の次のコンポーネントに関する十分な知識が必要です。
前提条件として、Snowflake ストリーミングソースに関する設定と情報を行います。 Snowflake ストリーミングソースの概要を参照してください。
Platform API の使用
Platform API への呼び出しを正常に実行する方法について詳しくは、Platform API の概要を参照してください。
ベース接続の作成 create-a-base-connection
ベース接続は、ソースと Platform 間の情報(ソースの認証資格情報、現在の接続状態、固有のベース接続 ID など)を保持します。ベース接続 ID により、ソース内からファイルを参照および移動し、データタイプやフォーマットに関する情報を含む、取り込みたい特定の項目を識別することができます。
ベース接続 ID を作成するには、/connections
エンドポイントに対してPOSTリクエストを実行し、その際にリクエスト本文の一部として Snowflake 認証資格情報を指定します。
API 形式
POST /connections
リクエスト
次のリクエストは、Snowflake のベース接続を作成します。
auth.specName
の値は、空白を含め、以下の例のように正確に入力する必要があります。curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/connections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Snowflake base connection",
"description": "Snowflake base connection",
"auth": {
"specName": "Basic Authentication for Snowflake",
"params": {
"account": "wixnnnd-ui60793.snowflakecomputing.com",
"database": "ACME_DB",
"warehouse": "ACME_WH",
"username": "nikola15",
"schema": "PUBLIC",
"password": "xxxx",
"role": "ACCOUNTADMIN"
}
},
"connectionSpec": {
"id": "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
"version": "1.0"
}
}'
auth.params.account
auth.params.database
auth.params.warehouse
auth.params.username
auth.params.schema
auth.params.password
auth.params.role
public
になります。connectionSpec.id
51ae16c2-bdad-42fd-9fce-8d5dfddaf140
。応答
リクエストが成功した場合は、新しく作成したベース接続と、対応する etag が返されます。
{
"id": "1b614dc0-b76e-41e1-b25f-09f4a9d3f111",
"etag": "\"d300cf4e-0000-0200-0000-6447a7750000\""
}
データテーブルの探索 explore-your-data-tables
次に、ベース接続 ID をパラメーターとして指定しながら、/connections/{BASE_CONNECTION_ID}/explore?objectType=root
エンドポイントに対してデータリクエストを行うことで、ソースのGETテーブルを探索および移動するために使用します。
API 形式
GET /connections/{BASE_CONNECTION_ID}/explore?objectType=root
{BASE_CONNECTION_ID}
リクエスト
次のリクエストでは、Snowflake ストリーミングアカウントの構造と内容を取得します。
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/connections/1b614dc0-b76e-41e1-b25f-09f4a9d3f111/explore?objectType=root' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
応答が成功すると、ルートレベルでソースのデータの構造と内容が返されます。
{
"items": [
{
"type": "table",
"name": "ACME"
}
]
}
items.type
items.names
ソース接続の作成 create-a-source-connection
ソース接続は、データの取り込み元となる外部ソースへの接続を作成および管理します。
ソース接続を作成するには、Flow Service API の /sourceConnections
エンドポイントに POST リクエストを実行します。
API 形式
POST /sourceConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'authorization: Bearer {ACCESS_TOKEN}' \
-H 'content-type: application/json' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-d '{
"name": "Snowflake Streaming Source Connection",
"description": "A source connection for Snowflake Streaming data",
"baseConnectionId": "1b614dc0-b76e-41e1-b25f-09f4a9d3f111",
"connectionSpec": {
"id": "51ae16c2-bdad-42fd-9fce-8d5dfddaf140",
"version": "1.0"
},
"params": {
"tableName": "ACME",
"timestampColumn": "dOb",
"backfill": "true",
"timezoneValue": "PST"
}
}'
baseConnectionId
connectionSpec.id
params.tableName
params.timestampColumn
params.backfill
params.timezoneValue
TIMESTAMP_NTZ
に設定されている場合は、このパラメーターを指定する必要があります。 指定しない場合、デフォルト timezoneValue
UTC になります。応答
正常な応答は、ソース接続 ID と対応する etag を返します。 このソース接続 ID は、後の手順でデータフローを作成する際に使用します。
{
"id": "61c0c5f1-bfe5-40f7-8f8c-a4dc175ddac6",
"etag": "\"d300cf4e-0000-0200-0000-6447a7750000\""
}
データフローの作成
ツアー Snowflake ーザーアカウントから Platform にデータをストリーミングするデータフローを作成するには、次の値を指定したうえで、/flows
エンドポイントに対してPOSTリクエストを行う必要があります。
API 形式
POST /flows
リクエスト
次のリクエストは、Snowflake アカウントのストリーミングデータフローを作成します。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Snowflake Streaming Dataflow",
"description": "A dataflow for Snowflake streaming data",
"sourceConnectionIds": [
"61c0c5f1-bfe5-40f7-8f8c-a4dc175ddac6"
],
"targetConnectionIds": [
"78f41c31-3652-4a5e-b264-74331226dcf3"
],
"flowSpec": {
"id": "c1a19761-d2c7-4702-b9fa-fe91f0613e81",
"version": "1.0"
},
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "44d42ed27c46499a80eb0c0705c38cbd",
"mappingVersion": 0
}
}
]
}'
sourceConnectionIds
targetConnectionIds
flowSpec.id
c1a19761-d2c7-4702-b9fa-fe91f0613e81
です。transformations.params.mappingId
応答
正常な応答は、フロー ID と対応する etag を返します。
{
"id": "2edc08ac-4df5-4fe6-936f-81a19ce92f5c",
"etag": "\"770029f8-0000-0200-0000-6019e7d40000\""
}
次の手順
このチュートリアルでは、Flow Service API を使用して Snowflake データのストリーミングデータフローを作成しました。 Adobe Experience Platform ソースについて詳しくは、次のドキュメントを参照してください。