python 实现 kakfa 的 生产消费模式 和 发布订阅模式
2021-05-28 05:03
标签:boot json time serial str end producer key receive 消费者 consumer_demo2.py python 实现 kakfa 的 生产消费模式 和 发布订阅模式 标签:boot json time serial str end producer key receive 原文地址:https://www.cnblogs.com/wjq310/p/python_kafka.htmlfrom kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
# 假设生产的消息为键值对(不是一定要键值p对),且序列化方式为json
producer = KafkaProducer(
bootstrap_servers=[‘localhost:9092‘],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
# 发送三条消息
for i in range(0, 3):
future = producer.send(
‘kafka_demo‘,
key=‘count_num‘, # 同一个key值,会被送至同一个分区
value=str(i),
partition=0) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
if __name__ == "__main__":
producer_demo()
from kafka import KafkaConsumer
import json
def consumer_demo():
consumer = KafkaConsumer(
‘kafka_demo‘,
bootstrap_servers=[‘localhost:9092‘],
group_id=‘test‘
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
if __name__ == "__main__":
consumer_demo()
from kafka import KafkaConsumer
import json
def consumer_demo():
consumer = KafkaConsumer(
‘kafka_demo‘,
bootstrap_servers=[‘localhost:9092‘],
group_id=‘test‘
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
if __name__ == "__main__":
consumer_demo()
上一篇:Java中内部类的使用总结
下一篇:5.19Java装饰器设计模式
文章标题:python 实现 kakfa 的 生产消费模式 和 发布订阅模式
文章链接:http://soscw.com/essay/88531.html