Kafka Consumer API Example

2021-02-05 20:15

If you've found your way here, I'll assume you already have the Kafka basics down; this post mainly walks through one example of using the Kafka Consumer API. As you may know, Kafka has an old API (0.8 and earlier) and a new one (0.9 and later); this post covers the new API. The official KafkaConsumer documentation has a more detailed introduction if you want to go deeper.

1 Environment Setup

  • OS: Ubuntu 16.04
  • Kafka: kafka_2.11-0.10.2.2
  • JDK: 1.8.0_181
  • IntelliJ IDEA with Maven
  • VNC

2 Walkthrough

The pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafka.test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>maven-kafka</name>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <!-- version not preserved in the original; assumed to match the broker version above -->
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>
    </dependencies>
</project>

A Kafka consumer without SASL authentication. Note that this code only consumes data that producers are pushing while it runs, because a new consumer group starts from the latest offsets by default:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

        // Kafka consumer configuration settings
        String topicName = "XXXX";
        Properties props = new Properties();

        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");       // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");  // once per second
        props.put("session.timeout.ms", "30000");

        // To consume custom objects, point these at the matching deserializer classes
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // timeout in ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
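Since the consumer above only sees messages produced while it is running, it helps to have a producer pushing test data at the same time. The following is a minimal producer sketch for that purpose; the class name is mine, and the topic and broker list are the same placeholders used in the consumer example:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;


public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        // Serializers mirror the StringDeserializer settings on the consumer side
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // Fire-and-forget send: key and value are plain strings
            producer.send(new ProducerRecord<>("XXXX", Integer.toString(i), "message-" + i));
        }
        producer.close(); // flushes buffered records before exiting
    }
}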

A Kafka consumer with SASL authentication; by seeking back to the beginning offsets when partitions are assigned, it can also consume historical data:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;


public class KafkaConsumerTest {
    public static void main(String[] args) throws Exception {

        String topicName = "XXXX";
        Properties props = new Properties();

        // Point the JVM at the JAAS file that holds the SASL credentials
        System.setProperty("java.security.auth.login.config",
                "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 1000);

        // To consume custom objects, point these at the matching deserializer classes
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do before partitions are revoked
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // The equivalent of the console consumer's --from-beginning flag:
                // rewind every newly assigned partition to its earliest offset
                // (consumer.beginningOffsets(partitions) returns those offsets)
                consumer.seekToBeginning(partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(20000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
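For reference, the kafka_client_jaas.conf file referenced above typically looks like the sketch below for SASL/PLAIN. The username and password here are placeholders, not values from the original setup:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="your-username"
    password="your-password";
};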

Beyond that, the main reason for choosing Kafka as the messaging middleware is to persist the data locally or to HDFS so that useful information can be analyzed and mined later. Spark Streaming can be used to write to HDFS; here I only show saving the console output to local disk (a sketch that writes records straight to the file follows the code below).

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;


public class IO2File {
    public static void main(String[] args) throws IOException {
        File f = new File("out.json");
        f.createNewFile();
        FileOutputStream fileOutputStream = new FileOutputStream(f);
        PrintStream printStream = new PrintStream(fileOutputStream);
        // Redirect stdout to out.json: combined with the consumer code above,
        // everything printed to the console is written to disk instead
        System.setOut(printStream);
        System.out.println("xxxxxxx out.json");
        printStream.flush(); // make sure buffered output actually reaches the file
    }
}
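Redirecting System.out works, but it silently mixes any log output into the data file and loses whatever is still buffered if the process is killed. A more direct approach is to write each record to the file yourself. Below is a minimal sketch under the same assumptions as the examples above (placeholder topic, group, and brokers); the class name Consumer2File is mine:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Consumer2File {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("XXXX"));

        // try-with-resources closes (and flushes) the writer on exit
        try (BufferedWriter writer = new BufferedWriter(new FileWriter("out.json", true))) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    writer.write(record.value());
                    writer.newLine();
                }
                writer.flush(); // persist each batch before polling again
            }
        }
    }
}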

When I find the time, I'll also write up the Kafka fundamentals: storage, configuration, SASL authorization, and the Spark side.


Original post: https://www.cnblogs.com/eugene0/p/11437459.html

