kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中

2020-12-13 03:01

阅读:360

       // broker地址,可写多个“,”分隔

String brokers = "master:9092";

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("streamingKafka");

JavaSparkContext sc = new JavaSparkContext(conf);

sc.setLogLevel("WARN");

JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));

// kafka相关参数,必要!缺了会报错

MapString, Object> kafkaParams = new HashMap();

kafkaParams.put("bootstrap.servers", brokers);

kafkaParams.put("key.deserializer", StringDeserializer.class);

kafkaParams.put("value.deserializer", StringDeserializer.class);

kafkaParams.put("group.id", "cloudera_mirrormaker");

// Topic分区 也可以通过配置项实现

// 如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性

// earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

// latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

// none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

kafkaParams.put("auto.offset.reset", "latest");

// 如果是使用spark-streaming-kafka-0-10,那么我们建议将enable.auto.commit设为false。

// 这个配置只是在这个版本生效,enable.auto.commit如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中。

// 在Spark Streaming中,将这个选项设置为true的话会使得Spark应用从kafka中读取数据之后就自动提交,

// 而不是数据处理之后提交,这不是我们想要的。所以为了更好地控制offsets的提交,我们建议将enable.auto.commit设为false。

kafkaParams.put("enable.auto.commit", false);

// 设置消费的topic

CollectionString> topicsColl = Arrays.asList("topic-test", "test");

// 获取DStream

JavaInputDStreamConsumerRecordString, String>> lines = KafkaUtils.createDirectStream(ssc,

LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsColl, kafkaParams));

// 遍历JavaInputDStream的到RDD

lines.foreachRDD(rdd -> {

// 遍历RDD得到每一列的信息

rdd.foreach(x -> {

// 获取行

String value = x.value();

// 获取需要保存的每一行的信息

String[] split = value.split(";");

// 判断是否为保存数据需要的格式

// 保存offset,保证每次获取新数据

//获取偏移量

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

 //保存偏移量到Kafka

((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);

});

ssc.start();

try {

ssc.awaitTermination();

catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}


评论


亲,登录后才可以留言!