GeoEvent使用kafka作为输入

0
分享 2017-08-18
做了一回搬运工和实践者。原教程地址:http://www.arcgis.com/home/ite ... 0d671由GeoEvent团队提供。下载的教程附带数据、脚本等。
一、部署过程
Apache Kafka安装配置
1.安装ArcGIS Server10.5.1 和ArcGIS GeoEvent Server10.5.1
2.下载并安装jdk8 网址:http://www.oracle.com/technetw ... .html
3.下载Apache Kafka version 0.10.1.1 网址:https://www.apache.org/dyn/clo ... 1.tgz
4.解压并移动到c:\kafka目录
5.设置KAFKA_HOME系统变量,值为C:\kafka\kafka_2.11-0.10.1.1
注意:kafka的ZooKeeper端口需要更改,因为和GeoEvent使用的Zookeeper端口冲突。
6.打开C:\kafka\Kafka_2.11-0.10.1.1\config
7.更改zookeeper.properties文件中的clientPort属性从2181变成2183;server.properties文件中的zookeeper.connect属性中的2181改为2183。
8.运行教程中\scripts文件夹下的run-zookeeper命令
9.运行教程中\scprits文件夹下的run-kafka命令
10.运行教程中\scprits文件夹下的create-topics命令,默认创建一个Flights topic
添加kafka传输到GeoEvent Server
1.打开ArcGIS GeoEvent Manager然后打开站点>组件>传输
2.点击添加本地传输,浏览到教程中的\component文件夹中的kafka-transport-10.5.0.jar
3.点击打开
4.点击添加传输到GeoEvent Server,如果添加成功,将出现一个消息。
5.检查列表中是否有Apache Kafka Inbound Transport 和 Apache Kafka Outbound Transport


添加传输
注意:如果kafka传输出现在传输列表中,则转到下一节。如果在导入之后,kafka传输没有出现在传输列表中,请按照以下步骤执行:
1,进入\deploy文件夹,将文件夹中的kafka-transport-10.5.0.jar文件后缀更改为kafka-transport-10.5.0.jar.OLD文件。注意后缀名是.jar.OLD,经测试.OLD不可行,.jar.OLD才可以。2,然后再复制一个kafka-transport-10.5.0.jar文件到\deploy目录下,这时目录中存在一个kafka-transport-10.5.0.jar和一个kafka-transport-10.5.0.jar.OLD
3,在GeoEvent Manager的组件传输页面中https://ip:6143/geoevent/manager/components.html,刷新此网页,就会出现kafka出站和入站的两个传输类型
导入一个GeoEvent Server配置
1.打开ArcGIS GeoEvent Manager然后打开站点>GeoEvent>配置存储 点击导入配置。
2.浏览到教程中\configurations文件夹中的kafka_config.xml。
3.点击导入配置
4.访问站点>GeoEvent>连接器,搜索kafka。可以看到Publish Text to Kafka 和 Receive Text from Kafka两个连接器出现。


导入连接器配置
配置输入连接器来接收kafka的消息
1.访问服务>输入,点击添加输入
2.选择Receive Text from Apache Kafka输入连接器类型

kafka输入连接器
3.现在,如下设定Receive Text from Apache Kafka类型的输入


输入的设置
4.点击保存
5.访问 服务>输出,添加输出
6.选择Write to a JSON File类型
7.注册并使用文件夹c:/shuju,其余参数按默认值。
8.点击保存
9.访问服务>监控页面,确认新的输入和输出已存在。
10.访问 服务>GeoEvent服务,点击添加服务
12.命名为KafkaInDataLogger点击创建。
13.拖拽kafka-text-in 输入和 file-json-out输出,并使前者指向后者。

服务详情
14.点击发布
15.用记事本打开教程中\simulations文件夹中的Flights.csv文件,并复制第一行。
16.运行\scrpits下的run-producer-test.bat命令。
17.在run-producer-test命令行中粘贴复制的数据,敲击回车键。


kafka生产者
18.在c:/shuju文件夹中,将看到此数据。


GeoEvent成功接收并输出
19.在GeoEvent Manager中将看到,count增加了1。


count增加
20.在GeoEvent定义中,找到Flight,点击编辑。


增加了一个topic同名定义
21.修改字段类型和标签,如下:


编辑定义
至此,GeoEvent成功接入kafka。
二、测试过程
主要测试此环境在重启时的稳定性。
重启kafka,发送消息,GeoEvent依然正常收到
1.在kafka命令窗口ctrl+c,Y停止。
2.在zookeeper命令窗口ctrl+c,Y停止。


kafka停止
3.运行教程中\scripts文件夹下的run-zookeeper命令
4.运行教程中\scprits文件夹下的run-kafka命令
5.运行\scrpits下的run-producer-test.bat命令。
6.在run-producer-test命令行中粘贴复制的一条数据,敲击回车键。
7.GeoEvent监控页面此服务输入输出数量+1,c:/shuju下的json文件中增加了相同内容的json数据。
重启GeoEvent,发送消息,GeoEvent依然正常收到
在windows服务中停止,然后开启GeoEvent服务。其余步骤同上。
重启计算机,发送消息,GeoEvent依然正常收到
重启计算机,其余步骤基本一致,需要注意的是,若重启计算机后GeoEvent服务自动启动,后启动zookeeper和kafka。那么有可能会出现输入找不到的情况。
将ArcGIS GeoEvent Server服务设置为手动启动。并且在每次此服务启动前,确保输入端的kafka和zookeeper已运行。即可解决。
只要保证kafka和GeoEvent启动的顺序,kafka输入没有丢失情况。
需要注意的关键步骤如下:
1.一定要在kafka运行状态下并且创建完topic后,再在GeoEvent中导入kafka传输。
2.导入配置后,连接器自动出现,而不是手动创建连接器。
3.注意kafka的版本为Kafka_2.11-0.10.1.1。
4.重启时一定要保证kafka和zookeeper已运行,再启动ArcGIS GeoEvent。
文章来源:http://www.jianshu.com/p/f3c75dc5b3fc

1 个评论

经过测试 10.6.1 可以接入kafka_2.11-2.1.0 使用以上步骤兼容

要回复文章请先登录注册