Spring cloud kafka stream pitfalls
2021-01-04 11:27
Producer behavior:

If the Kafka broker is not started, Spring Cloud Stream's Kafka binder surprisingly does not fail fast when it cannot create a Kafka producer. The application starts up as normal and the only sign of trouble is an exception on the console. See org.springframework.cloud.stream.binding.BindingService.doBindProducer: BindingService starts a scheduler that retries the producer binding at bindingRetryInterval.

But what if we send a Kafka message while the late binding is still unresolved? The channel.send method immediately throws an exception from UnicastingDispatcher.doDispatch, complaining that no MessageHandler is registered with the dispatcher.

In the other case, where the producer is already bound but cannot obtain metadata from the broker, channel.send blocks for 60 seconds and then returns. No exception is thrown and the method returns true! The 60 seconds come from the standard Kafka producer config max.block.ms (see the official Kafka documentation).

KafkaProducer is asynchronous by itself: send returns immediately regardless of whether the message is delivered successfully. send can optionally take a callback parameter that lets users handle unsuccessful delivery attempts. Spring's KafkaTemplate wraps the KafkaProducer callback with a ProducerListener, whose onError method can be used to handle delivery errors. If a ProducerListener is registered in the applicationContext, KafkaBinderConfiguration automatically injects it into KafkaMessageChannelBinder.

StreamListener in same JVM:

So what if the streamListener throws an exception? The exception is passed from listener → InvocableHandlerMethod → StreamListenerMessageHandler → AbstractMessageHandler, and is finally processed by the UnicastingDispatcher.handleExceptions method. Because StreamListenerMessageHandler is the first handler and failover is true by default, the exception is silently ignored! After that, the message is sent to Kafka as per normal.

StreamListener in different JVM:

Internally, the Kafka message is received in KafkaMessageListenerContainer$ListenerConsumer.run and handled by the doInvokeRecordListener method. That method catches the consumer's exception and can hand it to a customized handler. Another interesting point: if processing message 1 throws an exception and processing message 2 works fine, the committed offset is advanced to message 2.

Consumer offset commit:

See KafkaMessageChannelBinder.createConsumerEndPoint for details. Spring Cloud Stream's Kafka binder supports two different modes for committing consumer offsets.

In SyncMode (ackMode=RECORD), KafkaMessageListenerContainer invokes the ackCurrent method immediately after doInvokeOnMessage, so the offset is sent to the broker after each Kafka record is processed.

In AsyncMode (ackMode=BATCH), KafkaMessageListenerContainer adds the current consumer record to the acks BlockingQueue and commits offsets in a batch at the beginning of the pollAndInvoke loop once the number of records waiting to be acknowledged exceeds a threshold. This strategy has a performance advantage because it interacts less with the Kafka broker.

At bootstrap, KafkaBinderConfiguration, declared in the spring.binders file in spring-cloud-stream-binder-kafka.jar, creates an instance of KafkaMessageChannelBinder. Input bindings are started from InputBindingLifecycle.start.

Reference: https://programmer.help/blogs/spring-cloud-stream-exception-handling.html
Original article: https://blog.51cto.com/shadowisper/2499412
From the official Kafka documentation on max.block.ms: "The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. These methods can be blocked either because the buffer is full or metadata unavailable."

A ProducerListener bean is declared in a @Configuration class (Kotlin); the snippet begins:

@Configuration
class KafkaProducerMonitorConfig {
    @Bean
    fun producerListener(criticalTopics: Set

Consumer behavior:

Key configuration: spring.cloud.stream.bindings.[name].consumer.concurrency determines the number of Kafka consumers created (ConcurrentMessageListenerContainer).
It is worth noting that if a producer message channel and a streamListener for the same message channel live in the same JVM, the streamListener does not read the message from Kafka at all. The message is passed directly to the streamListener in the UnicastingDispatcher.dispatch method. There are actually two MessageHandlers registered with the UnicastingDispatcher. The first one is StreamListenerMessageHandler, which internally calls the streamListener. The second one is AbstractMessageChannelBinder$SendingHandler, which sends the message to Kafka.

Exception handling in UnicastingDispatcher.handleExceptions:

        if (isLast || !this.failover) {
            if (allExceptions != null && allExceptions.size() == 1) {
                throw allExceptions.get(0);
            }
            throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive
                    "All attempts to deliver Message to MessageHandlers failed.", allExceptions);
        }
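To make the failover behaviour concrete, here is a minimal, dependency-free sketch. It is not the Spring Integration code: FailoverDispatchSketch, dispatch and the handler lambdas are hypothetical names, and the model assumes a unicast dispatcher that tries handlers in order, stops at the first one that succeeds, and ignores load balancing. It only reproduces the check quoted above: an exception is rethrown when the failing handler is the last one or failover is disabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of failover dispatch; not the Spring Integration implementation. */
class FailoverDispatchSketch {

    /**
     * Tries the handlers in order and stops at the first one that succeeds.
     * A failure is rethrown only if failover is off or the failing handler is the last one.
     */
    static boolean dispatch(List<Consumer<String>> handlers, String message, boolean failover) {
        List<RuntimeException> failures = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            boolean isLast = (i == handlers.size() - 1);
            try {
                handlers.get(i).accept(message);
                return true; // handled; failures of earlier handlers are dropped
            } catch (RuntimeException e) {
                failures.add(e);
                if (isLast || !failover) {
                    if (failures.size() == 1) {
                        throw e;
                    }
                    RuntimeException all = new RuntimeException("All attempts to deliver message failed");
                    failures.forEach(all::addSuppressed);
                    throw all;
                }
                // failover is on and more handlers remain: fall through to the next one
            }
        }
        return false; // no handlers registered
    }

    public static void main(String[] args) {
        List<String> sentToKafka = new ArrayList<>();
        // Stands in for the streamListener handler, which fails
        Consumer<String> listener = m -> { throw new IllegalStateException("listener failed"); };
        // Stands in for the handler that sends to Kafka
        Consumer<String> sendingHandler = sentToKafka::add;

        System.out.println(dispatch(List.of(listener, sendingHandler), "hello", true)); // true
        System.out.println(sentToKafka); // [hello]
    }
}
```

With failover set to false the same call rethrows the listener's IllegalStateException and nothing reaches the second handler, which is the fail-fast behaviour one might have expected by default.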
By default, if the streamListener throws an exception while processing a message consumed from Kafka, the message is handed to the listener again, for up to 3 attempts in total. After that, the next message is processed. Internally, RetryingMessageListenerAdapter constructs a RetryTemplate and retries the message. However, because of the exception, the message offset is not committed to the broker. This implies that if the JVM is restarted, or another consumer joins the consumer group and takes over some partitions, consumption starts from an earlier offset, before the error. The following configurations are related to retrying.

spring:
  cloud:
    stream:
      bindings:
        [name]:   # binding name
          consumer:
            # Maximum number of attempts (including the first), default 3
            maxAttempts: 3
            # Initial backoff interval between retries, in milliseconds, default 1000
            backOffInitialInterval: 1000
            # Maximum backoff interval, in milliseconds, default 10000
            backOffMaxInterval: 10000
            # Backoff multiplier, default 2.0
            backOffMultiplier: 2.0
            # Whether to retry when the listener throws an exception not listed in retryableExceptions
            defaultRetryable: true
            # Map of exception class name to whether that exception is retried
            retryableExceptions:
              java.lang.RuntimeException: true
              java.lang.IllegalStateException: false
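The effect of these settings can be worked out by hand. The sketch below is illustrative arithmetic only, not Spring Retry itself: RetrySettingsSketch and its methods are hypothetical names, and it assumes the usual exponential rule (wait = min(initial × multiplier^n, max), with no wait after the final attempt) and a nearest-superclass lookup for retryableExceptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative arithmetic for the retry settings; not Spring Retry itself. */
class RetrySettingsSketch {

    /** Waits in milliseconds between attempts: one wait after every failed attempt except the last. */
    static List<Long> backoffSchedule(int maxAttempts, long initialMs, long maxMs, double multiplier) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        for (int failed = 1; failed < maxAttempts; failed++) {
            waits.add((long) Math.min(next, maxMs)); // cap each wait at the maximum interval
            next *= multiplier;
        }
        return waits;
    }

    /** Looks up the exception class, then its superclasses; falls back to defaultRetryable. */
    static boolean isRetryable(Throwable t, Map<Class<? extends Throwable>, Boolean> rules,
                               boolean defaultRetryable) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Boolean rule = rules.get(c);
            if (rule != null) {
                return rule;
            }
        }
        return defaultRetryable;
    }

    public static void main(String[] args) {
        System.out.println(backoffSchedule(3, 1000, 10000, 2.0)); // [1000, 2000]
        System.out.println(backoffSchedule(6, 1000, 10000, 2.0)); // [1000, 2000, 4000, 8000, 10000]

        Map<Class<? extends Throwable>, Boolean> rules = new LinkedHashMap<>();
        rules.put(RuntimeException.class, true);
        rules.put(IllegalStateException.class, false);
        System.out.println(isRetryable(new IllegalStateException(), rules, true));    // false
        System.out.println(isRetryable(new IllegalArgumentException(), rules, true)); // true
    }
}
```

Under these assumptions the default settings give three attempts separated by waits of 1 s and 2 s, so a permanently failing message holds up its partition for roughly 3 seconds plus processing time before the next message is taken.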
Spring Kafka supports 7 offset commit modes: RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL and MANUAL_IMMEDIATE. Only a subset of them (BATCH (default), RECORD, MANUAL) is supported by Spring Cloud Stream's Kafka binder. The binder does not let the user configure ackMode directly; instead, ackMode is derived from a combination of other properties. ackMode is set to RECORD if spring.cloud.stream.kafka.bindings.[inputname].consumer.ackEachRecord = true. ackMode is set to MANUAL if spring.cloud.stream.kafka.bindings.[inputname].consumer.autoCommitOffset = false, in which case you have to acknowledge the message manually in the streamListener:

@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
    // The acknowledgment header is only present when autoCommitOffset = false
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if (acknowledgment != null) {
        acknowledgment.acknowledge();
    }
}

Bootstrap process:
BindableProxyFactory.createAndBindInputs →
BindingService.bindConsumer (creates consumerProperties via BindingServiceProperties.getConsumerProperties)
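As a recap of the offset-commit rules above, the property-to-ackMode mapping can be written as a small decision function. This is a hedged sketch, not the binder's code: AckModeSketch, resolve and automaticCommits are hypothetical names, the precedence (autoCommitOffset = false wins over ackEachRecord) is an assumption, and the commit count is simplified to one commit per record for RECORD and one commit per non-empty poll for BATCH.

```java
import java.util.List;

/** Toy model of how the ack mode follows from the two binder properties. */
class AckModeSketch {

    enum AckMode { RECORD, BATCH, MANUAL }

    /** Assumed precedence: autoCommitOffset=false wins, then ackEachRecord, else the BATCH default. */
    static AckMode resolve(boolean autoCommitOffset, boolean ackEachRecord) {
        if (!autoCommitOffset) {
            return AckMode.MANUAL;
        }
        return ackEachRecord ? AckMode.RECORD : AckMode.BATCH;
    }

    /** Simplified number of automatic commits for a sequence of poll results. */
    static int automaticCommits(AckMode mode, List<Integer> recordsPerPoll) {
        int commits = 0;
        for (int records : recordsPerPoll) {
            switch (mode) {
                case RECORD:
                    commits += records;             // one commit per processed record
                    break;
                case BATCH:
                    commits += records > 0 ? 1 : 0; // one commit per non-empty poll
                    break;
                case MANUAL:
                    break;                          // the application acknowledges explicitly
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> polls = List.of(500, 500, 0, 200);
        AckMode defaults = resolve(true, false);
        System.out.println(defaults + " " + automaticCommits(defaults, polls));     // BATCH 3
        AckMode perRecord = resolve(true, true);
        System.out.println(perRecord + " " + automaticCommits(perRecord, polls));   // RECORD 1200
        AckMode manual = resolve(false, true);
        System.out.println(manual + " " + automaticCommits(manual, polls));         // MANUAL 0
    }
}
```

The numbers illustrate why the batch mode is described as cheaper: for the same 1200 records it talks to the broker 3 times instead of 1200.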