Jupyterlab笔记本中的数据访问

每个受支持的内核都提供内置功能,允许您从笔记本内的数据集中读取平台数据。 目前,Adobe Experience Platform Data Science Workspace中的JupyterLab支持Python、R、PySpark和Scala的笔记本。 但是,对分页数据的支持仅限于Python和R笔记本。 本指南重点介绍如何使用JupyterLab笔记本访问数据。

快速入门

在阅读本指南之前,请查阅JupyterLab 用户指南 ,以了解JupyterLab及其在数据科学工作区中的角色的高级简介。

笔记本数据限制

重要

对于PySpark和Scala笔记本,如果您收到错误,原因是“远程RPC客户端不相关”。 这通常意味着驱动程序或执行器内存不足。 尝试切换到"batch"模式以解决此错误。

以下信息定义可读取的最大数据量、使用的数据类型以及读取数据所花费的估计时间范围。

对于Python和R,使用配置为40GB RAM的笔记本服务器作为基准。 对于PySpark和Scala ,以64GB RAM、8个内核、2个DBU(最多4个工作程序)配置的数据库群集用于下面概述的基准。

使用的ExperienceEvent架构数据的大小从1000(1K)行(范围最多为10亿(1B)行)开始。 请注意,对于PySpark和Spark量度,XDM数据的日期范围为10天。

使用Query Service选择创建表(CTAS)预处理了临时架构数据。 此数据的大小也从1,000行(1K)(范围高达10亿(1B)行)开始。

何时使用批处理模式与交互模式

在读取包含PySpark和Scala笔记本的数据集时,您可以选择使用交互模式或批处理模式来读取数据集。 交互式用于获取快速结果,而批量模式用于大数据集。

  • 对于PySpark和Scala笔记本,在读取500万行或更多数据时,应使用批处理模式。 有关每种模式效率的更多信息,请参阅下面的PySparkScala数据限制表。

Python 笔记本数据限制

XDM ExperienceEvent架构: 您应该能够在不到22分钟的时间内读取XDM数据的最多200万行(磁盘上的~6.1 GB数据)。添加其他行可能会导致错误。

行数 1K 10K 100K 1M 2米
磁盘大小(MB) 18.73 187.5 308 3000 6050
SDK(以秒为单位) 20.3 86.8 63 659 1315

临时架构: 您应该能够在不到14分钟的时间内读取最多500万行非XDM(临时)数据(在磁盘上约有5.6 GB数据)。添加其他行可能会导致错误。

行数 1K 1万 10万 1M 2米 3M 5米
磁盘大小(MB) 1.21 11.72 115 1120 2250 3380 5630
SDK(以秒为单位) 7.27 9.04 27.3 180 346 487 819

R笔记本数据限制

XDM ExperienceEvent架构: 您应该能够在13分钟内读取最多100万行XDM数据(磁盘上的3GB数据)。

行数 1K 1万 10万 1M
磁盘大小(MB) 18.73 187.5 308 3000
R内核(以秒为单位) 14.03 69.6 86.8 775

临时架构: 您应该能够在大约10分钟内读取最多300万行临时数据(磁盘上有293MB数据)。

行数 1K 1万 10万 1M 2米 3M
磁盘大小(MB) 0.082 0.612 9.0 91 188 293
R SDK(秒) 7.7 4.58 35.9 233 470.5 603

PySpark(Python内核)笔记本数据限制:

XDM ExperienceEvent架构: 在交互模式下,您应该能够在大约20分钟内读取XDM数据的最多500万行(磁盘上的数据约为13.42GB)。交互式模式最多仅支持500万行。 如果您想要读取较大的数据集,建议您切换到批处理模式。 在批处理模式下,您应该能够在大约14小时内读取XDM数据的最多5亿行(磁盘上的数据约为1.31TB)。

行数 1K 1万 10万 1M 2米 3M 5米 10米 50M 1亿 5亿
磁盘大小 2.93兆字节 4.38兆字节 29.02 2.69 GB 5.39 GB 8.09 GB 13.42 GB 26.82 GB 134.24 GB 268.39 GB 1.31太字节
SDK(交互模式) 33秒 32.4秒 55.1秒 253.5秒 489.2秒 729.6秒 1206.8秒 - - - -
SDK(批处理模式) 815.8秒 492.8秒 379.1秒 637.4秒 624.5秒 869.2秒 1104.1秒 1786年代 5387.2秒 10624.6秒 50547s

临时架构: 在交互模式下,您应该能够在不到3分钟的时间内读取非XDM数据的最多500万行(磁盘上约有5.36GB数据)。在批处理模式下,您应该能够在大约18分钟内读取非XDM数据的最多10亿行(磁盘上的数据约为1.05TB)。

行数 1K 1万 10万 1M 2米 3M 5米 10米 50M 1亿 5亿 1B
磁盘大小 1.12兆字节 11.24兆字节 109.48兆字节 2.69 GB 2.14 GB 3.21 GB 5.36 GB 10.71 GB 53.58 GB 107.52 GB 535.88 GB 1.05太字节
SDK交互模式(以秒为单位) 28.2秒 18.6秒 20.8秒 20.9秒 23.8秒 21.7秒 24.7秒 - - - - -
SDK批量模式(以秒为单位) 428.8秒 578.8秒 641.4秒 538.5秒 630.9秒 467.3秒 411秒 675秒 702秒 719.2秒 1022.1秒 1122.3秒

Spark (Scala内核)笔记本数据限制:

XDM ExperienceEvent架构: 在交互模式下,您应该能够在大约18分钟内读取XDM数据的最多500万行(磁盘上的数据约为13.42GB)。交互式模式最多仅支持500万行。 如果您想要读取较大的数据集,建议您切换到批处理模式。 在批处理模式下,您应该能够在大约14小时内读取XDM数据的最多5亿行(磁盘上的数据约为1.31TB)。

行数 1K 1万 10万 1M 2米 3M 5米 10米 50M 1亿 5亿
磁盘大小 2.93兆字节 4.38兆字节 29.02 2.69 GB 5.39 GB 8.09 GB 13.42 GB 26.82 GB 134.24 GB 268.39 GB 1.31太字节
SDK交互模式(以秒为单位) 37.9秒 22.7秒 45.6秒 231.7秒 444.7秒 660.6秒 1100年代 - - - -
SDK批量模式(以秒为单位) 374.4秒 398.5秒 527秒 487.9秒 588.9秒 829秒 939.1秒 1441年代 5473.2秒 10118.8 49207.6

临时架构: 在交互模式下,您最多应在3分钟内读取非XDM数据的500万行(磁盘上约有5.36GB数据)。在批处理模式下,您应该能够在大约16分钟内读取非XDM数据的最多10亿行(磁盘上的数据约为1.05TB)。

行数 1K 1万 10万 1M 2米 3M 5米 10米 50M 1亿 5亿 1B
磁盘大小 1.12兆字节 11.24兆字节 109.48兆字节 2.69 GB 2.14 GB 3.21 GB 5.36 GB 10.71 GB 53.58 GB 107.52 GB 535.88 GB 1.05太字节
SDK交互模式(以秒为单位) 35.7秒 31秒 19.5秒 25.3秒 23秒 33.2秒 25.5秒 - - - - -
SDK批量模式(以秒为单位) 448.8秒 459.7秒 519秒 475.8秒 599.9秒 347.6秒 407.8秒 397年代 518.8秒 487.9秒 760.2秒 975.4秒

Python笔记本

Python 通过笔记本,您可以在访问数据集时对数据进行分页。下面演示了用于读取数据(包括分页和不分页)的示例代码。 有关可用的Python启动程序笔记本的更多信息,请访问JupyterLab用户指南中的JupyterLab Launcher部分。

以下Python文档概述了以下概念:

从Python中的数据集读取

不分页:

执行以下代码将读取整个数据集。 如果执行成功,则数据将另存为变量df引用的Pantics数据帧。

# Python

from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.read()
df.head()

分页:

执行以下代码将从指定的数据集中读取数据。 通过分别通过函数limit()offset()限制和偏移数据来实现分页。 限制数据是指要读取的数据点的最大数量,而偏移是指在读取数据之前要跳过的数据点的数量。 如果读取操作成功执行,则数据将另存为变量df引用的Pantics数据帧。

# Python

from platform_sdk.dataset_reader import DatasetReader

dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.limit(100).offset(10).read()

在Python中写入数据集

要写入JupyterLab笔记本中的数据集,请在JupyterLab的左侧导航中选择“数据”图标选项卡(如下所示)。 此时会显示​Datasets​和​Schemas​目录。 选择​Datasets​并右键单击,然后从您要使用的数据集的下拉菜单中选择​在笔记本中写入数据​选项。 可执行代码条目显示在笔记本的底部。

  • 使用​在笔记本​中写入数据,以生成包含选定数据集的写入单元格。
  • 使用​在笔记本​中浏览数据,以生成包含您选定数据集的读取单元格。
  • 使用“笔记本​中的​查询数据”,以使用您选定的数据集生成基本查询单元格。

或者,您也可以复制并粘贴以下代码单元格。 替换{DATASET_ID}{PANDA_DATAFRAME}

from platform_sdk.models import Dataset
from platform_sdk.dataset_writer import DatasetWriter

dataset = Dataset(get_platform_sdk_client_context()).get_by_id(dataset_id="{DATASET_ID}")
dataset_writer = DatasetWriter(get_platform_sdk_client_context(), dataset)
write_tracker = dataset_writer.write({PANDA_DATAFRAME}, file_format='json')

在Python中使用Query Service查询数据

JupyterLab 在 Platform 上,允许您在笔记本中使 Python 用SQL通过Adobe Experience Platform查 询服务访问数据。通过Query Service访问数据,因其优异的运行时间而对处理大型数据集非常有用。 请注意,使用Query Service查询数据的处理时间限制为10分钟。

在JupyterLab中使用Query Service之前,请确保您对Query Service SQL语法有正确的了解。

使用Query Service查询数据时,需要提供目标数据集的名称。 您可以通过使用​Data Explorer​查找所需的数据集来生成必要的代码单元格。 右键单击数据集列表,然后单击​在笔记本中查询数据​以在笔记本中生成两个代码单元格。 下面将详细介绍这两个单元格。

要在JupyterLab中使用Query Service,必须先在工作Python笔记本和Query Service之间创建连接。 这可以通过执行第一生成的单元来实现。

qs_connect()

在第二个生成的单元格中,必须在SQL查询之前定义第一行。 默认情况下,生成的单元格定义一个可选变量(df0),该变量将查询结果保存为Pantics数据帧。
-c QS_CONNECTION 数是必选参数,并告知内核对执行SQL查询 Query Service。有关其他参数的列表,请参阅附录

%%read_sql df0 -c QS_CONNECTION
SELECT *
FROM name_of_the_dataset
LIMIT 10
/* Querying table "name_of_the_dataset" (datasetId: {DATASET_ID})*/

Python变量可以在SQL查询中直接引用,方法是使用字符串格式的语法并将变量包装在大括号({})中,如以下示例所示:

table_name = 'name_of_the_dataset'
table_columns = ','.join(['col_1','col_2','col_3'])
%%read_sql demo -c QS_CONNECTION
SELECT {table_columns}
FROM {table_name}

筛选ExperienceEvent数据

要访问和过滤Python笔记本中的ExperienceEvent数据集,您必须提供数据集({DATASET_ID})的ID,以及使用逻辑运算符定义特定时间范围的过滤器规则。 定义时间范围后,将忽略任何指定的分页,并考虑整个数据集。

筛选运算符列表如下所述:

  • eq(): 等于
  • gt(): 大于
  • ge(): 大于或等于
  • lt(): 小于
  • le(): 小于或等于
  • And():逻辑AND运算符
  • Or():逻辑OR运算符

以下单元格过滤ExperienceEvent数据集,以筛选仅存在于2019年1月1日至2019年12月31日终了期间的数据。

# Python

from platform_sdk.dataset_reader import DatasetReader

dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.\
    where(dataset_reader["timestamp"].gt("2019-01-01 00:00:00").\
    And(dataset_reader["timestamp"].lt("2019-12-31 23:59:59"))\
).read()

R笔记本

R笔记本允许您在访问数据集时对数据进行分页。 下面演示了用于读取数据(包括分页和不分页)的示例代码。 有关可用启动R笔记本的更多信息,请访问JupyterLab用户指南中的JupyterLab 启动器部分。

以下R文档概述了以下概念:

从R中的数据集读取

不分页:

执行以下代码将读取整个数据集。 如果执行成功,则数据将另存为变量df0引用的Pantics数据帧。

# R

library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df0 <- dataset_reader$read()
head(df0)

分页:

执行以下代码将从指定的数据集中读取数据。 通过分别通过函数limit()offset()限制和偏移数据来实现分页。 限制数据是指要读取的数据点的最大数量,而偏移是指在读取数据之前要跳过的数据点的数量。 如果读取操作成功执行,则数据将另存为变量df0引用的Pantics数据帧。

# R

library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")

DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}") 
df0 <- dataset_reader$limit(100L)$offset(10L)$read()

写入R中的数据集

要写入JupyterLab笔记本中的数据集,请在JupyterLab的左侧导航中选择“数据”图标选项卡(如下所示)。 此时会显示​Datasets​和​Schemas​目录。 选择​Datasets​并右键单击,然后从您要使用的数据集的下拉菜单中选择​在笔记本中写入数据​选项。 可执行代码条目显示在笔记本的底部。

  • 使用​在笔记本​中写入数据,以生成包含选定数据集的写入单元格。
  • 使用​在笔记本​中浏览数据,以生成包含您选定数据集的读取单元格。

或者,您也可以复制并粘贴以下代码单元格:

psdk <- import("platform_sdk")
dataset <- psdk$models$Dataset(py$get_platform_sdk_client_context())$get_by_id(dataset_id="{DATASET_ID}")
dataset_writer <- psdk$dataset_writer$DatasetWriter(py$get_platform_sdk_client_context(), dataset)
write_tracker <- dataset_writer$write(df, file_format='json')

筛选ExperienceEvent数据

要访问和过滤R笔记本中的ExperienceEvent数据集,您必须提供数据集({DATASET_ID})的ID,以及使用逻辑运算符定义特定时间范围的过滤器规则。 定义时间范围后,将忽略任何指定的分页,并考虑整个数据集。

筛选运算符列表如下所述:

  • eq(): 等于
  • gt(): 大于
  • ge(): 大于或等于
  • lt(): 小于
  • le(): 小于或等于
  • And():逻辑AND运算符
  • Or():逻辑OR运算符

以下单元格过滤ExperienceEvent数据集,以筛选仅存在于2019年1月1日至2019年12月31日终了期间的数据。

# R

library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")

client_context <- py$PLATFORM_SDK_CLIENT_CONTEXT
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}") 

df0 <- dataset_reader$
    where(dataset_reader["timestamp"]$gt("2019-01-01 00:00:00")$
    And(dataset_reader["timestamp"]$lt("2019-12-31 23:59:59"))
)$read()

PySpark 3笔记本

以下PySpark文档概述了以下概念:

初始化sparkSession

所有Spark 2.4笔记本都要求使用以下样板代码初始化会话。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

使用%dataset通过PySpark 3笔记本进行读写

随着Spark 2.4的推出,%dataset自定义魔术已提供给PySpark 3(Spark 2.4)笔记本使用。 有关IPython内核中可用魔术命令的更多详细信息,请访问IPython魔术文档

用法

%dataset {action} --datasetId {id} --dataFrame {df} --mode batch

描述

用于从PySpark笔记本(Python 3个内核)读取或写入数据集的自定义Data Science Workspace魔术命令。

名称 描述 必需
{action} 要对数据集执行的操作类型。 “读取”或“写入”两个操作均可用。
--datasetId {id} 用于提供要读取或写入的数据集的ID。
--dataFrame {df} 熊猫数据帧。
  • 当操作为“读取”时,{df}是数据集读取操作结果可用的变量(如数据帧)。
  • 当操作为“写入”时,此数据帧{df}将写入数据集。
--mode 用于更改数据读取方式的其他参数。 允许的参数为“batch”和“interactive”。 默认情况下,该模式将设置为“批处理”。
建议您使用“交互”模式,以提高较小数据集上的查询性能。
小贴士

查看笔记本数据限制部分中的PySpark表,以确定应将mode设置为interactive还是batch

示例

  • 阅读示例: %dataset read --datasetId 5e68141134492718af974841 --dataFrame pd0 --mode batch
  • 编写示例: %dataset write --datasetId 5e68141134492718af974842 --dataFrame pd0 --mode batch
重要

在写入数据之前使用df.cache()缓存数据可以显着提高笔记本性能。 如果您收到以下任何错误,这将会有所帮助:

  • 由于暂存失败,作业中止……只能压缩每个分区中元素数量相同的RDD。
  • 远程RPC客户端已断开关联,并出现其他内存错误。
  • 读取和写入数据集时性能不佳。

有关详细信息,请参阅疑难解答指南

您可以在JupyterLab购买中使用以下方法自动生成上述示例:

在JupyterLab的左侧导航中选择数据图标选项卡(在下面突出显示)。 此时会显示​Datasets​和​Schemas​目录。 选择​Datasets​并右键单击,然后从您要使用的数据集的下拉菜单中选择​在笔记本中写入数据​选项。 可执行代码条目显示在笔记本的底部。

  • 使用​在笔记本​中浏览数据以生成读取单元格。
  • 使用​在笔记本中写入数据​生成写单元格。

创建本地数据帧

要使用PySpark 3创建本地数据帧,请使用SQL查询。 例如:

date_aggregation.createOrReplaceTempView("temp_df")

df = spark.sql('''
  SELECT *
  FROM sparkdf
''')

local_df
df = spark.sql('''
  SELECT *
  FROM sparkdf
  LIMIT limit
''')
sample_df = df.sample(fraction)
小贴士

您还可以指定可选种子样本,如带有Replacement、双分数或长种子的布尔值。

筛选ExperienceEvent数据

访问和过滤PySpark笔记本中的ExperienceEvent数据集时,需要提供数据集标识({DATASET_ID})、贵组织的IMS标识以及定义特定时间范围的过滤器规则。 过滤时间范围使用函数spark.sql()定义,其中函数参数是SQL查询字符串。

以下单元格会过滤ExperienceEvent数据集,以便仅显示2019年1月1日至2019年12月31日结束期间存在的数据。

# PySpark 3 (Spark 2.4)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

%dataset read --datasetId {DATASET_ID} --dataFrame df --mode batch

df.createOrReplaceTempView("event")
timepd = spark.sql("""
    SELECT *
    FROM event
    WHERE timestamp > CAST('2019-01-01 00:00:00.0' AS TIMESTAMP)
    AND timestamp < CAST('2019-12-31 23:59:59.9' AS TIMESTAMP)
""")
timepd.show()

缩放笔记本

以下文档包含以下概念的示例:

初始化SparkSession

所有Scala笔记本都要求您使用以下样板代码初始化会话:

import org.apache.spark.sql.{ SparkSession }
val spark = SparkSession
  .builder()
  .master("local")
  .getOrCreate()

读取数据集

在Scala中,您可以导入clientContext以获取和返回平台值,这样就无需定义变量,如var userToken。 在下面的Scala示例中, clientContext用于获取并返回读取数据集所需的所有值。

重要

在写入数据之前使用df.cache()缓存数据可以显着提高笔记本性能。 如果您收到以下任何错误,这将会有所帮助:

  • 由于暂存失败,作业中止……只能压缩每个分区中元素数量相同的RDD。
  • 远程RPC客户端已断开关联,并出现其他内存错误。
  • 读取和写入数据集时性能不佳。

有关详细信息,请参阅疑难解答指南

import org.apache.spark.sql.{Dataset, SparkSession}
import com.adobe.platform.token.ClientContext
val spark = SparkSession.builder().master("local").config("spark.sql.warehouse.dir", "/").getOrCreate()

val clientContext = ClientContext.getClientContext()
val df1 = spark.read.format("com.adobe.platform.query")
  .option("user-token", clientContext.getUserToken())
  .option("ims-org", clientContext.getOrgId())
  .option("api-key", clientContext.getApiKey())
  .option("service-token", clientContext.getServiceToken())
  .option("sandbox-name", clientContext.getSandboxName())
  .option("mode", "batch")
  .option("dataset-id", "5e68141134492718af974844")
  .load()

df1.printSchema()
df1.show(10)
元素 描述
df1 一个变量,表示用于读取和写入数据的Pantics数据帧。
用户令牌 使用clientContext.getUserToken()自动获取的用户令牌。
服务令牌 使用clientContext.getServiceToken()自动获取的服务令牌。
ims-org 您的IMS组织ID,使用clientContext.getOrgId()自动获取。
api-key 使用clientContext.getApiKey()自动获取的API密钥。
小贴士

查看笔记本数据限制部分中的Scala表,以确定应将mode设置为interactive还是batch

您可以在JupyterLab购买中使用以下方法自动生成上述示例:

在JupyterLab的左侧导航中选择数据图标选项卡(在下面突出显示)。 此时会显示​Datasets​和​Schemas​目录。 选择​Datasets​并右键单击,然后从您要使用的数据集的下拉菜单中选择​在笔记本中浏览数据​选项。 可执行代码条目显示在笔记本的底部。

  • 使用​在笔记本​中浏览数据以生成读取单元格。
  • 使用​在笔记本中写入数据​生成写单元格。

写入数据集

在Scala中,您可以导入clientContext以获取和返回平台值,这样就无需定义变量,如var userToken。 在下面的Scala示例中, clientContext用于定义并返回写入数据集所需的所有值。

重要

在写入数据之前使用df.cache()缓存数据可以显着提高笔记本性能。 如果您收到以下任何错误,这将会有所帮助:

  • 由于暂存失败,作业中止……只能压缩每个分区中元素数量相同的RDD。
  • 远程RPC客户端已断开关联,并出现其他内存错误。
  • 读取和写入数据集时性能不佳。

有关详细信息,请参阅疑难解答指南

import org.apache.spark.sql.{Dataset, SparkSession}
import com.adobe.platform.token.ClientContext
val spark = SparkSession.builder().master("local").config("spark.sql.warehouse.dir", "/").getOrCreate()

val clientContext = ClientContext.getClientContext()
df1.write.format("com.adobe.platform.query")
  .option("user-token", clientContext.getUserToken())
  .option("service-token", clientContext.getServiceToken())
  .option("ims-org", clientContext.getOrgId())
  .option("api-key", clientContext.getApiKey())
  .option("sandbox-name", clientContext.getSandboxName())
  .option("mode", "batch")
  .option("dataset-id", "5e68141134492718af974844")
  .save()
元素 描述
df1 一个变量,表示用于读取和写入数据的Pantics数据帧。
用户令牌 使用clientContext.getUserToken()自动获取的用户令牌。
服务令牌 使用clientContext.getServiceToken()自动获取的服务令牌。
ims-org 您的IMS组织ID,使用clientContext.getOrgId()自动获取。
api-key 使用clientContext.getApiKey()自动获取的API密钥。
小贴士

查看笔记本数据限制部分中的Scala表,以确定应将mode设置为interactive还是batch

创建本地数据帧

要使用Scala创建本地数据帧,需要SQL查询。 例如:

sparkdf.createOrReplaceTempView("sparkdf")

val localdf = spark.sql("SELECT * FROM sparkdf LIMIT 1)

筛选ExperienceEvent数据

访问和过滤Scala笔记本中的ExperienceEvent数据集时,您需要提供数据集标识({DATASET_ID})、贵组织的IMS标识以及定义特定时间范围的过滤器规则。 过滤时间范围使用函数spark.sql()定义,其中函数参数是SQL查询字符串。

以下单元格会过滤ExperienceEvent数据集,以便仅显示2019年1月1日至2019年12月31日结束期间存在的数据。

// Spark (Spark 2.4)

// Turn off extra logging
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = org.apache.spark.sql.SparkSession.builder().appName("Notebook")
  .master("local")
  .getOrCreate()

// Stage Exploratory
val dataSetId: String = "{DATASET_ID}"
val orgId: String = sys.env("IMS_ORG_ID")
val clientId: String = sys.env("PYDASDK_IMS_CLIENT_ID")
val userToken: String = sys.env("PYDASDK_IMS_USER_TOKEN")
val serviceToken: String = sys.env("PYDASDK_IMS_SERVICE_TOKEN")
val mode: String = "batch"

var df = spark.read.format("com.adobe.platform.query")
  .option("user-token", userToken)
  .option("ims-org", orgId)
  .option("api-key", clientId)
  .option("mode", mode)
  .option("dataset-id", dataSetId)
  .option("service-token", serviceToken)
  .load()
df.createOrReplaceTempView("event")
val timedf = spark.sql("""
    SELECT * 
    FROM event 
    WHERE timestamp > CAST('2019-01-01 00:00:00.0' AS TIMESTAMP)
    AND timestamp < CAST('2019-12-31 23:59:59.9' AS TIMESTAMP)
""")
timedf.show()

后续步骤

本文档介绍了使用JupyterLab笔记本访问数据集的一般准则。 有关查询数据集的更多深入示例,请访问JupyterLab笔记本🔗文档中的查询服务。 有关如何探索和可视化数据集的更多信息,请访问上的文档,使用笔记本分析数据

Query Service的可选SQL标记

此表概述了可用于Query Service的可选SQL标记。

标志 描述
-h--help 显示帮助消息并退出。
-n--notify 切换用于通知查询结果的选项。
-a--async 使用此标记可异步执行查询,并可在查询执行时释放内核。 在将查询结果分配给变量时要谨慎,因为如果查询不完成,则可能未定义该变量。
-d--display 使用此标记可阻止显示结果。

在此页面上