nodejs中的kafkajs,消费顺序,不重复消费
2021-03-06 09:28
标签:http roc ejs function group write producer 参考 span 2.producer.js 3.consumer.js 经实际测试,没有发现消费问题。如有发现问题,请多多指教,谢谢。原文地址:https://www.cnblogs.com/qiyc/p/12898107.html 参考:https://kafka.js.org/docs
1.封装kafkaUtil类
// Shared kafkajs client used by the producer/consumer helpers below.
const {
  Kafka,
  logLevel
} = require('kafkajs')
//const cache = require('../conn/redis.js');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [
    // NOTE(review): the original listed "lcoalhost" (typo) for ports
    // 8092 and 8095 — corrected to localhost. Confirm against the
    // actual cluster addresses.
    "localhost:8092",
    "localhost:8093",
    "localhost:8094",
    "localhost:8095",
    "localhost:8096",
  ],
  retry: {
    retries: 8   // retry failed requests up to 8 times before surfacing the error
  },
  logLevel: logLevel.ERROR
})
/**
* 如果groupId已存在重复的,建立不同的kafka实例会报错
*/
/**
* kafka生产者发送消息
* messages: [{
value: ‘Hello KafkaJS user!‘,
}, {
value: ‘Hello KafkaJS user2!‘,
}],
*/
exports.producer = async (topic, groupId, msg) => {
try {
const producer = kafka.producer({
groupId: groupId
})
await producer.connect()
await producer.send({
topic: topic,
messages: msg,
acks: 1
})
} catch (error) {
throw error;
}
}
exports.consumer = async (topic, groupId, callback) => {
try {
const consumer = kafka.consumer({
groupId: groupId
})
await consumer.connect()
await consumer.subscribe({
topic: topic
})
await consumer.run({
autoCommit: true,
eachMessage: async ({
topic,
partition,
message
}) => {
//防止重复消费数据
await consumer.commitOffsets([{
topic: topic,
partition: partition,
offset: Number(message.offset) + 1
}])
let msg = message.value.toString()
console.log(72, ‘消费者接收到的数据为:‘, msg);
callback(msg);
}
})
} catch (err) {
throw err;
}
}
// 2.producer.js — sends the integers 0..9 to the topic, one per second.
const kafka = require('./kafkaUtil');
(async function () {
  const topic = 'MY——TOPIC1'
  const groupId = 'MY——TOPIC1'
  try {
    // NOTE(review): the scraped source lost this loop header and the Promise
    // wrapper; reconstructed as "send one message per second, in order" based
    // on the surviving setTimeout/resolve/then fragments. Confirm the original
    // iteration count.
    for (let i = 0; i < 10; i++) {
      // Throttle: wait one second between sends.
      await new Promise((resolve) => {
        setTimeout(() => {
          resolve(1)
        }, 1000)
      })
      console.log('发送的数据为:', i)
      await kafka.producer(topic, groupId, [{
        // A constant key routes every message to the same partition, so
        // consumers see them in production order. Drop the key if ordering
        // is not required. See: https://www.zhihu.com/question/266390197
        key: "a",
        value: `${i}`
      }])
    }
  } catch (error) {
    console.log(14, error)
    throw error;
  }
})()
// 3.consumer.js — appends every message received from the topic to test01.txt.
const kafka = require('./kafkaUtil');
(async function () {
  const fs = require('fs');
  let count = 1; // running message counter, used only for the log line
  const topic = 'MY——TOPIC1'
  const groupId = 'MY——TOPIC1'
  try {
    await kafka.consumer(topic, groupId, async (msg) => {
      const str = `第${count}接收到的数据为:${msg}`;
      count++;
      // flag 'a' appends instead of truncating the file on each write.
      fs.writeFileSync(`${process.cwd()}/test01.txt`, str, {
        flag: 'a',
      })
      console.log(str)
    })
  } catch (error) {
    console.log(14, error)
    throw error;
  }
})()
上一篇:js模拟select下拉菜单
下一篇:Django 报错 Refused to display 'http://127.0.0.1:8000/ in a frame because it set 'X-Fr