kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中
2020-12-13 03:01
// broker地址,可写多个“,”分隔
String brokers = "master:9092";
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("streamingKafka");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));
// kafka相关参数,必要!缺了会报错
MapString, Object> kafkaParams = new HashMap();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "cloudera_mirrormaker");
// Topic分区 也可以通过配置项实现
// 如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
// earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
kafkaParams.put("auto.offset.reset", "latest");
// 如果是使用spark-streaming-kafka-0-10,那么我们建议将enable.auto.commit设为false。
// 这个配置只是在这个版本生效,enable.auto.commit如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中。
// 在Spark Streaming中,将这个选项设置为true的话会使得Spark应用从kafka中读取数据之后就自动提交,
// 而不是数据处理之后提交,这不是我们想要的。所以为了更好地控制offsets的提交,我们建议将enable.auto.commit设为false。
kafkaParams.put("enable.auto.commit", false);
// 设置消费的topic
CollectionString> topicsColl = Arrays.asList("topic-test", "test");
// 获取DStream
JavaInputDStreamConsumerRecordString, String>> lines = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsColl, kafkaParams));
// 遍历JavaInputDStream的到RDD
lines.foreachRDD(rdd -> {
// 遍历RDD得到每一列的信息
rdd.foreach(x -> {
// 获取行
String value = x.value();
// 获取需要保存的每一行的信息
String[] split = value.split(";");
// 判断是否为保存数据需要的格式
// 保存offset,保证每次获取新数据
//获取偏移量
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
//保存偏移量到Kafka
((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
});
ssc.start();
try {
ssc.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
文章标题:kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中
文章链接:http://soscw.com/essay/26775.html