24.4安装和配置Kafka Connect和Adobe Experience Platform Sink连接器

24.4.1下载Adobe Experience Platform Sink连接器

转到https://github.com/adobe/experience-platform-streaming-connect/releases并下载最新的Adobe Experience Platform Sink Connector正式版本。

卡夫卡

将下载文件​streaming-connect-sink-0.0.6-java-11.jar​放入桌面。

卡夫卡

24.4.2配置Kafka Connect

转到桌面上名为​Kafka_AEP​的文件夹,然后导航到文件夹kafka_2.13-2.8.0/config
在该文件夹中,使用任何文本编辑器打开文件​connect-distributed.properties

卡夫卡

在文本编辑器中,转到第34行和第35行,并确保将字段key.converter.schemas.enablevalue.converter.schemas.enable设置为false

``json
key.converter.schemas.enable=false
value.converter.schemas.enable=false
``

卡夫卡

接下来,返回到文件夹kafka_2.13-2.8.0并手动创建新文件夹,然后将其命名为connectors

卡夫卡

右键单击文件夹,然后单击​New terminal at Folder

卡夫卡

然后你会看到这个。 输入命令pwd以检索该文件夹的完整路径。 选择完整路径,并将其复制到剪贴板。

卡夫卡

返回到文本编辑器,转到文件​connect-distributed.properties,然后向下滚动到最后一行(屏幕截图中的第86行)。 您应取消对以# plugin.path=开头的行的注释,并应粘贴名为connectors的文件夹的完整路径。 结果应类似于以下内容:

plugin.path=/Users/woutervangeluwe/Desktop/Kafka_AEP/kafka_2.13-2.8.0/connectors

保存对文件​connect-distributed.properties​所做的更改,并关闭文本编辑器。

卡夫卡

接下来,将您下载的Adobe Experience Platform Sink连接器的最新正式版本复制到名为connectors的文件夹中。 您之前下载的文件名为​streaming-connect-sink-0.0.6-java-11.jar,您只需将其移入connectors文件夹即可。

卡夫卡

接下来,在​kafka_2.13-2.8.0​文件夹级别打开新的“终端”窗口。 右键单击该文件夹,然后单击​New Terminal at Folder

在“终端”窗口中,粘贴以下命令:bin/connect-distributed.sh config/connect-distributed.properties并单击​Enter。 此命令将启动Kafka Connect,并加载Adobe Experience Platform Sink连接器的库。

卡夫卡

几秒钟后,您将看到如下内容:

卡夫卡

24.4.3使用Postman创建Adobe Experience Platform Sink连接器

您现在可以使用Postman与Kafka Connect进行交互。 要执行此操作,请下载此Postman Collection,然后将其解压缩到桌面上的本地计算机。 然后,您将拥有一个名为Kafka_AEP.postman_collection.json的文件。

卡夫卡

您需要在Postman中导入此文件。 要执行此操作,请打开Postman ,单击​Import,将文件Kafka_AEP.postman_collection.json拖放到弹出窗口中,然后单击​Import

卡夫卡

然后,您将在邮递员的左侧菜单中找到该收藏集。 单击第一个请求,GET可用Kafka Connectors​以将其打开。

卡夫卡

然后你会看到这个。 单击蓝色的​Send​按钮,在该按钮之后,您应会看到空响应[]。 空响应是由于当前未定义Kafka Connect连接器。

卡夫卡

要创建连接器,请单击以打开Kafka集合中的第二个请求, POST创建AEP Sink连接器。 然后你会看到这个。 在第11行,其中显示​"aep.endpoint":"",您需要在练习24.3结束时收到的HTTP API流端点URL中进行粘贴。 HTTP API流端点URL如下所示:https://dcs.adobedc.net/collection/94981e0634e0d37c3559ce7ece05a35eae35c52cc5962d2d4a44e488400f2338

卡夫卡

粘贴后,请求正文应如下所示。 单击蓝色的​Send​按钮以创建连接器。 创建连接器后,您会立即收到响应。

卡夫卡

单击第一个请求, 可用GETKafka Connectors​以再次将其打开,然后再次单击蓝色的​发送​按钮。 此时您将看到已创建Kafka Connect连接器。

卡夫卡

接下来,打开Kafka集合中的第三个请求,GET检查Kafka连接器状态。 单击蓝色的​Send​按钮,您随后将收到与下面类似的响应,指出连接器正在运行。

卡夫卡

24.4.4制作体验事件

右键单击文件夹​kafka_2.13-2.8.0​并单击文件夹​中的​新建终端,打开新的​终端​窗口。

卡夫卡

输入以下命令:

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic aep

卡夫卡

然后你会看到这个。 按下Enter按钮后,每添加一行新消息,都会在主题​aep​中发送一条新消息。

卡夫卡

您现在可以发送一条消息,该消息将导致Adobe Experience Platform Sink连接器使用,并将实时摄取到Adobe Experience Platform。

让我们做个小演示来测试这个。

打开全新的隐身浏览器窗口,然后转到https://public.aepdemo.net

然后你会看到这个。

启动设置

输入配置ID并单击​Load Configuration。 随后将加载您的配置。

启动设置

向下滚动,然后单击​保存配置

启动设置

然后,您将被重定向到“管理员”主页。 转到​选择LDAP。 选择LDAP并单击​Save

启动设置

然后,您将被重定向到“管理员”主页。 转到​选择Brand​并选择品牌​Luma,单击​Save

启动设置

然后,您将被重定向到“管理员”主页。 单击​Luma​徽标。

启动设置

然后,您将看到Luma主页。

启动设置

转到​Login/Register。 填写表单并单击​创建帐户。 不要忘记选中​测试用户档案​的复选框。

卡夫卡

然后,您将在X光面板上看到您的用户档案数据。

卡夫卡

您还会看到尚未记录任何体验事件。

卡夫卡

让我们将其更改,并将呼叫中心体验活动从Kafka发送到Adobe Experience Platform。

采用以下体验事件有效负载示例,并将其复制到文本编辑器中。

{
  "header": {
    "datasetId": "5fd1a9dea30603194baeea43",
    "imsOrgId": "907075E95BF479EC0A495C73@AdobeOrg",
    "source": {
      "name": "Launch"
    },
    "schemaRef": {
      "id": "https://ns.adobe.com/experienceplatform/schemas/b0190276c6e1e1e99cf56c99f4c07a6e517bf02091dcec90",
      "contentType": "application/vnd.adobe.xed-full+json;version=1"
    }
  },
  "body": {
    "xdmMeta": {
      "schemaRef": {
        "id": "https://ns.adobe.com/experienceplatform/schemas/b0190276c6e1e1e99cf56c99f4c07a6e517bf02091dcec90",
        "contentType": "application/vnd.adobe.xed-full+json;version=1"
      }
    },
    "xdmEntity": {
      "eventType": "callCenterInteractionKafka",
      "_id": "",
      "timestamp": "2021-08-12T15:04:03.630Z",
      "_experienceplatform": {
        "identification": {
          "core": {
            "phoneNumber": ""
          }
        },
        "interactionDetails": {
          "core": {
            "callCenterAgent": {
              "callID": "Support Contact - 3767767",
              "callTopic": "contract",
              "callFeeling": "negative"
            }
          }
        }
      }
    }
  }
}

然后你会看到这个。 您需要手动更新2个字段:

  • _id:请将其设置到ldap中,然后随机设置数字序列,例如:vangeluw1234
  • phoneNumber:输入刚在演示网站上创建的帐户的phoneNumber。您可以在X射线面板的​Identities​下找到它。
注意

对于每次数据摄取,字段​**_id**​必须是唯一的。 如果生成多个事件,请确保每次都将字段​**_id**​更新为新的唯一值。

卡夫卡

然后,您应该拥有如下内容:

卡夫卡

接下来,将完整的体验事件复制到剪贴板。 需要清空JSON有效负载的空格,为此,我们将使用在线工具。 转到http://jsonviewer.stack.hu/以执行此操作。

卡夫卡

将体验事件粘贴到编辑器中,然后单击​删除空格

卡夫卡

接下来,选择所有输出文本,并将其复制到剪贴板。

卡夫卡

返回到“终端”窗口。

卡夫卡

将不带空格的新有效负载粘贴到“终端”窗口中,然后单击​Enter

卡夫卡

接下来,返回到您的演示网站并刷新页面。 此时,您应会在配置文件的​其他事件​下看到一个体验事件,就像下面的事件一样:

卡夫卡

您已完成此练习。

下一步:摘要和优点

返回到模块24

返回到所有模块

在此页面上