(二)Kafka0.8.2官方文档中文版系列-API
2021-03-26 02:25
标签:isp icc 扩展 查看 black 初始 topic maven group 2. API 我们正在为Kafka重写JVM客户端。在Kafka0.8.2中,包含一个新重写的Java producer。下一个版本将包含一个等效的Java consumer。这些新客户端旨在取代现有的Scala客户端,但为了兼容性,它们将共存一段时间。这些客户端可以在一个独立的jar中使用,并且具有最小的依赖性,而旧的Scala客户端仍然与服务器打包在一起。 2.1 Producer API 在kafka0.8.2版本中,我们鼓励你使用新的java producer。这个客户端经过生产环境的测试,相比之前的scala客户端该客户端更快、有更多的特性。你可以通过添加如下maven依赖使用它: 在javadoc中可以查看如何使用producer。 对于那些对遗留Scala生产者api感兴趣的人,可以在这里找到相关信息。 你可以参考这个示例去学习如何使用high level 消费者API。 2.3 Simple Consumer API 对大多数应用来说,the high level API是完全够用的。一些应用想要使用一些high level API没有暴露的特性(比如说,当重启消费者时指定初始的offset,即偏移量)。他们可以使用我们的low level SimpleConsumer API。但是这个逻辑会有点复杂,你可以参考这个例子。 2.4 Kafka Hadoop Consumer API 我们的一个基本用例就是,为数据聚合和加载数据到hadoop提供一个水平扩展的解决方案。为了支持这个用户用例,我们提供了一个基于hadoop的消费者,它生成了许多map任务,以并行地从Kafka集群中拉取数据。这可以非常快速的将kafka的数据加载到hadoop中(我们只用了一些Kafka服务器就完全饱和了网络,意思就是基于hadoop的consumer拉取速度很快)。 使用hadoop consumer的信息,可以在这里找到。 (二)Kafka0.8.2官方文档中文版系列-API 标签:isp icc 扩展 查看 black 初始 topic maven group 原文地址:https://www.cnblogs.com/dreamfor123/p/9392521.html2.2 High Level Consumer API
class Consumer {
/**
* Create a ConsumerConnector 创建一个消费者连接器
*
* @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
* connection string zookeeper.connect.
* 参数解释:基于一个最小的配置,你只需要指定消费者组,zookeeper的连接
*/
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}
/**
* V: type of the message 消息的类型
* K: type of the optional key assciated with the message 与消息相关的可选的配置
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
* Create a list of message streams of type T for each topic.
* 为每个topic创建一个T类型的消息流列表
*
* @param topicCountMap a map of (topic, #streams) pair
* @param decoder a decoder that converts from Message to T
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
public
* 为每个topic创建一个T类型的消息流列表,使用默认的解码器
*/
public Map
* 为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器
*/
public List
* 为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器
*/
public List
* 提交通过该连接器关联的所有的topic和分区的偏移量
*/
public void commitOffsets();
/**
* Shut down the connector 关闭连接器
*/
public void shutdown();
}class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.
* 从一个topic拉取消息
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* 请求需要指定主题的名称、主题分区、起始偏移量、拉取数据的最大字节数
* @return a set of fetched messages
* 拉取回来的消息集合
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.
* 获取一系列主题的元数据
*
* @param request specifies the versionId, clientId, sequence of topics. 需要指定版本号、客户端ID、主题
* @return metadata for each topic in the request. 每个请求主题的元数据
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.
* 在给定的时间之前,得到一个有效的偏移量列表(偏移量可以取到给定时间之前的最大值)
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* @return a [[kafka.javaapi.OffsetResponse]] object.
*/
public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer. 关闭SimpleConsumer
*/
public void close();
}