kafka(3)API使用
2021-07-12 03:07
标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final 相关依赖 一个简单的Kafka生产者一般步骤如下: 创建 Properties 对象,设置生产者级别配置。以下3个配置是必须指定的。 (1) bootstrap.servers 配置连接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,当 连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能 够成功连上 Kafka 集群 在多代理集群的情况下建议至少配置两个代理。 key.serializer :配置用于序列化消息 Key 的类。 value.serializer :配置用于序列化消息实际数据的类。 (2)根据 Properties 对象实例化一个 KafkaProducer 对象。 (3)实例化 ProducerRecord 对象, 每条消息对应一个 ProducerRecord 对象。 (4)调用 KafkaProducer 发送消息的方法将 ProducerRecord 发送到 Kafka 相应节点。 Kafka提供了两个发送消息的方法,即 send(ProducerRecord
消费者 kafka(3)API使用 标签:规则 manual 技术 ted 代理信息 执行 ada interrupt final 原文地址:https://www.cnblogs.com/liclBlog/p/9613421.html
kafka_2.11
kafka-clients
package com.kafka.action.chapter6.producer;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import com.kafka.action.chapter6.dto.StockQuotationinfo;
/**
*
* @Title: QuotationProducer.java
* @Package com.kafka.action.chapter6.producer
* @Description: 单线程生产者
* @author licl
* @date 2018年9月9日
*/
public class QuotationProducer {
// 设置实例生产消息的总数
private static final int MSG_SIZE = 100;
// 主题名称
private static final String TOPIC = "test";
// kafka集群
private static final String BROKER_LIST = "192.168.1.106:9092";
private static KafkaProducer
package com.kafka.action.chapter6.producer;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
*
* @Title: QuotationProducer.java
* @Package com.kafka.action.chapter6.producer
* @Description: 多线程生产者
* @date 2018年9月9日
*/
import com.kafka.action.chapter6.dto.StockQuotationinfo;
public class KafkaProducerThread implements Runnable {
// 设置实例生产消息的总数
private static final int MSG_SIZE = 100;
private static final String TOPIC = "test";
private KafkaProducer
package com.kafka.action.chapter6.dto;
import java.io.Serializable;
public class StockQuotationinfo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public StockQuotationinfo() {
super();
}
//股票代码
private String stockCode ;
//股票名称
private String stockName ;
@Override
public String toString() {
return "StockQuotationinfo [stockCode=" + stockCode + ", stockName="
+ stockName + ", tradeTime=" + tradeTime + ", preClosePrice="
+ preClosePrice + ", openPrice=" + openPrice
+ ", currentPrice=" + currentPrice + ", highPrice=" + highPrice
+ ", lowPrice=" + lowPrice + "]";
}
//交易时间
private long tradeTime;
//昨日收盘价
private float preClosePrice;
//开盘价
private float openPrice ;
//当前价,收盘时即为当日收盘价
private float currentPrice ;
//今日最高
private float highPrice;
//今日最低
private float lowPrice;
public StockQuotationinfo(String stockCode, String stockName,
long tradeTime, float preClosePrice, float openPrice,
float currentPrice, float highPrice, float lowPrice) {
super();
this.stockCode = stockCode;
this.stockName = stockName;
this.tradeTime = tradeTime;
this.preClosePrice = preClosePrice;
this.openPrice = openPrice;
this.currentPrice = currentPrice;
this.highPrice = highPrice;
this.lowPrice = lowPrice;
}
public String getStockCode() {
return stockCode;
}
public void setStockCode(String stockCode) {
this.stockCode = stockCode;
}
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
public long getTradeTime() {
return tradeTime;
}
public void setTradeTime(long tradeTime) {
this.tradeTime = tradeTime;
}
public float getPreClosePrice() {
return preClosePrice;
}
public void setPreClosePrice(float preClosePrice) {
this.preClosePrice = preClosePrice;
}
public float getOpenPrice() {
return openPrice;
}
public void setOpenPrice(float openPrice) {
this.openPrice = openPrice;
}
public float getCurrentPrice() {
return currentPrice;
}
public void setCurrentPrice(float currentPrice) {
this.currentPrice = currentPrice;
}
public float getHighPrice() {
return highPrice;
}
public void setHighPrice(float highPrice) {
this.highPrice = highPrice;
}
public float getLowPrice() {
return lowPrice;
}
public void setLowPrice(float lowPrice) {
this.lowPrice = lowPrice;
}
public static long getSerialversionuid() {
return serialVersionUID;
}
}
package demo2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
public class MyKafkaConsumer {
/**
* 自动提交offset
*/
@Test
public void comsumeMsgAutoCommit() {
Properties props = new Properties();
props.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
props.put("group.id", Constants.GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer
package demo2;
import java.net.InetAddress;
public class Constants {
final static String GROUP_ID = "test_group";
final static String MY_TOPIC = "test";
final static String KAFKA_SERVER_ADRESS = "192.168.1.106";
final static int KAFKA_SERVER_PORT = 9092;
}
上一篇:第一个C#程序
下一篇:win10安装JDK