kafka急速入门与核心API解析

2021-03-11 05:32

阅读:513

kafka环境安装


上一节课我们已经对kafka的基本概念、核心思想有了一定的了解和认知,并且掌握了kafka在实际工作中的一些主要的应用场景。那么接下来,我们就一起快速进入kafka的安装吧。

  • kafka下载地址:http://kafka.apache.org/downloads.html

  • kafka安装步骤:首先kafka安装需要依赖与zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。

    ## 解压命令:
    tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
    ## 改名命令:
    mv kafka_2.12-2.1.0/ kafka_2.12
    ## 进入解压后的目录,修改server.properties文件:
    vim /usr/local/kafka_2.12/config/server.properties
    ## 修改配置:
    broker.id=0
    port=9092
    host.name=192.168.11.51
    advertised.host.name=192.168.11.51
    log.dirs=/usr/local/kafka_2.12/kafka-logs
    num.partitions=2
    zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
    
    ## 建立日志文件夹:
    mkdir /usr/local/kafka_2.12/kafka-logs
    
    ##启动kafka:
    /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
    

kafka常用命令


我们接下来一起了解几个非常重要的命令,通过这些命令我们对kafka topic partition 进行查看和操作。

  • 常用命令:

    ## 简单操作:
    #(1)创建topic主题命令:(创建名为test的topic, 1个分区分别存放数据,数据备份总共1份)
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1  --replication-factor 1  
    ## --zookeeper 为zk服务列表
    ## --create 命令后 --topic 为创建topic 并指定 topic name
    ## --partitions 为指定分区数量
    ## --replication-factor 为指定副本集数量
    
    #(2)查看topic列表命令:
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --list
    
    #(3)kafka命令发送数据:(然后我们就可以编写数据发送出去了)
    kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1
    
    #(4)kafka命令接受数据:(然后我们就可以看到消费的信息了)
    kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning
    
    
    #(5)删除topic命令:
    kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1
    
    #(6)kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)
    kafka-consumer-groups.sh --bootstrap-server 192.168.11.51:9092 --describe --group group1
    ## --describe --group 为订阅组, 后面指定 group name
    

急速入门


下面我们一起使用kafka最基本的API来对kafka进行操作!

  • kafka依赖包:

    >
        >org.apache.kafka>
        >kafka_2.12>
    > 
    
  • kafka生产者:

    package com.bfxy.mix.kafka;
    
    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import com.alibaba.fastjson.JSON;
    
    public class CollectKafkaProducer {
    
        // 创建一个kafka生产者
    	private final KafkaProducer, String> producer;
    	// 定义一个成员变量为topic
    	private final String topic;
        
         // 初始化kafka的配置文件和实例:Properties & KafkaProducer
    	public CollectKafkaProducer(String topic) { 
    		Properties props = new Properties(); 
             // 配置broker地址
    		props.put("bootstrap.servers", "192.168.11.51:9092"); 
             // 定义一个 client.id
    		props.put("client.id", "demo-producer-test"); 
    		
             // 其他配置项:
            
    //		props.put("batch.size", 16384);			//16KB -> 满足16KB发送批量消息
    //		props.put("linger.ms", 10); 			//10ms -> 满足10ms时间间隔发送批量消息
    //		props.put("buffer.memory", 33554432);	 //32M -> 缓存提性能
    		
             // kafka 序列化配置:
    		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
             
             // 创建 KafkaProducer 与 接收 topic
    		this.producer = new KafkaProducer>(props);
    		this.topic = topic; 
    	}
    
        // 发送消息 (同步或者异步)
    	public void send(Object message, boolean syncSend) throws InterruptedException { 
    		try { 
                 // 同步发送
    			if(syncSend) {
    				producer.send(new ProducerRecord>(topic, JSON.toJSONString(message)));		
    			} 
                 // 异步发送(callback实现回调监听)
                 else {
    				producer.send(new ProducerRecord>(topic, 
                                  JSON.toJSONString(message)), 
                                  new Callback() {
    					@Override
    					public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    	                    if (e != null) {
    	                        System.err.println("Unable to write to Kafka in CollectKafkaProducer [" + topic + "] exception: " + e);
    	                    }
    					}
    				});				
    			}
    		} catch (Exception e) {
    			e.printStackTrace(); 
    		} 
    	} 
    	
        // 关闭producer
    	public void close() {
    		producer.close();
    	}
    
        


评论


亲,登录后才可以留言!