kafka(3)API使用

2021-07-12 03:07

阅读:564

标签:规则   manual   技术   ted   代理信息   执行   ada   interrupt   final   

相关依赖



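<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>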

 

一个简单的Kafka生产者一般步骤如下:

创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。

(1)

  bootstrap.servers :配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上 Kafka 集群,在多代理集群的情况下建议至少配置两个代理。

  key.serializer :配置用于序列化消息 Key 的类。

  value.serializer :配置用于序列化消息实际数据的类。

(2)根据 Properties 对象实例化一个 KafkaProducer 对象。

(3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。

(4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。Kafka 提供了两个发送消息的方法,即 send(ProducerRecord<K, V> record) 方法和 send(ProducerRecord<K, V> record, Callback callback) 方法,带有回调函数的 send() 方法要实现 org.apache.kafka.clients.producer.Callback 接口。如果消息发送发生异常,Callback 接口的 onCompletion() 会捕获到相应异常。KafkaProducer 默认是异步发送消息,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量后作为一个 RecordBatch 再发送。生产者发送消息实质分两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是 Sender 线程负责将缓冲区的消息发送到代理,执行真正的 I/O 操作,而在第一阶段执行完后就返回一个 Future 对象,根据对 Future 对象处理方式的不同,KafkaProducer 支持两种发送消息方式。

 

技术分享图片

 

 

package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.kafka.action.chapter6.dto.StockQuotationinfo;


/**
 * Single-threaded Kafka producer example that publishes stock quotations.
 *
 * NOTE(review): two lines in this class (marked below) appear truncated, probably
 * because text between '<' and '>' was stripped when the page was extracted. The
 * class does not compile as shown; restore those spans from the original source.
 *
 * @Title: QuotationProducer.java
 * @Package com.kafka.action.chapter6.producer
 * @Description: single-threaded producer
 * @author licl
 * @date 2018-09-09
 */
public class QuotationProducer {
	// Total number of messages this instance produces
	// (presumably the send-loop bound; the loop header is truncated - confirm).
	private static final int MSG_SIZE = 100;
	// Topic name
	private static final String TOPIC = "test";
	// Kafka broker list (host:port)
	private static final String BROKER_LIST = "192.168.1.106:9092";

	// Shared producer: created once in the static initializer below and closed
	// in the finally block at the end of the sending code.
	private static KafkaProducer producer = null;
	static {
		/*
		 * 1. Build the Properties used to instantiate the KafkaProducer
		 */
		Properties configs = initConfig();
		// 2. Create the KafkaProducer
		producer = new KafkaProducer(configs);
	}

	/*
	 * Builds the producer configuration: the broker list plus String
	 * serializers for both the message key and the message value.
	 */

	private static Properties initConfig() {
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				StringSerializer.class.getName());
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				StringSerializer.class.getName());
		return properties;
	}

	// Produces one stock quotation

	private static StockQuotationinfo createQuotationinfo() {
		StockQuotationinfo quotationinfo = new StockQuotationinfo();
		// Stock code = 600100 plus a random integer; nextInt(10) yields 0..9,
		// so the codes range from 600100 to 600109
		Random r = new Random();
		Integer stockCode = 600100 + r.nextInt(10);
		// Random float in [0, 1)
		float random = (float) Math.random();
		// Rise/fall rule.
		// NOTE(review): the next line is truncated - everything between "random / 2"
		// and the declaration of "record" is missing (presumably the rest of this
		// method and the header of the method that sends the messages). Restore it
		// before compiling.
		if (random / 2  record = null;
		StockQuotationinfo quotationinfo = null;
		try {
			int num = 0;
			// NOTE(review): the loop header below is truncated as well - the loop
			// condition, the creation of the quotation and the start of the
			// ProducerRecord construction are missing.
			for (int i = 0; i (TOPIC, null,
						quotationinfo.getTradeTime(),
						quotationinfo.getStockCode(), quotationinfo.toString());
				// Asynchronous send
				// 1. Plain send without a callback
				//producer.send(record);



				// 2. Send with a callback that reports the outcome
				producer.send(record, new Callback() {

					@Override
					public void onCompletion(RecordMetadata metadata, Exception exception) {
						// Failure: report the error and print its stack trace
						if(exception != null){
							System.out.println("Send message occurs exception");
							exception.printStackTrace();
						}
						// Success: print the offset and partition of the record
						if(exception == null){
							System.out.println(String.format("offset:%s,partition:%s", metadata.offset(),metadata.partition()));
						}

					}
				});
				// num is tested before it is incremented, so the pause happens
				// after the 1st, 11th, 21st, ... message
				if (num++ % 10 == 0) {
					// Sleep for 2 s
					Thread.sleep(2000L);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();

		}finally{
			// Always close the shared producer once sending ends
			producer.close();
		}

	}

}

  

package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * 
* @Title: QuotationProducer.java  
* @Package com.kafka.action.chapter6.producer  
* @Description: 多线程生产者
* @date 2018年9月9日
 */
import com.kafka.action.chapter6.dto.StockQuotationinfo;

/**
 * Multi-threaded producer example: each instance carries one producer/record
 * pair handed in through the constructor.
 *
 * NOTE(review): the class declares Runnable, but no run() method is visible in
 * this text. One line (marked below) is truncated, probably because text between
 * '<' and '>' was stripped during extraction; the class does not compile as
 * shown and the missing span must be restored from the original source.
 */
public class KafkaProducerThread implements Runnable {
	
	// Total number of messages to produce (its use is not visible in the surviving text)
	private static final int MSG_SIZE = 100;
	// Topic the records are built for
	private static final String TOPIC = "test";
	// Producer and record assigned by the constructor
	private KafkaProducer producer = null;
	private ProducerRecord record = null;
	StockQuotationinfo quotationinfo = null;

	// Fixed pool of 10 worker threads; tasks are submitted to it further down
	ExecutorService executor = Executors.newFixedThreadPool(10);
	// Timestamp captured when the instance is created
	long current = System.currentTimeMillis();

	// Produces one stock quotation
	private static StockQuotationinfo createQuotationinfo() {
		StockQuotationinfo quotationinfo = new StockQuotationinfo();
		// Stock code = 600100 plus a random integer; nextInt(10) yields 0..9,
		// so the codes range from 600100 to 600109
		Random r = new Random();
		Integer stockCode = 600100 + r.nextInt(10);
		// Random float in [0, 1)
		float random = (float) Math.random();
		// Rise/fall rule.
		// NOTE(review): the next line is truncated - everything between "random / 2"
		// and the ProducerRecord arguments is missing (presumably the rest of this
		// method, the run() implementation and the start of the driver code that
		// builds the records). Restore it before compiling.
		if (random / 2 (TOPIC, null,
						quotationinfo.getTradeTime(),
						quotationinfo.getStockCode(), quotationinfo.toString());
				// Hand the producer and the record to a pool thread
				executor.submit(new KafkaProducerThread(producer, record));
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// NOTE(review): the producer is closed before the executor is shut
			// down; the tasks submitted above receive this same producer, so
			// confirm that none of them can still run after close().
			producer.close();
			executor.shutdown();
		}

	}

	/**
	 * Stores the producer and the record this instance works with.
	 *
	 * @param producer producer to keep in this instance
	 * @param record   record to keep in this instance
	 */
	public KafkaProducerThread(KafkaProducer producer,
			ProducerRecord record) {
		this.producer = producer;
		this.record = record;
	}

}

  

package com.kafka.action.chapter6.dto;

import java.io.Serializable;

/**
 * Mutable data holder for a single stock quotation: code, name, trade time and
 * the previous-close, open, current, high and low prices.
 */
public class StockQuotationinfo implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Stock code. */
	private String stockCode;
	/** Stock name. */
	private String stockName;
	/** Trade time. */
	private long tradeTime;
	/** Previous day's closing price. */
	private float preClosePrice;
	/** Opening price. */
	private float openPrice;
	/** Current price; at market close this is the day's closing price. */
	private float currentPrice;
	/** Today's highest price. */
	private float highPrice;
	/** Today's lowest price. */
	private float lowPrice;

	/** Creates a quotation with every field left at its default value. */
	public StockQuotationinfo() {
	}

	/**
	 * Creates a fully populated quotation.
	 *
	 * @param stockCode     stock code
	 * @param stockName     stock name
	 * @param tradeTime     trade time
	 * @param preClosePrice previous day's closing price
	 * @param openPrice     opening price
	 * @param currentPrice  current price
	 * @param highPrice     today's highest price
	 * @param lowPrice      today's lowest price
	 */
	public StockQuotationinfo(String stockCode, String stockName,
			long tradeTime, float preClosePrice, float openPrice,
			float currentPrice, float highPrice, float lowPrice) {
		setStockCode(stockCode);
		setStockName(stockName);
		setTradeTime(tradeTime);
		setPreClosePrice(preClosePrice);
		setOpenPrice(openPrice);
		setCurrentPrice(currentPrice);
		setHighPrice(highPrice);
		setLowPrice(lowPrice);
	}

	/** @return the serialization version of this class */
	public static long getSerialversionuid() {
		return serialVersionUID;
	}

	public String getStockCode() {
		return this.stockCode;
	}

	public void setStockCode(String stockCode) {
		this.stockCode = stockCode;
	}

	public String getStockName() {
		return this.stockName;
	}

	public void setStockName(String stockName) {
		this.stockName = stockName;
	}

	public long getTradeTime() {
		return this.tradeTime;
	}

	public void setTradeTime(long tradeTime) {
		this.tradeTime = tradeTime;
	}

	public float getPreClosePrice() {
		return this.preClosePrice;
	}

	public void setPreClosePrice(float preClosePrice) {
		this.preClosePrice = preClosePrice;
	}

	public float getOpenPrice() {
		return this.openPrice;
	}

	public void setOpenPrice(float openPrice) {
		this.openPrice = openPrice;
	}

	public float getCurrentPrice() {
		return this.currentPrice;
	}

	public void setCurrentPrice(float currentPrice) {
		this.currentPrice = currentPrice;
	}

	public float getHighPrice() {
		return this.highPrice;
	}

	public void setHighPrice(float highPrice) {
		this.highPrice = highPrice;
	}

	public float getLowPrice() {
		return this.lowPrice;
	}

	public void setLowPrice(float lowPrice) {
		this.lowPrice = lowPrice;
	}

	/** Renders every field as {@code name=value} pairs inside square brackets. */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append("StockQuotationinfo [stockCode=").append(stockCode);
		text.append(", stockName=").append(stockName);
		text.append(", tradeTime=").append(tradeTime);
		text.append(", preClosePrice=").append(preClosePrice);
		text.append(", openPrice=").append(openPrice);
		text.append(", currentPrice=").append(currentPrice);
		text.append(", highPrice=").append(highPrice);
		text.append(", lowPrice=").append(lowPrice);
		text.append("]");
		return text.toString();
	}

}

 

消费者

package demo2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;


public class MyKafkaConsumer {


       /**
     * 自动提交offset
     */
    @Test
    public void comsumeMsgAutoCommit() {

        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		KafkaConsumer consumer = new KafkaConsumer(props);

        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1);
        }
    }

    /**
     * 手动提交offset
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        props.put("group.id", Constants.GROUP_ID);
        props.put("max.poll.records", 10); 
        props.put("auto.offset.reset", "earliest"); 
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));
        final int minBatchSize = 100;
        List> buffer = new ArrayList();
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                buffer.add(record);
                System.out.println(buffer.size());
            }
            if (buffer.size() >= minBatchSize) {
            	System.out.println("进入手动提交offset");
                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
            

        }
    }

    private void insertIntoDb(List> buffer) {
        for (ConsumerRecord record : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }


    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  

package demo2;

import java.net.InetAddress;

/**
 * Connection settings shared by the consumer examples in this package.
 * Pure constants holder: it is final and cannot be instantiated.
 */
public final class Constants {

    /** Consumer group id. */
    static final String GROUP_ID = "test_group";
    /** Topic the examples subscribe to. */
    static final String MY_TOPIC = "test";
    /** Host of the Kafka broker (identifier spelling kept for existing callers). */
    static final String KAFKA_SERVER_ADRESS = "192.168.1.106";
    /** Port of the Kafka broker. */
    static final int KAFKA_SERVER_PORT = 9092;

    private Constants() {
        // constants only - no instances
    }
}

  

 

kafka(3)API使用

标签:规则   manual   技术   ted   代理信息   执行   ada   interrupt   final   

原文地址:https://www.cnblogs.com/liclBlog/p/9613421.html

上一篇:第一个C#程序

下一篇:win10安装JDK


评论


亲,登录后才可以留言!