ArcGIS GeoEvent Server与Kafka对接系列:02-用Java API创建主题、删除主题、发送消息、接收消息

0
分享 2019-01-23
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/peckerze ... 00326

在Eclipse中创建Maven Project

配置以下以来包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>


创建主题

package kevin.com.cn;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

/**
* 创建主题(Topic)客户端
* @author Kevin Zeng
*/
public class CreateTopicClient {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.201:9092");
AdminClient adminClient = AdminClient.create(props);
ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
NewTopic newTopic = new NewTopic("Flights", 1, (short) 1); //定义topic的名称,分区数,副本数
topics.add(newTopic);
CreateTopicsResult result = adminClient.createTopics(topics);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}


发送消息

package kevin.com.cn;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* 数据发布端 ,发送数据至Kafka
* @author Kevin Zeng
*/
public class ProducerClient {

public static void main(String[] args) {

//配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.144:9092");
props.put("acks", "all");
props.put("delivery.timeout.ms", 300000);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//初始化
Producer<String, String> producer = new KafkaProducer<>(props);
String value = ",116.333885,39.81922,6/29/2016 15:30:02";

long startTime=System.currentTimeMillis(); //获取开始时间
for (int i = 0; i < 100000; i++){
producer.send(new ProducerRecord<String, String>("Flights", Integer.toString(i), "A100"+Integer.toString(i)+value));
}
producer.close();
long endTime=System.currentTimeMillis(); //获取结束时间
System.out.println("数据发送至Kafka耗时: "+(endTime-startTime)+"ms");
}
}


订阅消息(接收消息)

package kevin.com.cn;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* 数据订阅客户端
* @author Kevin Zeng
*/
public class ConsumerClient {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.144:9092");
props.put("group.id", "Flights");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Flights")); //配置订阅topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}


删除主题

备注:如果你的kafka是安装在Windows操作系统中,那么不管是用API还是用自带的工具删除主题可能都会出现kafka挂掉的问题,所以强烈建议采用Linux环境!
package kevin.com.cn;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;

/**
* 删除主题(Topic)客户端
* @author Kevin Zeng
*
*/
public class DeleteTopicClient {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.201:9092");
AdminClient adminClient = AdminClient.create(props);
ArrayList<String> topics = new ArrayList<String>();
topics.add("Flights");
DeleteTopicsResult result = adminClient.deleteTopics(topics);
try {
result.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

}


文章来源:https://blog.csdn.net/peckerzeng/article/details/86300326

0 个评论

要回复文章请先登录注册