python 实现 kakfa 的 生产消费模式 和 发布订阅模式

2021-05-28 05:03

阅读:527

标签:boot   json   time   serial   str   end   producer   key   receive   

python 实现 kakfa 的 生产消费模式 和 发布订阅模式(已安装好 kafka 的情况下)
生产者 producer_demo.py
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值p对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=[localhost:9092],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送三条消息
    for i in range(0, 3):
        future = producer.send(
            kafka_demo,
            key=count_num,  # 同一个key值,会被送至同一个分区
            value=str(i),
            partition=0)  # 向分区1发送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10) # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()
if __name__ == "__main__":
    producer_demo()
 
消费者 consumer_demo1.py
from kafka import KafkaConsumer
import json
def consumer_demo():
    consumer = KafkaConsumer(
        kafka_demo,
        bootstrap_servers=[localhost:9092],
        group_id=test
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
if __name__ == "__main__":
    consumer_demo()

 

消费者 consumer_demo2.py

from kafka import KafkaConsumer
import json
def consumer_demo():
    consumer = KafkaConsumer(
        kafka_demo,
        bootstrap_servers=[localhost:9092],
        group_id=test
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
if __name__ == "__main__":
    consumer_demo()

 

总结 : kafka 的生产和消费文件的 topic 都是 kafka_demo , 消费者 consumer_demo1.py 和consumer_demo2.py 文件中,
如果 group_id 都为 test 的话,则为 生产消费模式,两个消费者只有一个会消费 topic 的消息;
如果 group_id 不都为 test 的话,则为 发布订阅模式,两个消费者都会消费 topic 的消息。

 
 

 

python 实现 kakfa 的 生产消费模式 和 发布订阅模式

标签:boot   json   time   serial   str   end   producer   key   receive   

原文地址:https://www.cnblogs.com/wjq310/p/python_kafka.html


评论


亲,登录后才可以留言!