Flume整合Kafka(基于kerberos认证)——完成实时数据采集
2021-01-26 20:14
标签:event mem color ons buffers ber getname 程序 use 如果现在要想将flume中的sink设置为kafka,因为在实际的开发中,可能会有若干个子系统或者若干个客户端进行flume日志采集,那么能够承受这种采集任务量的只有kafka来完成,可是需要注意一个问题,现在的kafka是采用了Kerberos认证,所以要想在flume之中去使用kafka操作,就需要考虑到开发包以及jaas配置问题。 1、将kafka的客户端的程序jar文件拷贝到flume的lib目录之中: 2、在"D:\"目录下建立jass配置文件 3、修改flume.cnf文件追加kafka 4、window启动flume 5、启动kafka消费端——FlumeReceiveMessageConsumer.java 6、启动业务程序,模拟打印消息——TestFlumeDemo.java 7、FlumeReceiveMessageConsumer.java消费端会接收到flume采集的日志数据 Flume整合Kafka(基于kerberos认证)——完成实时数据采集 标签:event mem color ons buffers ber getname 程序 use 原文地址:https://www.cnblogs.com/linjiqin/p/13227991.htmlmv kafka-clients-0.10.2.1.jar D:\dev\apache-flume-1.7.0-bin\lib
vim D:\kafka_client_jaas.confKafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-pwd";
};
vim D:\dev\apache-flume-1.7.0-bin\conf\flume.conf# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#a1.sources.r1.type = netcat
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.106
a1.sources.r1.port = 44444
# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Use a channel which buffers events in memory
# a1.channels.c1.type = memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
a1.channels.c1.kafka.topic = mldn-topic
a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
cd D:\dev\apache-flume-1.7.0-bin\bind:
flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"
package cn.mldn.mykafka.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* Flume整合Kafka -- kafka消费端
*
* @author hp
*
*/
public class FlumeReceiveMessageConsumer {
public static final String SERVERS = "203.195.205.63:9092";
public static final String TOPIC = "mldn-topic";
static {
System.setProperty("java.security.auth.login.config",
"d:/kafka_client_jaas.conf"); // 表示系统环境属性
}
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// 定义消息消费者的连接服务器地址
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
// 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
// 定义消费者处理对象
Consumer
package cn.mldn.myflume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestFlumeDemo {
private static final Logger LOGGER = LoggerFactory
.getLogger(TestFlumeDemo.class);
public static void main(String[] args) {
for (int x = 0 ; x ) {
LOGGER.info("lynch.cn" + x);
}
}
}
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8
文章标题:Flume整合Kafka(基于kerberos认证)——完成实时数据采集
文章链接:http://soscw.com/essay/47424.html