Debuting a Modern C++ API for Apache Kafka
2021-05-29 07:02
Morgan Stanley uses Apache Kafka® to publish market data to internal clients and to persist it for replay purposes. We started out using librdkafka's C++ API, which maintains C++98 compatibility. C++ is evolving quickly, and we wanted to break away from this compatibility requirement so that we could take advantage of new C++ features. This led us to create a new C++ API for Kafka that uses modern C++ features (i.e. C++14 and later). We've open sourced this client and hope you enjoy it.

An example producer from librdkafka

First, let's take a look at an example from the librdkafka project, slightly stripped for brevity:

// https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp
#include "librdkafka/rdkafkacpp.h"

int main (int argc, char **argv) {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
    exit(1);
  }
  // ...
}
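This program configures a Kafka producer, sends user-specified messages using the producer, and then waits until all messages are delivered or a timeout occurs. Finally, it closes the producer.

Although this works, it doesn't take advantage of modern C++ features. For example, it relies on GOTO instead of continuations popularized by Asio. It also requires deriving a class from RdKafka::DeliveryReportCb, where a Lambda would have been enough.

Modern C++ features allow us to improve both performance and usability.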
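The callback point can be made concrete with a minimal, library-free sketch that contrasts the two styles. DeliveryCallbackBase, ClassicProducer, CollectingCallback and LambdaProducer are hypothetical names invented for this illustration. They are not librdkafka or modern-cpp-kafka types, and delivery is simulated synchronously to keep the example small.

```cpp
#include <functional>
#include <string>
#include <system_error>
#include <vector>

// Hypothetical "classic" style: users must derive from a callback base class,
// similar in spirit to RdKafka::DeliveryReportCb (not the real librdkafka type).
struct DeliveryCallbackBase {
    virtual ~DeliveryCallbackBase() = default;
    virtual void onDelivery(const std::string& payload, std::error_code ec) = 0;
};

struct ClassicProducer {
    DeliveryCallbackBase* cb = nullptr;  // non-owning; the caller manages its lifetime

    void deliver(const std::string& payload) {
        if (cb) cb->onDelivery(payload, std::error_code{});
    }
};

// A whole class, just to react to one kind of event.
struct CollectingCallback : DeliveryCallbackBase {
    std::vector<std::string> delivered;

    void onDelivery(const std::string& payload, std::error_code ec) override {
        if (!ec) delivered.push_back(payload);
    }
};

// Hypothetical "modern" style: any callable (e.g. a lambda) is accepted per send.
struct LambdaProducer {
    using Callback = std::function<void(const std::string&, std::error_code)>;

    void send(const std::string& payload, const Callback& onDelivery) {
        // A real producer would invoke the callback later, from a background
        // thread; here it is called immediately to keep the sketch synchronous.
        onDelivery(payload, std::error_code{});
    }
};
```

With the lambda style, the reaction to a delivery sits next to the send call, as in p.send("hello", [&](const std::string& msg, std::error_code ec) { if (!ec) log.push_back(msg); }). You no longer need a separately declared and separately owned class.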
A modern Kafka producer
Let's dive into the modern C++ API that we built: modern-cpp-kafka, available on GitHub. Below, we reimplement the previously shown functionality using the modern-cpp-kafka API, starting with the synchronous producer:

// https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_sync_producer.cc
#include "kafka/KafkaProducer.h"
// ...
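There are several key differences. For example, the Properties instance has enable.idempotence=true configured, so the producer ensures that messages are successfully sent exactly once and in the original order.

But this isn't perfect yet! The synchronous nature prevents us from sending multiple messages concurrently, and a slower network will quickly degrade the performance of our application. This brings us to the asynchronous producer:

// ...
// Create a producer instance.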
kafka::KafkaAsyncProducer producer(props);

// Read messages from stdin and produce to the broker.
std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

for (std::string line; std::getline(std::cin, line);) {
    // The ProducerRecord doesn't own `line`, it is just a thin wrapper
    auto record = kafka::ProducerRecord(topic,
                                        kafka::NullKey,
                                        kafka::Value(line.c_str(), line.size()));

    // Send the message.
    producer.send(record,
                  // The delivery report handler
                  [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      if (!ec)
                          std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                      else
                          std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                  },
                  // The memory block given by record.value() will be copied
                  kafka::KafkaProducer::SendOption::ToCopyRecordValue);

    if (line.empty()) break;
};
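With the asynchronous producer, we can have multiple messages in flight. We no longer have to derive a new class from a library-defined callback type. A regular Lambda can be used instead, which improves readability and makes the code more concise. The callback now takes a std::error_code instead of an RdKafka::ErrorCode, a more intuitive choice for modern C++ applications.

If the internal queue is full (ERR__QUEUE_FULL), producer.send(...) keeps waiting. It waits only until a message is either delivered or times out, which frees up space in the internal queue.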
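You can picture this blocking behaviour with a small model of a bounded in-flight queue. This is a sketch under stated assumptions, not modern-cpp-kafka's implementation:

- InFlightQueue, send and completeOne are hypothetical names.
- The capacity stands in for librdkafka's internal queue limit.
- completeOne simulates a message leaving the queue, either delivered (empty error_code) or failed (for example with std::errc::timed_out). Either outcome frees a slot.
- The bounded wait inside send is an extra assumption of the sketch.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical model of a producer's bounded in-flight queue.
class InFlightQueue {
public:
    using Callback = std::function<void(const std::string&, std::error_code)>;

    explicit InFlightQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full, but only until a slot frees up or the
    // wait times out. Returns an empty error_code when the message was queued.
    std::error_code send(std::string payload, Callback cb,
                         std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        bool hasRoom = notFull_.wait_for(lock, timeout,
                                         [this] { return pending_.size() < capacity_; });
        if (!hasRoom) {
            return std::make_error_code(std::errc::timed_out);
        }
        pending_.emplace_back(std::move(payload), std::move(cb));
        return {};
    }

    // Simulates the oldest message completing (delivered, or failed with
    // `result`): its callback runs and the freed slot wakes one blocked sender.
    bool completeOne(std::error_code result = {}) {
        std::pair<std::string, Callback> entry;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (pending_.empty()) return false;
            entry = std::move(pending_.front());
            pending_.pop_front();
        }
        notFull_.notify_one();
        entry.second(entry.first, result);  // invoke the callback outside the lock
        return true;
    }

    std::size_t inFlight() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

private:
    std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable notFull_;
    std::deque<std::pair<std::string, Callback>> pending_;
};
```

In a real producer, completeOne would be driven by broker acknowledgements on a background thread. A sender blocked in send would then resume as soon as any in-flight message completes.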
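Unfortunately, now we have to copy the message. Let's fix that:

for (auto line = std::make_shared<std::string>();
     std::getline(std::cin, *line);
     line = std::make_shared<std::string>())
{
    // The ProducerRecord doesn't own `line`, it is just a thin wrapper
    auto record = kafka::ProducerRecord(topic,
                                        kafka::NullKey,
                                        kafka::Value(line->c_str(), line->size()));

    // Send the message.
    producer.send(record,
                  // The delivery report handler
                  // Note: Here we capture the shared_ptr of `line`,
                  // which holds the content for `record.value()`.
                  // It makes sure the memory block is valid until the lambda finishes.
                  [line](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                      if (!ec)
                          std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                      else
                          std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                  });

    if (line->empty()) break;
};

Now the message is owned by a smart pointer that is captured by the Lambda (the callback), which is invoked after the message is delivered or after an error occurs. Therefore, the message is kept alive as long as it is needed.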
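You can check the lifetime argument without a broker. In the sketch below, the producer's queued delivery callbacks are modelled as a plain vector of std::function objects. PendingDeliveries, queueSend and deliverAll are hypothetical names, not library APIs. A std::weak_ptr observes whether the payload is still alive after the loop variable has been reassigned.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the callbacks queued inside an asynchronous producer.
using PendingDeliveries = std::vector<std::function<void()>>;

// Queues a "send" whose callback captures the payload by shared_ptr, so the
// payload stays alive until the callback object itself is destroyed.
inline void queueSend(PendingDeliveries& pending,
                      const std::shared_ptr<std::string>& payload,
                      std::vector<std::string>& deliveredLog) {
    pending.push_back([payload, &deliveredLog] {
        deliveredLog.push_back(*payload);  // safe: the lambda co-owns the payload
    });
}

// Runs all queued callbacks, then destroys them, releasing captured payloads.
inline void deliverAll(PendingDeliveries& pending) {
    for (auto& cb : pending) cb();
    pending.clear();
}
```

The loop's increment expression reassigns line to a fresh std::make_shared<std::string>(). After that, the previous payload is owned solely by the pending callback and is released only when that callback is destroyed.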
A modern Kafka consumer
So far, we have managed to send some messages. Let's see if we can consume them! The KafkaAutoCommitConsumer is the simplest of all:

// https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_auto_commit_consumer.cc
#include "kafka/KafkaConsumer.h"
// ...

This example initializes a KafkaAutoCommitConsumer, subscribes to a given topic, and consumes messages until it receives an empty one. As expected, the destructor of the consumer properly cleans up its resources.

An interesting detail of this consumer is the scheduling of commits, that is, when the consumer signals to the broker that a given message was successfully consumed. KafkaAutoCommitConsumer commits its position before each poll (not after the poll), which effectively acknowledges the messages received during the previous poll. This ensures that even if the consumer crashes, unprocessed messages are not acknowledged (assuming that processing completes atomically between polls).
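The commit-before-poll ordering can be simulated in a few lines. This is a toy model, not the consumer's implementation: ToyAutoCommitConsumer, its in-memory log and the batch size are assumptions made for illustration. Each poll first commits the offset reached by the previous poll and only then hands out the next batch. A crash while processing a batch therefore leaves that batch uncommitted.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy model of commit-before-poll semantics (hypothetical, in-memory).
class ToyAutoCommitConsumer {
public:
    ToyAutoCommitConsumer(std::vector<std::string> log, std::size_t batchSize)
        : log_(std::move(log)), batchSize_(batchSize) {}

    // 1) Commit everything returned by the previous poll.
    // 2) Return the next batch and remember where it ends.
    std::vector<std::string> poll() {
        committed_ = position_;
        std::vector<std::string> batch;
        while (position_ < log_.size() && batch.size() < batchSize_) {
            batch.push_back(log_[position_++]);
        }
        return batch;
    }

    // Offset from which a restarted consumer would resume after a crash.
    std::size_t committedOffset() const { return committed_; }

private:
    std::vector<std::string> log_;
    std::size_t batchSize_;
    std::size_t position_ = 0;   // next offset to hand out
    std::size_t committed_ = 0;  // last committed offset
};
```

Suppose the process dies after the first poll but before the second. The committed offset is still 0, so the first batch is redelivered after a restart rather than silently skipped.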
To get more control over the scheduling of commits, KafkaManualCommitConsumer can be used:

// ...
// Create a consumer instance.
kafka::KafkaManualCommitConsumer consumer(props);

// Subscribe to topics
consumer.subscribe({topic});

auto lastTimeCommitted = std::chrono::steady_clock::now();

// Read messages from the topic.
std::cout << "% Reading messages from topic: " << topic << std::endl;
bool allCommitted = true;
bool running      = true;
while (running) {
    auto records = consumer.poll(std::chrono::milliseconds(100));
    for (const auto& record: records) {
        // In this example, quit on empty message
        if (record.value().size() == 0) {
            running = false;
            break;
        }

        if (!record.error()) {
            std::cout << "% Got a new message..." << std::endl;
            std::cout << " Topic : " << record.topic() << std::endl;
            std::cout << " Partition: " << record.partition() << std::endl;
            std::cout << " Offset : " << record.offset() << std::endl;
            std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
            std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
            std::cout << " Key [" << record.key().toString() << "]" << std::endl;
            std::cout << " Value [" << record.value().toString() << "]" << std::endl;

            allCommitted = false;
        } else {
            // No special handling is required,
            // since the consumer will attempt to auto-recover.
            std::cerr << record.toString() << std::endl;
        }
    }

    if (!allCommitted) {
        auto now = std::chrono::steady_clock::now();
        if (now - lastTimeCommitted > std::chrono::seconds(1)) {
            // Commit offsets for messages polled
            std::cout << "% syncCommit offsets: " << std::endl;
            consumer.commitSync(); // or commitAsync()
            lastTimeCommitted = now;
            allCommitted = true;
        }
    }
}

In this example, a manual commit happens roughly once per second. commitSync waits for the commit acknowledgement, while commitAsync does not.
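The rule in the loop above can be factored into a small helper: commit at most once per second, and only if something is still uncommitted. The CommitThrottle class below is hypothetical and not part of modern-cpp-kafka. It takes the current time as a parameter, so the policy can be tested without sleeping. The actual commitSync() or commitAsync() call is left to the caller.

```cpp
#include <chrono>

// Hypothetical helper mirroring the lastTimeCommitted / allCommitted logic.
class CommitThrottle {
public:
    using Clock = std::chrono::steady_clock;

    CommitThrottle(Clock::duration interval, Clock::time_point start)
        : interval_(interval), lastCommit_(start) {}

    // Call after processing a record that still needs to be committed.
    void markUncommitted() { allCommitted_ = false; }

    // Returns true when the caller should commit now. Updates internal state
    // on the assumption that the caller then actually performs the commit.
    bool shouldCommit(Clock::time_point now) {
        if (allCommitted_ || now - lastCommit_ <= interval_) return false;
        lastCommit_ = now;
        allCommitted_ = true;
        return true;
    }

private:
    Clock::duration interval_;
    Clock::time_point lastCommit_;
    bool allCommitted_ = true;
};
```

In the consumer loop, the helper replaces the two flags: call markUncommitted() per processed record and commit whenever shouldCommit(std::chrono::steady_clock::now()) returns true. Whether that commit is synchronous or asynchronous is a separate choice.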
Summary of modern-cpp-kafka and basic concepts

The examples above provide an overview of the modern-cpp-kafka API. Let's summarize the basic concepts. There are three Kafka clients: KafkaProducer, KafkaConsumer, and AdminClient (the latter is not shown in this article).
- KafkaProducer
  - ProducerRecord: The "message type" for a KafkaProducer to send, constructed with Topic, Partition, Key, Value, and Headers.
  - Producer::Callback: The callback used to provide asynchronous handling of request completion. It is called when the record sent to the server has been acknowledged.
  - KafkaAsyncProducer: Publishes records to the Kafka cluster asynchronously. Each send operation requires a per-message Producer::Callback.
  - KafkaSyncProducer: Publishes records to the Kafka cluster synchronously. The send operation does not return until the delivery is completed.
  - Producer::RecordMetadata: The metadata for a record that has been acknowledged by the server. It contains Topic, Partition, Offset, KeySize, ValueSize, Timestamp, and PersistedStatus. A KafkaAsyncProducer passes this metadata as an input parameter of the Producer::Callback, while a KafkaSyncProducer returns it from its synchronous send method.
- KafkaConsumer
  - ConsumerRecord: The message type returned by a KafkaConsumer instance. It contains Topic, Partition, Offset, Key, Value, Timestamp, and Headers.
  - KafkaAutoCommitConsumer: Automatically commits previously polled offsets on each poll operation.
  - KafkaManualCommitConsumer: Provides manual commitAsync and commitSync methods to acknowledge messages.
- AdminClient: The administrative client for Kafka, which supports managing and inspecting topics. Examples can be found on GitHub.

Conclusion
modern-cpp-kafka is a header-only C++ library that uses idiomatic C++ features to provide a safe, efficient, and easy way of producing and consuming Kafka messages.

The modern-cpp-kafka project on GitHub has been thoroughly tested within Morgan Stanley. After we replaced a legacy implementation with it, throughput for a key middleware system improved by 26%.

We are actively maintaining and improving the project. For example, the transactional interface is on the way, and new components, such as streamer and connector, are also on the roadmap. If you're interested in contributing, we'd be very happy to have you involved in the project, whether by raising an issue or by submitting a PR.