kafka学习总结007 --- 生产者Java API实例

2021-05-11 02:28

阅读:555

标签:end   查看   load   rap   exce   final   lazy   ring   comm   

事先说明,本文的所有实例均基于kafka2.5.0开发;依赖的jar包

        org.apache.kafka
            kafka-clients
            2.5.0

创建topic的方法

  private static final String BOOTSTRAP_SERVER = "192.168.1.8:9091,192.168.1.8:9092,192.168.1.8:9093";
  public static void createTopic(String topicName) {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        AdminClient adminClient = KafkaAdminClient.create(properties);
        NewTopic newTopic = new NewTopic(topicName,2, (short) 3);
        adminClient.createTopics(Collections.singletonList(newTopic));
        adminClient.close();
    }

执行后创建topic成功:

技术图片

kafka生产消息有同步和异步两种方式:

1. 创建生产者的方法

    public static KafkaProducer createProducer() {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer(properties);
    }

2. 同步生产消息

public class MySyncProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
KafkaProducer producer = KafkaTestUtil.createProducer();
ProducerRecord producerRecord = new ProducerRecord(KafkaTestContants.SECOND_TOPIC, "first msg");
RecordMetadata record = producer.send(producerRecord).get();
System.out.println("Producer msg: partition=" + record.partition() + ", offset=" + record.offset());
producer.close();
}
}

3. 异步生产消息

public class MyAsyncProducer {
    public static void main(String[] args) {
        KafkaProducer producer = KafkaTestUtil.createProducer();
        for (int i = 0; i ) {
            ProducerRecord producerRecord = new ProducerRecord(KafkaTestContants.SECOND_TOPIC, "0617 msg" + i);
            producer.send(producerRecord, (RecordMetadata metadata, Exception exception) -> {
                if (null != exception) {
                    exception.printStackTrace();
                }

                if (null != metadata) {
                    System.out.println("Producer msg: partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            });
        }
        producer.close();
    }
}

相关命令:

查看topic详细信息:
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic java-api-0617-topic --describe

技术图片

查看kafka log数据:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments -print-data-log -files kafka-logs/java-api-0617-topic-0/00000000000000000000.log

技术图片

查看某个消费组消费情况:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.8:9091,192.168.1.8:9092,192.168.1.8:9093 --group group1 --describe

技术图片

 

kafka学习总结007 --- 生产者Java API实例

标签:end   查看   load   rap   exce   final   lazy   ring   comm   

原文地址:https://www.cnblogs.com/sniffs/p/13149527.html


评论


亲,登录后才可以留言!