Message Queues: Kafka (API)
2021-06-23 08:04
Original post: http://blog.51cto.com/14048416/2337337

First, resolve the related dependencies:
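The dependency snippet from the original post did not survive extraction. As a sketch, the producer/consumer examples below only need the kafka-clients artifact (the version shown is an assumption; match it to your broker), while the shell-command example in section 2 additionally needs the Kafka core artifact that ships kafka.admin.TopicCommand:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- assumed version: use the one that matches your cluster -->
    <version>0.10.2.1</version>
</dependency>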
Producer:

package com.zy.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Load the configuration
        // 1.1 Build the Properties object
        Properties prps = new Properties();
        // Broker address list
        prps.put("bootstrap.servers", "hadoop02:9092");
        // ack level: 0, 1, -1 (all)
        prps.put("acks", "all");
        // Number of retries
        prps.put("retries", 3);
        // Batch size in bytes
        prps.put("batch.size", 16384);
        // How long to wait before sending a batch (ms)
        prps.put("linger.ms", 1);
        // Total memory available for buffering (bytes)
        prps.put("buffer.memory", 33554432);
        // Serializers for the message key and value
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer (key and value types match the serializers above)
        Producer<String, String> producer = new KafkaProducer<>(prps);
        // 3. Send a few messages (the topic name is illustrative) and close the producer
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("zy_topic", Integer.toString(i), "message-" + i));
        }
        producer.close();
    }
}
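send() is asynchronous, so the loop above does not tell you whether a record actually reached the broker. If you need that feedback you can pass a Callback (or block on the returned Future). A minimal sketch, assuming the same broker address and an illustrative topic name:

package com.zy.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaCallbackProducer {
    public static void main(String[] args) {
        Properties prps = new Properties();
        prps.put("bootstrap.servers", "hadoop02:9092");
        prps.put("acks", "all");
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(prps);
        // Report where the record landed (partition/offset), or the failure reason
        producer.send(new ProducerRecord<>("zy_topic", "key", "value"), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // the send failed after all retries
                } else {
                    System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            }
        });
        producer.close();
    }
}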
Consumer:

package com.zy.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Load the configuration
        // 1.1 Build the Properties object
        Properties prps = new Properties();
        // Broker address list
        prps.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group ID
        prps.put("group.id", "test");
        // Whether to auto-commit the consumed offsets
        prps.put("enable.auto.commit", "true");
        // Auto-commit interval (ms)
        prps.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prps);
        // Subscribe to the topic (the topic name is illustrative) and poll in a loop
        consumer.subscribe(Arrays.asList("zy_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
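If you set enable.auto.commit to false, the consumer has to commit offsets itself; calling commitSync() after each processed batch is the simplest variant. A minimal sketch (class name and topic are illustrative):

package com.zy.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaManualCommitConsumer {
    public static void main(String[] args) {
        Properties prps = new Properties();
        prps.put("bootstrap.servers", "hadoop02:9092");
        prps.put("group.id", "test");
        // Turn off auto-commit: offsets only advance when we commit them
        prps.put("enable.auto.commit", "false");
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prps);
        consumer.subscribe(Arrays.asList("zy_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            // Commit only after the whole batch has been processed
            consumer.commitSync();
        }
    }
}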
2. Using the API in shell-command form
import java.io.IOException;

import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) throws IOException {
        /*
         * Equivalent shell command:
         * kafka-topics.sh --create --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --replication-factor 3 --partitions 10 --topic kafka_test11
         */
        // Arguments for creating a topic
        String ops[] = new String[] {
            "--create",
            "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--replication-factor", "3",
            "--topic", "zy_topic", "--partitions", "5"
        };
        // Arguments for listing all topics
        String list[] = new String[] {
            "--list",
            "--zookeeper",
            "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        // Submit the arguments as if they were a shell command
        // (only the list command runs here; pass ops to create the topic instead)
        TopicCommand.main(list);
    }
}
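The other kafka-topics.sh subcommands can be driven the same way, as long as your Kafka version still manages topics through ZooKeeper. For example, a sketch that describes a topic (the topic name is illustrative):

import kafka.admin.TopicCommand;

public class KafkaTopicDescribe {
    public static void main(String[] args) {
        // Equivalent to: kafka-topics.sh --describe --zookeeper ... --topic zy_topic
        String describe[] = new String[] {
            "--describe",
            "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--topic", "zy_topic"
        };
        TopicCommand.main(describe);
    }
}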