Jupyterlab笔记本中的数据访问
创建对象:
- 用户
- 开发人员
每个受支持的内核都提供了内置功能,允许您从笔记本内的数据集读取Experience Platform数据。 目前,Adobe Experience Platform Data Science Workspace中的JupyterLab支持Python、R、PySpark和Scala的笔记本。 但是,对分页数据的支持仅限于Python和R笔记本。 本指南重点介绍如何使用JupyterLab笔记本访问您的数据。
快速入门
在阅读本指南之前,请查看JupyterLab 用户指南,以详细了解JupyterLab及其在数据科学Workspace中的角色。
笔记本数据限制
以下信息定义了可读取的最大数据量、使用了什么类型的数据以及读取数据的估计时间范围。
对于Python和R,使用配置为40GB RAM的笔记本服务器进行基准测试。 对于PySpark和Scala,配置为64GB RAM、8核、2个DBU且最多4个工作线程的数据库群集用于下面列出的基准。
使用的ExperienceEvent架构数据大小不一,从1,000行(1K)到十亿(1B)行。 请注意,对于PySpark和Spark量度,XDM数据使用了10天的日期范围。
使用Query Service创建表作为选择(CTAS)预处理了临时架构数据。 这些数据也各不相同,从1,000行(1,000行)到最多10亿(1,000行)不等。
何时使用批处理模式与交互模式
读取包含PySpark和Scala笔记本的数据集时,您可以选择使用交互模式或批处理模式来读取数据集。 交互模式用于快速结果,而批处理模式用于大型数据集。
Python笔记本数据限制
XDM ExperienceEvent架构: 您最多可以在22分钟内读取200万行(磁盘上约6.1 GB的数据)的XDM数据。 添加其他行可能会导致错误。
临时架构: 您应该能够在14分钟内读取非XDM (临时)数据的最多500万行(磁盘上约5.6 GB的数据)。 添加其他行可能会导致错误。
R笔记本数据限制
XDM ExperienceEvent架构: 您在13分钟内最多可读取100万行XDM数据(磁盘上的3GB数据)。
临时架构: 您最多可以在10分钟内读取300万行临时数据(磁盘上为293MB数据)。
PySpark (Python内核)笔记本数据限制:
XDM ExperienceEvent架构: 在交互模式下,您应该能够在大约20分钟内读取最多500万行(磁盘上约13.42GB的数据)的XDM数据。 交互模式最多仅支持500万行。 如果要读取更大的数据集,建议切换到批处理模式。 在批处理模式中,您最多可以在14小时内读取5亿行(磁盘上大约1.31TB的数据)的XDM数据。
临时架构: 在交互模式下,您应该可以在3分钟内读取非XDM数据的最多500万行(磁盘上约5.36GB的数据)。 在批处理模式下,您最多可以在18分钟内读取非XDM数据的10亿行(磁盘上大约1.05TB的数据)。
Spark (Scala内核)笔记本数据限制:
XDM ExperienceEvent架构: 在交互模式下,您应该能够在大约18分钟内读取最多500万行(磁盘上约13.42GB的数据)的XDM数据。 交互模式最多仅支持500万行。 如果要读取更大的数据集,建议切换到批处理模式。 在批处理模式中,您最多可以在14小时内读取5亿行(磁盘上大约1.31TB的数据)的XDM数据。
临时架构: 在交互模式下,您应该能够在3分钟内读取最多500万行(磁盘上约5.36GB的数据)的非XDM数据。 在批处理模式下,您最多可以在16分钟内读取非XDM数据的10亿行(磁盘上大约1.05TB的数据)。
Python笔记本
Python笔记本允许您在访问数据集时分页数据。 下面演示了使用分页和不使用分页读取数据的示例代码。 有关可用入门Python笔记本的更多信息,请访问JupyterLab用户指南中的JupyterLab 启动器部分。
以下Python文档概述了以下概念:
从Python中的数据集读取
没有分页:
执行以下代码将读取整个数据集。 如果执行成功,则数据将保存为变量df
引用的熊猫数据流。
# Python
from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.read()
df.head()
分页:
执行以下代码将从指定的数据集中读取数据。 通过分别通过函数limit()
和offset()
限制和偏移数据来实现分页。 限制数据是指要读取的最大数据点数,而偏移是指在读取数据之前要跳过的数据点数。 如果读取操作执行成功,则数据将保存为变量df
引用的Pandas数据流。
# Python
from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.limit(100).offset(10).read()
写入Python中的数据集
要写入JupyterLab笔记本中的数据集,请在JupyterLab的左侧导航中选择“数据”图标选项卡(下面高亮显示)。 出现 数据集 和 架构 目录。 选择 数据集 并右键单击,然后从要使用的数据集上的下拉菜单中选择 在笔记本中写入数据 选项。 笔记本底部会显示一个可执行代码条目。
- 使用 在笔记本中写入数据 生成包含选定数据集的写入单元格。
- 使用 浏览笔记本中的数据 生成选定数据集的读取单元格。
- 使用 笔记本中的查询数据 生成包含选定数据集的基本查询单元格。
或者,您可以复制并粘贴以下代码单元格。 同时替换{DATASET_ID}
和{PANDA_DATAFRAME}
。
from platform_sdk.models import Dataset
from platform_sdk.dataset_writer import DatasetWriter
dataset = Dataset(get_platform_sdk_client_context()).get_by_id(dataset_id="{DATASET_ID}")
dataset_writer = DatasetWriter(get_platform_sdk_client_context(), dataset)
write_tracker = dataset_writer.write({PANDA_DATAFRAME}, file_format='json')
在Python中使用Query Service查询数据
Experience Platform上的JupyterLab允许您使用Python笔记本中的SQL通过Adobe Experience Platform查询服务访问数据。 通过Query Service访问数据对于处理大型数据集很有用,因为其运行时间较长。 请注意,使用Query Service查询数据的处理时间限制为10分钟。
在JupyterLab中使用Query Service之前,请确保您对Query Service SQL语法有一定的了解。
使用Query Service查询数据需要您提供目标数据集的名称。 您可以使用 数据资源管理器 查找所需的数据集来生成必要的代码单元格。 右键单击数据集列表并单击 在笔记本中查询数据 以在笔记本中生成两个代码单元格。 下面将更详细地概述这两个单元格。
为了在JupyterLab中利用Query Service,您必须首先在正在处理的Python笔记本和Query Service之间创建连接。 这可以通过执行第一个生成的单元格来实现。
qs_connect()
在第二个生成的单元格中,必须在SQL查询之前定义第一行。 默认情况下,生成的单元格定义了一个可选变量(df0
),该变量将查询结果保存为Pandas数据流。-c QS_CONNECTION
参数是必需的,它告知内核对Query Service执行SQL查询。 有关其他参数的列表,请参阅附录。
%%read_sql df0 -c QS_CONNECTION
SELECT *
FROM name_of_the_dataset
LIMIT 10
/* Querying table "name_of_the_dataset" (datasetId: {DATASET_ID})*/
通过使用字符串格式语法并将变量括在大括号({}
)中,可以直接在SQL查询中引用Python变量,如以下示例所示:
table_name = 'name_of_the_dataset'
table_columns = ','.join(['col_1','col_2','col_3'])
%%read_sql demo -c QS_CONNECTION
SELECT {table_columns}
FROM {table_name}
筛选ExperienceEvent数据
要访问和筛选Python笔记本中的ExperienceEvent数据集,您必须提供数据集({DATASET_ID}
)的ID以及使用逻辑运算符定义特定时间范围的筛选规则。 定义时间范围后,将忽略任何指定的分页,并会考虑整个数据集。
筛选运算符列表如下所述:
eq()
:等于gt()
:大于ge()
:大于或等于lt()
:小于le()
:小于或等于And()
:逻辑AND运算符Or()
:逻辑或运算符
以下单元格筛选ExperienceEvent数据集,以使其包含在2019年1月1日至2019年12月31日终了期间专门存在的数据。
# Python
from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df = dataset_reader.\
where(dataset_reader["timestamp"].gt("2019-01-01 00:00:00").\
And(dataset_reader["timestamp"].lt("2019-12-31 23:59:59"))\
).read()
R笔记本
R笔记本允许您在访问数据集时分页数据。 下面演示了使用分页和不使用分页读取数据的示例代码。 有关可用入门R笔记本的更多信息,请访问JupyterLab用户指南中的JupyterLab 启动器部分。
以下R文档概述了以下概念:
从R中的数据集读取
没有分页:
执行以下代码将读取整个数据集。 如果执行成功,则数据将保存为变量df0
引用的熊猫数据流。
# R
library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df0 <- dataset_reader$read()
head(df0)
分页:
执行以下代码将从指定的数据集中读取数据。 通过分别通过函数limit()
和offset()
限制和偏移数据来实现分页。 限制数据是指要读取的最大数据点数,而偏移是指在读取数据之前要跳过的数据点数。 如果读取操作执行成功,则数据将保存为变量df0
引用的Pandas数据流。
# R
library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df0 <- dataset_reader$limit(100L)$offset(10L)$read()
写入R中的数据集
要写入JupyterLab笔记本中的数据集,请在JupyterLab的左侧导航中选择“数据”图标选项卡(下面高亮显示)。 出现 数据集 和 架构 目录。 选择 数据集 并右键单击,然后从要使用的数据集上的下拉菜单中选择 在笔记本中写入数据 选项。 笔记本底部会显示一个可执行代码条目。
- 使用 在笔记本中写入数据 生成包含选定数据集的写入单元格。
- 使用 浏览笔记本中的数据 生成选定数据集的读取单元格。
或者,您可以复制并粘贴以下代码单元格:
psdk <- import("platform_sdk")
dataset <- psdk$models$Dataset(py$get_platform_sdk_client_context())$get_by_id(dataset_id="{DATASET_ID}")
dataset_writer <- psdk$dataset_writer$DatasetWriter(py$get_platform_sdk_client_context(), dataset)
write_tracker <- dataset_writer$write(df, file_format='json')
筛选ExperienceEvent数据
要访问和筛选R笔记本中的ExperienceEvent数据集,您必须提供数据集({DATASET_ID}
)的ID以及使用逻辑运算符定义特定时间范围的筛选规则。 定义时间范围后,将忽略任何指定的分页,并会考虑整个数据集。
筛选运算符列表如下所述:
eq()
:等于gt()
:大于ge()
:大于或等于lt()
:小于le()
:小于或等于And()
:逻辑AND运算符Or()
:逻辑或运算符
以下单元格筛选ExperienceEvent数据集,以使其包含在2019年1月1日至2019年12月31日终了期间专门存在的数据。
# R
library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
datetime <- import("datetime", convert = FALSE)
py_run_file("~/.ipython/profile_default/startup/platform_sdk_context.py")
client_context <- py$PLATFORM_SDK_CLIENT_CONTEXT
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(py$get_platform_sdk_client_context(), dataset_id="{DATASET_ID}")
df0 <- dataset_reader$
where(dataset_reader["timestamp"]$gt("2019-01-01 00:00:00")$
And(dataset_reader["timestamp"]$lt("2019-12-31 23:59:59"))
)$read()
PySpark 3笔记本
以下PySpark文档概述了以下概念:
初始化sparkSession
所有Spark 2.4笔记本都要求您使用以下样板代码初始化会话。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
使用%dataset对PySpark 3笔记本进行读取和写入
随着Spark 2.4的引入,提供了%dataset
自定义魔术以用于PySpark 3 (Spark 2.4)笔记本。 有关IPython内核中可用的魔术命令的更多详细信息,请访问IPython魔术文档。
用法
%dataset {action} --datasetId {id} --dataFrame {df} --mode batch
描述
用于从PySpark笔记本(Python 3内核)读取或写入数据集的自定义Data Science Workspace魔术命令。
{action}
--datasetId {id}
--dataFrame {df}
熊猫的数据流。
- 当操作为“读取”时,{df}是变量,其中数据集读取操作的结果可用(例如数据流)。
- 当操作为“写入”时,此数据流{df}将写入数据集。
--mode
建议您“交互”模式以在较小的数据集上提高查询性能。
示例
- 阅读示例:
%dataset read --datasetId 5e68141134492718af974841 --dataFrame pd0 --mode batch
- 编写示例:
%dataset write --datasetId 5e68141134492718af974842 --dataFrame pd0 --mode batch
df.cache()
缓存数据可以大大提高笔记本性能。 如果您收到以下任何错误,这将很有帮助:- 由于暂存失败,作业已中止……只能压缩每个分区中具有相同元素数的RDD。
- 远程RPC客户端已取消关联和其他内存错误。
- 读取和写入数据集时性能不佳。
您可以使用以下方法在JupyterLab buy中自动生成上述示例:
选择JupyterLab左侧导航栏中的数据图标选项卡(突出显示如下)。 出现 数据集 和 架构 目录。 选择 数据集 并右键单击,然后从要使用的数据集上的下拉菜单中选择 在笔记本中写入数据 选项。 笔记本底部会显示一个可执行代码条目。
- 使用 浏览笔记本中的数据 生成读取单元格。
- 使用 在笔记本中写入数据 生成写入单元格。
创建本地数据流
要使用PySpark 3创建本地数据流,请使用SQL查询。 例如:
date_aggregation.createOrReplaceTempView("temp_df")
df = spark.sql('''
SELECT *
FROM sparkdf
''')
local_df
df = spark.sql('''
SELECT *
FROM sparkdf
LIMIT limit
''')
sample_df = df.sample(fraction)
筛选ExperienceEvent数据
访问和筛选PySpark笔记本中的ExperienceEvent数据集需要您提供数据集标识({DATASET_ID}
)、组织的IMS标识以及定义特定时间范围的筛选规则。 通过使用函数spark.sql()
定义过滤时间范围,其中函数参数是SQL查询字符串。
以下单元格将ExperienceEvent数据集筛选为2019年1月1日至2019年12月31日终了期间专门存在的数据。
# PySpark 3 (Spark 2.4)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
%dataset read --datasetId {DATASET_ID} --dataFrame df --mode batch
df.createOrReplaceTempView("event")
timepd = spark.sql("""
SELECT *
FROM event
WHERE timestamp > CAST('2019-01-01 00:00:00.0' AS TIMESTAMP)
AND timestamp < CAST('2019-12-31 23:59:59.9' AS TIMESTAMP)
""")
timepd.show()
Scala笔记本
以下文档包含有关以下概念的示例:
初始化SparkSession
所有Scala笔记本都要求您使用以下样板代码初始化会话:
import org.apache.spark.sql.{ SparkSession }
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
读取数据集
在Scala中,您可以导入clientContext
以获取和返回Experience Platform值,这样就无需定义变量,如var userToken
。 在下面的Scala示例中,clientContext
用于获取和返回读取数据集所需的所有值。
df.cache()
缓存数据可以大大提高笔记本性能。 如果您收到以下任何错误,这将很有帮助:- 由于暂存失败,作业已中止……只能压缩每个分区中具有相同元素数的RDD。
- 远程RPC客户端已取消关联和其他内存错误。
- 读取和写入数据集时性能不佳。
import org.apache.spark.sql.{Dataset, SparkSession}
import com.adobe.platform.token.ClientContext
val spark = SparkSession.builder().master("local").config("spark.sql.warehouse.dir", "/").getOrCreate()
val clientContext = ClientContext.getClientContext()
val df1 = spark.read.format("com.adobe.platform.query")
.option("user-token", clientContext.getUserToken())
.option("ims-org", clientContext.getOrgId())
.option("api-key", clientContext.getApiKey())
.option("service-token", clientContext.getServiceToken())
.option("sandbox-name", clientContext.getSandboxName())
.option("mode", "batch")
.option("dataset-id", "5e68141134492718af974844")
.load()
df1.printSchema()
df1.show(10)
clientContext.getUserToken()
自动获取的用户令牌。clientContext.getServiceToken()
自动获取的服务令牌。clientContext.getOrgId()
自动获取的组织ID。clientContext.getApiKey()
自动获取的API密钥。可以使用以下方法在JupyterLab buy中自动生成上述示例:
选择JupyterLab左侧导航栏中的数据图标选项卡(突出显示如下)。 出现 数据集 和 架构 目录。 选择 数据集 并右键单击,然后从要使用的数据集上的下拉菜单中选择 在笔记本中浏览数据 选项。 笔记本底部会显示一个可执行代码条目。
与
- 使用 浏览笔记本中的数据 生成读取单元格。
- 使用 在笔记本中写入数据 生成写入单元格。
写入数据集
在Scala中,您可以导入clientContext
以获取和返回Experience Platform值,这样就无需定义变量,如var userToken
。 在下面的Scala示例中,clientContext
用于定义和返回写入数据集所需的所有值。
df.cache()
缓存数据可以大大提高笔记本性能。 如果您收到以下任何错误,这将很有帮助:- 由于暂存失败,作业已中止……只能压缩每个分区中具有相同元素数的RDD。
- 远程RPC客户端已取消关联和其他内存错误。
- 读取和写入数据集时性能不佳。
import org.apache.spark.sql.{Dataset, SparkSession}
import com.adobe.platform.token.ClientContext
val spark = SparkSession.builder().master("local").config("spark.sql.warehouse.dir", "/").getOrCreate()
val clientContext = ClientContext.getClientContext()
df1.write.format("com.adobe.platform.query")
.option("user-token", clientContext.getUserToken())
.option("service-token", clientContext.getServiceToken())
.option("ims-org", clientContext.getOrgId())
.option("api-key", clientContext.getApiKey())
.option("sandbox-name", clientContext.getSandboxName())
.option("mode", "batch")
.option("dataset-id", "5e68141134492718af974844")
.save()
clientContext.getUserToken()
自动获取的用户令牌。clientContext.getServiceToken()
自动获取的服务令牌。clientContext.getOrgId()
自动获取的组织ID。clientContext.getApiKey()
自动获取的API密钥。创建本地数据流
要使用Scala创建本地数据流,需要SQL查询。 例如:
sparkdf.createOrReplaceTempView("sparkdf")
val localdf = spark.sql("SELECT * FROM sparkdf LIMIT 1)
筛选ExperienceEvent数据
访问和筛选Scala笔记本中的ExperienceEvent数据集需要您提供数据集标识({DATASET_ID}
)、组织的IMS标识以及定义特定时间范围的筛选规则。 使用函数spark.sql()
定义筛选时间范围,其中函数参数是SQL查询字符串。
以下单元格将ExperienceEvent数据集筛选为2019年1月1日至2019年12月31日终了期间专门存在的数据。
// Spark (Spark 2.4)
// Turn off extra logging
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = org.apache.spark.sql.SparkSession.builder().appName("Notebook")
.master("local")
.getOrCreate()
// Stage Exploratory
val dataSetId: String = "{DATASET_ID}"
val orgId: String = sys.env("IMS_ORG_ID")
val clientId: String = sys.env("PYDASDK_IMS_CLIENT_ID")
val userToken: String = sys.env("PYDASDK_IMS_USER_TOKEN")
val serviceToken: String = sys.env("PYDASDK_IMS_SERVICE_TOKEN")
val mode: String = "batch"
var df = spark.read.format("com.adobe.platform.query")
.option("user-token", userToken)
.option("ims-org", orgId)
.option("api-key", clientId)
.option("mode", mode)
.option("dataset-id", dataSetId)
.option("service-token", serviceToken)
.load()
df.createOrReplaceTempView("event")
val timedf = spark.sql("""
SELECT *
FROM event
WHERE timestamp > CAST('2019-01-01 00:00:00.0' AS TIMESTAMP)
AND timestamp < CAST('2019-12-31 23:59:59.9' AS TIMESTAMP)
""")
timedf.show()
Query Service的可选SQL标记
此表概述了可用于Query Service的可选SQL标记。
-h
、--help
-n
、--notify
-a
、--async
-d
、--display