kafka(2): Implementing a simple Producer and Consumer in Java

2020-12-29 04:28

Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jj</groupId>
    <artifactId>ak02</artifactId>
    <version>1.0.0</version>

    <properties>
        <kafka.version>1.1.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
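Both examples below publish to and read from a topic named test. With the default broker setting auto.create.topics.enable=true the topic is created on first use, but if that option is turned off it has to exist beforehand. A minimal sketch that creates it programmatically with the AdminClient API shipped in the same kafka-clients dependency (the class name CreateTopic and the single-partition, replication-factor-1 settings are assumptions for a local single-broker setup, not part of the original post):

package com.jj.ak02;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        // Point the AdminClient at the same broker the examples use
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "test" topic with 1 partition and replication factor 1 (single-broker setup)
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            // createTopics() is asynchronous; all() returns a future we block on
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic created: test");
        }
    }
}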
BasicProducer.java
package com.jj.ak02;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092"); // Where is the Kafka cluster?
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Serializer for the message key
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Serializer for the message value

        // Step 2. Create a Kafka Producer instance
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Step 3. Specify the topic to publish messages to
        String topicName = "test";

        int msgCounter = 0;

        try {
            System.out.println("Start sending messages ...");

            // Step 4. Build the messages to publish to Kafka (wrap each message in a ProducerRecord)
            //    - Argument #1: topicName
            //    - Argument #2: msgKey
            //    - Argument #3: msgValue
            producer.send(new ProducerRecord<>(topicName, null, "Hello"));
            producer.send(new ProducerRecord<>(topicName, null, "Hello2"));
            producer.send(new ProducerRecord<>(topicName, "8703147", "Hello3"));
            producer.send(new ProducerRecord<>(topicName, "8703147", "Hello4"));

            msgCounter += 4;
            System.out.println("Sent " + msgCounter + " messages to Kafka");

        } catch (Exception e) {
            // Error handling
            e.printStackTrace();
        }

        // Step 5. Close the Producer instance and its connections
        producer.close();

        System.out.println("Message sending completed!");
    }
}
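Note that producer.send() is asynchronous: it only appends the record to an in-memory buffer and returns a Future, so the code above prints its success message before the broker has necessarily acknowledged anything. Below is a sketch of two common ways to confirm delivery with this client, blocking on the returned Future and registering a Callback; the class name ConfirmedProducer, the extra messages, and the acks=all setting are assumptions for illustration, not part of the original post.

package com.jj.ak02;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class ConfirmedProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all"); // wait until all in-sync replicas have the record
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Option 1: block on the Future to get the assigned partition and offset
            RecordMetadata meta = producer.send(new ProducerRecord<>("test", "8703147", "Hello5")).get();
            System.out.println("Written to " + meta.topic() + "-" + meta.partition() + "@" + meta.offset());

            // Option 2: non-blocking callback, invoked when the broker responds
            producer.send(new ProducerRecord<>("test", null, "Hello6"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // delivery failed
                } else {
                    System.out.println("Written to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
                }
            });

            producer.flush(); // make sure the callback fires before close()
        }
    }
}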
BasicConsumer.java
package com.jj.ak02;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.record.TimestampType;

import java.util.Arrays;
import java.util.Properties;

public class BasicConsumer {
    public static void main(String[] args) {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Where is the Kafka cluster?
        props.put("group.id", "my-group"); // ConsumerGroup id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // Deserializer for the message key
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // Deserializer for the message value
        props.put("auto.offset.reset", "earliest"); // Start from the earliest offset when this ConsumerGroup has no committed offset
        // Step 2. Create a Kafka Consumer instance
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        // Step 3. Specify the topic to subscribe to
        String topicName = "test";
        // Step 4. Subscribe the Consumer to the topic on the Kafka cluster
        consumer.subscribe(Arrays.asList(topicName));
        // Step 5. Keep polling Kafka for incoming messages
        try {
            System.out.println("Start listening for incoming messages ...");
            while (true) {
                // Ask Kafka for any new messages
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // If there are new messages, the loop below iterates over them
                for (ConsumerRecord<String, String> record : records) {
                    // ** Business logic and message processing goes here **
                    // Extract the metadata
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    TimestampType timestampType = record.timestampType();
                    long timestamp = record.timestamp();
                    // Extract the message key and value
                    String msgKey = record.key();
                    String msgValue = record.value();
                    // Print the metadata together with the message key and value
                    System.out.println(topic + "-" + partition + "-" + offset + " : (" + msgKey + ", " + msgValue + ")");
                }
            }
        } finally {
            // Step 6. Close the Consumer instance when the program is told to stop
            consumer.close();
            System.out.println("Stopped listening for incoming messages");
        }
    }
}
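Because the while (true) loop never breaks, the finally block in BasicConsumer is only reached when poll() throws. A common way to shut such a consumer down cleanly is to call consumer.wakeup() from a JVM shutdown hook, which makes the blocked poll() throw a WakeupException. Below is a sketch under the same configuration; the class name ShutdownAwareConsumer is just for illustration and is not part of the original post.

package com.jj.ak02;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.Properties;

public class ShutdownAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();

        // On Ctrl+C: wakeup() makes the blocked poll() throw WakeupException,
        // then wait for the main thread to finish closing the consumer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic() + "-" + record.partition() + "-" + record.offset()
                            + " : (" + record.key() + ", " + record.value() + ")");
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown; nothing to do
        } finally {
            consumer.close();
            System.out.println("Consumer closed");
        }
    }
}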

 

Original article: https://www.cnblogs.com/fangjb/p/13025972.html

