转到https://github.com/adobe/experience-platform-streaming-connect/releases并下载最新的Adobe Experience Platform Sink Connector正式版本。
将下载文件streaming-connect-sink-0.0.6-java-11.jar放入桌面。
转到桌面上名为Kafka_AEP的文件夹,然后导航到文件夹kafka_2.13-2.8.0/config
。
在该文件夹中,使用任何文本编辑器打开文件connect-distributed.properties。
在文本编辑器中,转到第34行和第35行,并确保将字段key.converter.schemas.enable
和value.converter.schemas.enable
设置为false
``json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
``
接下来,返回到文件夹kafka_2.13-2.8.0
并手动创建新文件夹,然后将其命名为connectors
。
右键单击文件夹,然后单击New terminal at Folder。
然后你会看到这个。 输入命令pwd
以检索该文件夹的完整路径。 选择完整路径,并将其复制到剪贴板。
返回到文本编辑器,转到文件connect-distributed.properties,然后向下滚动到最后一行(屏幕截图中的第86行)。 您应取消对以# plugin.path=
开头的行的注释,并应粘贴名为connectors
的文件夹的完整路径。 结果应类似于以下内容:
plugin.path=/Users/woutervangeluwe/Desktop/Kafka_AEP/kafka_2.13-2.8.0/connectors
保存对文件connect-distributed.properties所做的更改,并关闭文本编辑器。
接下来,将您下载的Adobe Experience Platform Sink连接器的最新正式版本复制到名为connectors
的文件夹中。 您之前下载的文件名为streaming-connect-sink-0.0.6-java-11.jar,您只需将其移入connectors
文件夹即可。
接下来,在kafka_2.13-2.8.0文件夹级别打开新的“终端”窗口。 右键单击该文件夹,然后单击New Terminal at Folder。
在“终端”窗口中,粘贴以下命令:bin/connect-distributed.sh config/connect-distributed.properties
并单击Enter。 此命令将启动Kafka Connect,并加载Adobe Experience Platform Sink连接器的库。
几秒钟后,您将看到如下内容:
您现在可以使用Postman与Kafka Connect进行交互。 要执行此操作,请下载此Postman Collection,然后将其解压缩到桌面上的本地计算机。 然后,您将拥有一个名为Kafka_AEP.postman_collection.json
的文件。
您需要在Postman中导入此文件。 要执行此操作,请打开Postman ,单击Import,将文件Kafka_AEP.postman_collection.json
拖放到弹出窗口中,然后单击Import。
然后,您将在邮递员的左侧菜单中找到该收藏集。 单击第一个请求,GET可用Kafka Connectors以将其打开。
然后你会看到这个。 单击蓝色的Send按钮,在该按钮之后,您应会看到空响应[]
。 空响应是由于当前未定义Kafka Connect连接器。
要创建连接器,请单击以打开Kafka集合中的第二个请求, POST创建AEP Sink连接器。 然后你会看到这个。 在第11行,其中显示"aep.endpoint":"",您需要在练习24.3结束时收到的HTTP API流端点URL中进行粘贴。 HTTP API流端点URL如下所示:https://dcs.adobedc.net/collection/94981e0634e0d37c3559ce7ece05a35eae35c52cc5962d2d4a44e488400f2338
。
粘贴后,请求正文应如下所示。 单击蓝色的Send按钮以创建连接器。 创建连接器后,您会立即收到响应。
单击第一个请求, 可用GETKafka Connectors以再次将其打开,然后再次单击蓝色的发送按钮。 此时您将看到已创建Kafka Connect连接器。
接下来,打开Kafka集合中的第三个请求,GET检查Kafka连接器状态。 单击蓝色的Send按钮,您随后将收到与下面类似的响应,指出连接器正在运行。
右键单击文件夹kafka_2.13-2.8.0并单击文件夹中的新建终端,打开新的终端窗口。
输入以下命令:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic aep
然后你会看到这个。 按下Enter按钮后,每添加一行新消息,都会在主题aep中发送一条新消息。
您现在可以发送一条消息,该消息将导致Adobe Experience Platform Sink连接器使用,并将实时摄取到Adobe Experience Platform。
让我们做个小演示来测试这个。
打开全新的隐身浏览器窗口,然后转到https://public.aepdemo.net。
然后你会看到这个。
输入配置ID并单击Load Configuration。 随后将加载您的配置。
向下滚动,然后单击保存配置。
然后,您将被重定向到“管理员”主页。 转到选择LDAP。 选择LDAP并单击Save。
然后,您将被重定向到“管理员”主页。 转到选择Brand并选择品牌Luma,单击Save。
然后,您将被重定向到“管理员”主页。 单击Luma徽标。
然后,您将看到Luma主页。
转到Login/Register。 填写表单并单击创建帐户。 不要忘记选中测试用户档案的复选框。
然后,您将在X光面板上看到您的用户档案数据。
您还会看到尚未记录任何体验事件。
让我们将其更改,并将呼叫中心体验活动从Kafka发送到Adobe Experience Platform。
采用以下体验事件有效负载示例,并将其复制到文本编辑器中。
{
"header": {
"datasetId": "5fd1a9dea30603194baeea43",
"imsOrgId": "907075E95BF479EC0A495C73@AdobeOrg",
"source": {
"name": "Launch"
},
"schemaRef": {
"id": "https://ns.adobe.com/experienceplatform/schemas/b0190276c6e1e1e99cf56c99f4c07a6e517bf02091dcec90",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
}
},
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/experienceplatform/schemas/b0190276c6e1e1e99cf56c99f4c07a6e517bf02091dcec90",
"contentType": "application/vnd.adobe.xed-full+json;version=1"
}
},
"xdmEntity": {
"eventType": "callCenterInteractionKafka",
"_id": "",
"timestamp": "2021-08-12T15:04:03.630Z",
"_experienceplatform": {
"identification": {
"core": {
"phoneNumber": ""
}
},
"interactionDetails": {
"core": {
"callCenterAgent": {
"callID": "Support Contact - 3767767",
"callTopic": "contract",
"callFeeling": "negative"
}
}
}
}
}
}
}
然后你会看到这个。 您需要手动更新2个字段:
对于每次数据摄取,字段**_id**必须是唯一的。 如果生成多个事件,请确保每次都将字段**_id**更新为新的唯一值。
然后,您应该拥有如下内容:
接下来,将完整的体验事件复制到剪贴板。 需要清空JSON有效负载的空格,为此,我们将使用在线工具。 转到http://jsonviewer.stack.hu/以执行此操作。
将体验事件粘贴到编辑器中,然后单击删除空格。
接下来,选择所有输出文本,并将其复制到剪贴板。
返回到“终端”窗口。
将不带空格的新有效负载粘贴到“终端”窗口中,然后单击Enter。
接下来,返回到您的演示网站并刷新页面。 此时,您应会在配置文件的其他事件下看到一个体验事件,就像下面的事件一样:
您已完成此练习。
下一步:摘要和优点