Kafka核心API——Producer生产者
2021-03-05 04:29
标签:udp 提高 好的 creat rop 支持 实例 new 就是 在上文中介绍了AdminClient API的使用,现在我们已经知道如何在应用中通过API去管理Kafka了。但在大多应用开发中,我们最常面临的场景就是发送消息到Kafka,或者从Kafka中消费消息,也就是典型的生产/消费模式。而本文将要演示的就是如何使用Producer API将消息发送至Kafka中,使应用成为一个生产者。 Producer API具有以下几种发送模式: 接下来,使用一个简单的例子演示一下异步向Kafka发送消息。首先,我们需要创建一个 在本例中,消息的key和value均为 在 Tips: 然后调用 在 大致时序图如下: 发送消息的具体流程图如下: 运行以上代码,控制台输出内容如下: 如果想要在发送完消息后获取结果,比起直接调用 在 运行以上代码,控制台输出内容如下: 在某些特殊的业务场景下我们经常会有自定义负载均衡算法的需求,在Kafka中可以通过实现 本例中所实现的负载均衡算法比较简单,就是使用 然后在创建 我们首先要了解一下消息的传递语义,一般存在三种类型语义: 在Kafka中主要通过消息重发和ACK机制来保障消息的传递,消息重发机制主要是提高消息发送的成功率,并不能保证消息一定能发送成功。我们可以通过在创建 另一个消息传递保障机制就是ACK机制,Kafka中的ACK机制有三种模式,需要通过配置去指定。这三种配置的含义如下: 同样的该配置项可以在创建 上面的三种取值可以根据实际的业务场景来进行设置,消息的可靠性越强的,性能肯定就会越差。这三种取值就是在消息的可靠性以及性能两个方面做一个权衡: Kafka核心API——Producer生产者 标签:udp 提高 好的 creat rop 支持 实例 new 就是 原文地址:https://blog.51cto.com/zero01/2495999
Producer
实例,并且必须配置三个参数,分别是Kafka服务的ip地址及端口号,以及消息key和value的序列化器(消息体以key-value结构形式存在)。String
类型,所以使用StringSerializer
这个字符串类型的序列化器。代码示例:/**
* 创建Producer实例
*/
public static Producer
new KafkaProducer
时,构造器里做了什么:
Properties
里的配置项,初始化ProducerConfig
ProducerConfig
初始化一些配置字段MetricConfig
监控度量指标配置以及MetricsReporter
报告器列表和Metrics
存储库partitioner
负载均衡器,当有多个partition时就是通过这个负载均衡器去将消息均匀的分发到不同的partition中RecordAccumulator
,一个类似于计数器的东西,用于计算消息批次的。因为Producer
并不是接收到一条消息就发送到一条消息,而是达到一定批量后按批次发送的,所以需要有一个计数器来存储和计算批次。Sender
,然后会为其创建一个守护线程,并启动
KafkaProducer
构造器的源码,就会发现其所有的属性都是final
的,并且均在构造器中完成了初始化,不存在不安全的发布或共享变量,这也就变相说明了KafkaProducer
是线程安全的Producer
中的send
方法即可实现异步发送。代码示例:/**
* 演示Producer异步发送
*/
public static void producerAsyncSend() {
String topicName = "MyTopic";
String key = "test-key";
String value = "this is test message!";
try (Producer
producer.send(record)
里主要做了以下事情:
accumulator.append
向批次中追加消息sender.wakeup
在守护线程中去发送消息
Producer异步阻塞发送演示
send
方法会有一个Future
类型的返回值,当我们调用Future
的get
方法时,就会阻塞当前线程,此时就达到了异步阻塞发送消息的效果,即发送消息是异步的,获取结果是阻塞的。我们可以通过这种方式去获取Future
里存储的元数据信息。代码示例:/**
* 演示Producer异步阻塞式发送
*/
public static void producerAsyncBlockSend() throws Exception {
String topicName = "MyTopic";
String key = "test-key";
String value = "this is test message!";
try (Producer
hasTimestamp: true, timestamp: 1589637627231, hasOffset: true, offset: 5, partition: 1, topic: MyTopic
Producer异步回调发送演示
Future
的get
方法更好的方式是使用异步回调的消息发送形式。send
方法中支持传入一个回调函数,当消息发送完毕后,会调用回调函数并将结果当作参数传入,此时我们就可以在回调函数中对结果进行处理。代码示例:/**
* 演示Producer异步回调发送
*/
public static void producerAsyncCallbackSend() throws Exception {
String topicName = "MyTopic";
String key = "test-key";
String value = "this is test message!";
try (Producer
hasTimestamp: true, timestamp: 1589639553024, hasOffset: true, offset: 7, partition: 1, topic: MyTopic
自定义Partition负载均衡器
Partitioner
接口来自定义Partition负载均衡器。key
的hashcode
去对partition
的数量进行取余得出partition
的索引,代码示例:package com.zj.study.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义Partition负载均衡器
*
* @author 01
* @date 2020-05-17
**/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key,
byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
int partitionsNum = cluster.partitionsForTopic(topic).size();
return key.hashCode() % partitionsNum;
}
@Override
public void close() {
}
@Override
public void configure(Map
Producer
实例时,指定MyPartitioner
的包名路径即可。代码示例:/**
* 创建Producer实例
*/
public static Producer
Kafka的消息传递保障
Producer
实例时,设置retries
配置项来开启或关闭消息重发机制,代码示例:// 设置的值为0表示关闭,大于0则表示开启
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
Producer
实例时进行设置,代码示例:properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
acks=0
acks=1
acks=all