kafka基础命令及api使用

2021-03-08 01:27

阅读:704

标签:group   list   生产   ica   boot   test   keep   prope   err   

一、Kafka 0.11

参考文档
(1)https://kafka.apache.org/0110/documentation.html

二、kafka 0.8
1、命令行操作
(1)新建topic

> bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0

(2)发送消息

bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic msg_format_v0

2、API使用
(1)pom依赖

    org.apache.kafka
      kafka-clients
      0.8.2.1

(2)生产者api使用

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer(props);
        //topic: 目标topic;  key: message的序号;   value: 写入的message信息;
        producer.send(new ProducerRecord("msg_format_v0", "key", "value"));

        //当不需要指定key值时,采用下面的方法
        //Producer producer2 = new KafkaProducer(props);
        //producer2.send(new ProducerRecord("msg_format_v1", "value"));

        producer.close();

    }
}

参考文档
(1)https://kafka.apache.org/082/documentation.html#producerapi

kafka基础命令及api使用

标签:group   list   生产   ica   boot   test   keep   prope   err   

原文地址:https://www.cnblogs.com/hxuhongming/p/12812832.html


评论


亲,登录后才可以留言!