springboot kafka发送消息支持成功失败通知

2021-05-06 01:27

阅读:773

标签:tms   cti   list   gen   cep   llb   span   get   public   

springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。
以下是演示用的发送类代码

@Slf4j
@Component
public class TestRunner implements ApplicationRunner {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity();
        kafkaMsgEntity.setActionName("login");
        String tmpStr = "id:%d,msg:login";
        for (int i = 1; i ) {
            String tmpStr1 = tmpStr.replace("%d", String.valueOf(i));
            Thread.sleep(500);
            kafkaMsgEntity.setMsgBody(tmpStr1);
            kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback>() {
                @Override
                public void onFailure(Throwable throwable) {
                    if (throwable instanceof KafkaProducerException) {
                        String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value();
                        log.info("{} get throwable msg:{}", value, throwable.getMessage());
                    } else {
                        log.info("get throwable msg:{}", throwable.getMessage());
                    }
                }

                @Override
                public void onSuccess(SendResult o) {
                    log.info("{}, success", o.getProducerRecord().value());
                }
            });
        }
    }
}

在kafka运行过程中kill进程达到异常发送的条件。

 

技术图片

 

springboot kafka发送消息支持成功失败通知

标签:tms   cti   list   gen   cep   llb   span   get   public   

原文地址:https://www.cnblogs.com/gavinjunftd/p/13191166.html


评论


亲,登录后才可以留言!