Kafka Consumer API示例
2021-02-05 20:15
标签:推送 反序列化 ext lan out seek 自定义对象 选择 trap 既然翻到这里,默认就认为已经基本掌握了Kafka的基础知识,本小结主要给出一次使用Kafka Consumer API的示例。我们都知道Kafka API有旧版(0.8版之前)和新版(0.9版之后),这里讲的是新版,官网KafkaConsumer有更详细介绍,可自行前往~ pom.xml文件 kafka consumer消费数据,未SASL认证,这里的代码只能消费生产者正在推送的数据: kafka consumer消费数据,SASL认证,可以消费历史数据: 另外,选择把kafka作为消息的中间件,主要是拿到数据持久化到本地或者HDFS待分析挖掘出重要的信息,可以使用Sparkstreaming存到HDFS,这里给出从控制台信息存到了本地磁盘。 有时间再把Kafka基本原理 存储 配置信息 SASL授权 Spark都总结出来。 Kafka Consumer API示例 标签:推送 反序列化 ext lan out seek 自定义对象 选择 trap 原文地址:https://www.cnblogs.com/eugene0/p/11437459.html1 环境配置
2 操作过程
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumerTest {
public static void main(String[] args) throws Exception {
// Kafka consumer configuration settings
String topicName = "XXXX";
Properties props = new Properties();
props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
props.put("group.id", "XXXX");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("offsets.storage", "kafka");
// 要发送自定义对象,需要指定对象的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
final KafkaConsumer
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumerTest {
public static void main(String[] args) throws Exception {
String topicName = "XXXX";
Properties props = new Properties();
System.setProperty("java.security.auth.login.config", "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
props.put("group.id", "XXXX");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("offsets.storage", "kafka");
props.put("max.poll.records",1000);
// 要发送自定义对象,需要指定对象的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
final KafkaConsumer
import java.io.*;
public class IO2File {
public static void main(String[] args) throws IOException{
File f = new File("out.json");
f.createNewFile();
FileOutputStream fileOutputStream = new FileOutputStream(f);
PrintStream printStream = new PrintStream(fileOutputStream);
System.setOut(printStream);
System.out.println("xxxxxxx out.json");// 结合上面的代码 直接把kafka消息的信息打印到控制台 然后存到磁盘
}
}
上一篇:C#关键字:访问修饰符
文章标题:Kafka Consumer API示例
文章链接:http://soscw.com/index.php/essay/51483.html