【记录】【springboot】【kafka】【KafkaStreams】报错Use a different TimestampExtractor to process this data
2021-06-08 03:05
标签:row ica tco data bean input producer streams rate 问题:springboot集成kafka,并由KafkaStreams处理,启动报错 org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = crawler_events, partition = 0, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 187, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {XXX}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data. 解决:去掉@EnableKafkaStreams,加入bean,和相关类 参考文章:http://www.voidcn.com/article/p-fepwxwfx-bmq.html https://blog.csdn.net/dcm19920115/article/details/93386750 【记录】【springboot】【kafka】【KafkaStreams】报错Use a different TimestampExtractor to process this data 标签:row ica tco data bean input producer streams rate 原文地址:https://www.cnblogs.com/xiaostudy/p/14538223.html@Bean(
name = {"defaultKafkaStreamsBuilder"}
)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(@Qualifier("defaultKafkaStreamsConfig") ObjectProvider
package com.bda.clientportait.strategy.common.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
/**
* TODO
*
* @author liwei
* @version 1.0
* @date 2021/3/15 15:25
* @className com.bda.clientportait.strategy.common.kafka.MyEventTimeExtractor
**/
public class MyEventTimeExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord
上一篇:Java对象与Map的互转
下一篇:webpack学习笔记一
文章标题:【记录】【springboot】【kafka】【KafkaStreams】报错Use a different TimestampExtractor to process this data
文章链接:http://soscw.com/index.php/essay/92036.html