Flume Configuration — Offline Data Warehouse Project
2021-01-29 18:16

1. Flume 1.7 is used here (installation omitted). The log files come in two kinds: startup logs and event logs. A selector splits the two kinds apart, and an interceptor filters out empty records. We start with the selector/interceptor code.
2. Create a Maven project, package the Utils classes into a jar, and place the jar under Flume's lib directory.
3. Open the log4j config and start Flume; while logs are being generated you can watch them with: cd /tmp/log, then the lg.sh command. (This raises a side question: how to generate test log files — see the generator script below.)
4. For the KafkaChannel test that follows, Kafka installation is omitted: start Kafka, check broker registration in ZooKeeper, then start Flume with the latest config, which channels events straight into Kafka. Finally, check inside Kafka whether the channeled data arrived. (flume-hdfs.conf belongs to the second layer.)

Original article: https://www.cnblogs.com/mengbin0546/p/13198550.html

First-layer Flume: source -> Kafka (Kafka acts as the channel).
The following two classes are hand-written (the first versions had problems).

MyInterceptor:

package com.atguigu.dw.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MyInterceptor implements Interceptor {
    // Collection holding the events that pass validation
    private List<Event> results = new ArrayList<>();
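The class body is cut off above. Judging from the selector config and the logger-sink output later in this post (every event carries a topic header of topic_start or topic_event), the interceptor's core decision can be sketched in plain Java. The classify helper below is hypothetical, a minimal stand-in for what the real Interceptor would run per event, not the author's exact code:

```java
public class TopicClassifier {
    // Hypothetical stand-in for MyInterceptor's per-event decision:
    // start logs are bare JSON objects, event logs are "<13-digit ts>|<json>".
    // Returns the value to place in the "topic" header, or null to drop the event.
    public static String classify(String body) {
        if (body == null || body.trim().isEmpty()) {
            return null;
        }
        String trimmed = body.trim();
        if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
            return "topic_start";
        }
        String[] parts = trimmed.split("\\|");
        if (parts.length == 2
                && parts[0].length() == 13 && parts[0].matches("\\d+")
                && parts[1].startsWith("{") && parts[1].endsWith("}")) {
            return "topic_event";
        }
        return null; // malformed line: filter it out
    }

    public static void main(String[] args) {
        System.out.println(classify("{\"action\":\"1\"}"));        // topic_start
        System.out.println(classify("1593226324596|{\"cm\":{}}")); // topic_event
        System.out.println(classify("garbage"));                   // null
    }
}
```

The multiplexing selector shown later then reads this header to decide which channel each event goes to.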
package com.atguigu.dw.flume;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
public class ETLUtils {

    // Validate a startup (start) log line:
    // the JSON string must be complete, i.e. start with { and end with }
    public static boolean validaStratLog(String source) {
        // 1. Make sure the body actually contains data
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // 2. Strip leading/trailing whitespace
        String trimStr = source.trim();
        // 3. Verify JSON completeness: starts with { and ends with }
        if (trimStr.startsWith("{") && trimStr.endsWith("}")) {
            return true;
        }
        return false;
    }

    // Validate an event log line; a sample line looks like:
//"1593089978858|{\"cm\":{\"ln\":\"-75.0\",\"sv\":\n" +
// "\"V2.3.8\",\"os\":\"8.1.2\",\"g\":\"4LALWMU9@gmail.com\",\"mid\":\"5\",\"nw\":\"4G\",\"l\":\"pt\",\"vc\":\"17\",\"hw\":\"750*1134\",\"ar\":\"\n" +
// "MX\",\"uid\":\"5\",\"t\":\"1593008192223\",\"la\":\"-35.0\",\"md\":\"HTC-13\",\"vn\":\"1.1.1\",\"ba\":\"HTC\",\"sr\":\"M\"},\"ap\":\"app\",\"e\n" +
// "t\":[{\"ett\":\"1593072416695\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"3\",\"action\":\"2\",\"extend1\":\"\",\"t\n" +
// "ype\":\"3\",\"type1\":\"325\",\"loading_way\":\"1\"}},{\"ett\":\"1593070499363\",\"en\":\"ad\",\"kv\":{\"entry\":\"1\",\"show_style\":\"\n" +
// "5\",\"action\":\"4\",\"detail\":\"201\",\"source\":\"3\",\"behavior\":\"2\",\"content\":\"1\",\"newstype\":\"0\"}},{\"ett\":\"1593022214\n" +
// "129\",\"en\":\"active_foreground\",\"kv\":{\"access\":\"\",\"push_id\":\"3\"}}]}\n";
    public static boolean validEvent(String source) {
        // 1. Make sure the body actually contains data
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // 2. Strip leading/trailing whitespace
        String trimStr = source.trim();
        // 3. Split on '|': an event line is "<timestamp>|<json>"
        String[] words = trimStr.split("\\|");
        if (words.length != 2) {
            return false;
        }
        // 4. Validate the timestamp: 13 digits (a millisecond epoch)
        if (words[0].length() != 13 || !NumberUtils.isDigits(words[0])) {
            return false;
        }
        // 5. Verify JSON completeness: starts with { and ends with }
        if (words[1].startsWith("{") && words[1].endsWith("}")) {
            return true;
        }
        return false;
    }
}
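A quick sanity check of the event validator. It is restated here with only the JDK (String.matches replaces NumberUtils.isDigits, and a null/trim check replaces StringUtils.isBlank) so it runs without commons-lang on the classpath:

```java
public class EtlUtilsCheck {
    // JDK-only restatement of ETLUtils.validEvent for a standalone check
    static boolean validEvent(String source) {
        if (source == null || source.trim().isEmpty()) {
            return false;
        }
        String[] words = source.trim().split("\\|");
        if (words.length != 2) {
            return false;
        }
        // Timestamp must be exactly 13 digits (millisecond epoch)
        if (words[0].length() != 13 || !words[0].matches("\\d+")) {
            return false;
        }
        return words[1].startsWith("{") && words[1].endsWith("}");
    }

    public static void main(String[] args) {
        // A well-formed event line: 13-digit timestamp, '|', JSON body
        System.out.println(validEvent("1593089978858|{\"cm\":{},\"ap\":\"app\"}")); // true
        // A start-style line with no timestamp part fails the event check
        System.out.println(validEvent("{\"cm\":{}}")); // false
    }
}
```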
Case 1: writing a Flume conf file. Edit the config file below (this one is for testing).
# a1 is the agent name; a1 defines one source named r1 (separate multiple names with spaces)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# group.property=value
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
# file pattern that defines the data source
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
# where the taildir position JSON is kept
a1.sources.r1.positionFile=/opt/module/log_postition.json
# interceptor definition (the class comes from the jar placed under Flume's lib directory)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor
# sink definition
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100
# channel definition
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# wiring: one source can feed multiple channels, but a sink takes data from only one channel!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
The log4j.properties file (pass -Dflume.root.logger=DEBUG,console when launching Flume; this makes troubleshooting much easier):
#flume.root.logger=DEBUG,console
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log
log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
log4j.logger.org.apache.hadoop.hive = ERROR
# Define the root logger to the system property "flume.root.logger".
log4j.rootLogger=${flume.root.logger}
# Stock log4j rolling file appender
# Default log rotation configuration
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.
# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# Add "DAILY" to flume.root.logger above if you want to use this
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
# console
# Add "console" to flume.root.logger above if you want to use this
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
/opt/module/apache-flume-1.7.0-bin/bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG
#!/bin/bash
# Launch the hand-built log-generator jar on each host.
for i in hadoop101 hadoop102
do
    # Redirecting the (red) error output to one file is of little value, but keeps the terminal clean.
    ssh $i "source /etc/profile; java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain $1 $2 > /opt/module/test.log &"
done
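The event-log lines this generator emits (as validated by ETLUtils above) have the shape <13-digit-millisecond-timestamp>|<json>. A hypothetical sketch of producing one such line; the JSON content here is a made-up minimal example, not the generator's real schema:

```java
import java.time.Instant;

public class LogLineSketch {
    // Build one event-log line: 13-digit millisecond epoch, a '|', then the JSON body.
    static String eventLine(String json) {
        return Instant.now().toEpochMilli() + "|" + json;
    }

    public static void main(String[] args) {
        String line = eventLine("{\"cm\":{\"mid\":\"5\"},\"ap\":\"app\",\"et\":[]}");
        // The timestamp part is 13 digits for current dates
        System.out.println(line.split("\\|")[0].length()); // 13
    }
}
```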
In the generator code you can see that logs land in the /tmp/log directory. Below is the output of a successful run:
-2.7.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-2.7.3/share/hadoop/mapreduce/*:/opt/module/hadoop-2.7.3/contrib/capacity-scheduler/*.jar:/lib/*‘ -Djava.library.path=:/opt/module/hadoop-2.7.3/lib/native org.apache.flume.node.Application --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0-bin/lib/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/06/27 10:54:39 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
20/06/27 10:54:39 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Creating channels
20/06/27 10:54:39 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Created channel c1
20/06/27 10:54:39 INFO source.DefaultSourceFactory: Creating instance of source r1, type TAILDIR
20/06/27 10:54:39 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
20/06/27 10:54:39 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:Taildir source: { positionFile: /opt/module/log_postition.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@736c21d4 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
20/06/27 10:54:39 INFO node.Application: Starting Channel c1
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/06/27 10:54:39 INFO node.Application: Starting Sink k1
20/06/27 10:54:39 INFO node.Application: Starting Source r1
20/06/27 10:54:39 INFO taildir.TaildirSource: r1 TaildirSource source starting with directory: {f1=/tmp/logs/^app.+.log$}
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: taildirCache: [{filegroup=‘f1‘, filePattern=‘/tmp/logs/^app.+.log$‘, cached=true}]
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: headerTable: {}
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: Opening file: /tmp/logs/app-2020-06-27.log, inode: 2643708, pos: 0
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /opt/module/log_postition.json
20/06/27 10:54:39 INFO taildir.TailFile: Updated position, file: /tmp/logs/app-2020-06-27.log, inode: 2643708, pos: 5693224
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Below is the debug output generated by the logger sink:
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 36 7C 7B 22 1593226324596|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 37 7C 7B 22 1593226324597|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 37 7C 7B 22 1593226324597|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_start} body: 7B 22 61 63 74 69 6F 6E 22 3A 22 31 22 2C 22 61 {"action":"1","a }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_start} body: 7B 22 61 63 74 69 6F 6E 22 3A 22 31 22 2C 22 61 {"action":"1","a }

List the existing topics:
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
Create the topic_start topic (then create the event topic the same way):
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 2 --partitions 1 --topic topic_start
Delete a topic:
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic topic_start
Start the Kafka broker on each node:
/opt/module/kafka_2.11-1.0.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-1.0.1/config/server.properties
Check broker registration in ZooKeeper:
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[30, 20, 10]    # all three brokers registered normally
/opt/module/apache-flume-1.7.0-bin/bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume.conf -Dflume.root.logger=DEBUG,console
Case 2: testing the Flume conf file.
Below is the Flume config: the source writes straight into KafkaChannels, so there is no sink stage.
[root@hadoop101 conf]# more flume.conf
# a1 is the agent name; a1 defines one source named r1 (separate multiple names with spaces)
a1.sources = r1
a1.channels = c1 c2
# group.property=value
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.batchSize=1000
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
# where the taildir position JSON is kept
a1.sources.r1.positionFile=/opt/module/log_postition.json
# interceptor definition
a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
# selector definition: route each event by the value of its "topic" header
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=topic
a1.sources.r1.selector.mapping.topic_start=c1
a1.sources.r1.selector.mapping.topic_event=c2
# channel definitions
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic=topic_start
a1.channels.c1.parseAsFlumeEvent=false
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c2.kafka.topic=topic_event
a1.channels.c2.parseAsFlumeEvent=false
# wiring: the one source feeds both channels; the selector decides which channel each event goes to
a1.sources.r1.channels=c1 c2
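The multiplexing selector's routing in the config above can be simulated in plain Java. This is a sketch of the mapping lookup only, not Flume's actual implementation (the real selector also supports a default channel for unmapped header values):

```java
import java.util.HashMap;
import java.util.Map;

public class SelectorSketch {
    // Mirror of the multiplexing selector's mapping in the config above
    static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("topic_start", "c1");
        MAPPING.put("topic_event", "c2");
    }

    // Pick the channel for an event based on its "topic" header value;
    // null means the value is unmapped (the real selector would apply a default rule)
    static String route(Map<String, String> headers) {
        return MAPPING.get(headers.get("topic"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "topic_event");
        System.out.println(route(headers)); // c2
    }
}
```

This is why the interceptor must stamp a topic header on every event before the selector runs: the events then land in the Kafka topic matching their kind.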
Second-layer Flume: Kafka -> HDFS (Kafka acts as the Kafka source).