# python生产消费Kafka
# 2021-04-25 01:26
# 标签:imp nis toc trap server res return api rom
# python生产消费Kafka主要是跟KafkaConsumer和KafkaProducer两个类打交道.
# 原文地址:https://www.cnblogs.com/wangbin2188/p/13260526.html
from kafka import KafkaProducer, KafkaConsumer
class PythonKafka(object):
    """Small convenience wrapper around ``KafkaProducer`` and ``KafkaConsumer``.

    One instance is bound to a single topic. Both the producer and the
    consumer connect over ``SASL_PLAINTEXT`` using the ``PLAIN`` mechanism.

    Attributes:
        topic: Topic used for both sending and receiving.
        username: SASL username, or ``None`` to use ``DEFAULT_USERNAME``.
        password: SASL password, or ``None`` to use ``DEFAULT_PASSWORD``.
        bootstrap_servers: Broker addresses handed to the Kafka clients.
        count: Number of times ``on_send_success`` has been invoked.
        error_count: Number of times ``on_send_error`` has been invoked.
    """

    # Placeholder values kept from the original example; they are used only
    # when the caller does not supply real ones.
    DEFAULT_BOOTSTRAP_SERVERS = ('ip1:port', 'ip2:port', 'ip3:port')
    DEFAULT_USERNAME = 'username'
    DEFAULT_PASSWORD = 'password'

    def __init__(self, topic=None, username=None, password=None,
                 bootstrap_servers=None):
        """Store connection settings and reset the delivery counters.

        Args:
            topic: Topic to produce to / consume from.
            username: SASL username; ``None`` falls back to the placeholder.
            password: SASL password; ``None`` falls back to the placeholder.
            bootstrap_servers: Broker address(es) passed through unchanged to
                the Kafka clients; ``None`` falls back to the placeholders.
        """
        self.topic = topic
        self.username = username
        self.password = password
        if bootstrap_servers is None:
            bootstrap_servers = list(self.DEFAULT_BOOTSTRAP_SERVERS)
        self.bootstrap_servers = bootstrap_servers
        self.count = 0
        self.error_count = 0

    def on_send_success(self, *args, **kwargs):
        """Success callback for a send: bump ``count``; arguments are ignored."""
        self.count = self.count + 1

    def on_send_error(self, *args, **kwargs):
        """Error callback for a send: bump ``error_count``; arguments are ignored."""
        self.error_count = self.error_count + 1

    def _sasl_settings(self):
        """Return the SASL keyword arguments shared by producer and consumer."""
        username = self.username if self.username is not None else self.DEFAULT_USERNAME
        password = self.password if self.password is not None else self.DEFAULT_PASSWORD
        return {
            'security_protocol': 'SASL_PLAINTEXT',
            'sasl_mechanism': 'PLAIN',
            'sasl_plain_username': username,
            'sasl_plain_password': password,
        }

    def get_kafka_producer(self):
        """Create and return a new ``KafkaProducer`` for the configured brokers."""
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            request_timeout_ms=40000,
            api_version=(0, 10, 1),
            **self._sasl_settings()
        )

    def get_kafka_consumer(self):
        """Create and return a new ``KafkaConsumer`` subscribed to ``self.topic``."""
        consumer_params = self._sasl_settings()
        consumer_params.update({
            'group_id': 'test_group2',
            'auto_offset_reset': 'earliest',
            'api_version': (0, 10),
            # Set a timeout, otherwise iterating the consumer blocks forever.
            'consumer_timeout_ms': 10000,
        })
        return KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            **consumer_params
        )

    def send_data(self, json_str):
        """Send one message to ``self.topic`` using a short-lived producer.

        The send itself is asynchronous; the outcome is reported through
        ``on_send_success`` / ``on_send_error``. The producer is flushed and
        then always closed, even if sending or flushing raises.

        Args:
            json_str: Message text; encoded to UTF-8 bytes before sending.
        """
        json_bytes = str.encode(json_str, 'utf-8')
        producer = self.get_kafka_producer()
        try:
            future = producer.send(self.topic, value=json_bytes)
            future.add_callback(self.on_send_success).add_errback(self.on_send_error)
            producer.flush()
        finally:
            producer.close()

    def receive_data(self):
        """Print every message read from ``self.topic`` as UTF-8 text.

        Iteration ends when the consumer stops yielding messages (see
        ``consumer_timeout_ms`` in ``get_kafka_consumer``). The consumer is
        always closed, even if decoding or iteration raises.
        """
        consumer = self.get_kafka_consumer()
        try:
            for msg in consumer:
                msg_str = str(msg.value, encoding='utf8')
                print(msg_str)
        finally:
            consumer.close()