kafka(2): Java实现简单的Producer和Consumer
2020-12-29 04:28
标签:eset version des sage apache public cep slist 序列化 Maven pom.xml kafka(2): Java实现简单的Producer和Consumer 标签:eset version des sage apache public cep slist 序列化 原文地址:https://www.cnblogs.com/fangjb/p/13025972.html
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jj</groupId>
    <artifactId>ak02</artifactId>
    <version>1.0.0</version>
    <properties>
        <kafka.version>1.1.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
BasicProducer.java
package com.jj.ak02;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class BasicProducer {
public static void main(String[] args) {
// 步驟1. 設定要連線到Kafka集群的相關設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群在那裡?
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定msgKey的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定msgValue的序列化器
// 步驟2. 產生一個Kafka的Producer的實例
Producer
BasicConsumer.java
package com.wistron.witlab4.ak02;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.record.TimestampType;
import java.util.Arrays;
import java.util.Properties;
public class BasicConsumer {
public static void main(String[] args) {
// 步驟1. 設定要連線到Kafka集群的相關設定
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群在那裡?
props.put("group.id", "my-group"); //
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定msgKey的反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定msgValue的反序列化器
props.put("auto.offset.reset", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀
// 步驟2. 產生一個Kafka的Consumer的實例
Consumer
上一篇:C#反射赋值(更新时用)
下一篇:ring buf中柔性数组的应用
文章标题:kafka(2): Java实现简单的Producer和Consumer
文章链接:http://soscw.com/essay/38971.html