Flume(四)【配置文件总结】
2021-01-24 17:14
标签:bin intercept pattern 脚本 interval inf pool pac head Agent的配置文件最好根据Flume的拓扑架构,依次写好每个节点的配置文件; 开头都是先要定义agent,sorce,channel,sink名 注意:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。 channel选择器有两种:replicating(默认),multiplexing logger 打印控制台 Flume(四)【配置文件总结】 标签:bin intercept pattern 脚本 interval inf pool pac head 原文地址:https://www.cnblogs.com/wh984763176/p/13252516.html
一.Agent
# Name the components on this agent( 描述这个Agent,给各个组件取名字)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
二.Source
taildir
# Describe/configure the source
a1.sources.r3.type = TAILDIR
#维护这每个文件读取到的最新的位置
a1.sources.r3.positionFile = /opt/module/flume/tail_dir.json
#可配置多目录
a1.sources.r3.filegroups = f1 f2
#正则匹配文件名
a1.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a1.sources.r3.filegroups.f2 = /opt/module/flume/files/.*log.*
arvo
# Describe/configure the source
# source端的avro是一个数据接收服务
a1.sources.r1.type = avro
#接收的主机
a1.sources.r1.bind = hadoop102
#要和上级的avro的sink的端口一致
a1.sources.r1.port = 4141
netstat
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
exec
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
spooldir
# Describe/configure the source
a1.sources.r3.type = spooldir
# 指定文件夹
a1.sources.r3.spoolDir = /opt/module/flume/upload
#指定文件上传后的后缀
a1.sources.r3.fileSuffix = .COMPLETED
a1.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r3.ignorePattern = ([^ ]*\.tmp)
三.Sink
hdfs
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否对时间戳取整
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#创建文件夹的时间单位
a1.sinks.k1.hdfs.roundUnit = day
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件,单位:s
a1.sinks.k1.hdfs.rollInterval = 3600
#设置每个文件的滚动大小,一般略小于128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0
kafka(待续)
hbase(待续)
arvo
# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
#发送的目的主机ip
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
logger
# Describe the sink
a1.sinks.k1.type = logger
本地目录(file_roll)
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
四.Channel
# Describe the channel
#channel的类型为memory或者file
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
五.组件绑定
# Bind the source and sink to the channel
#组件绑定,1个source,2个channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
六.自定义拦截器和channle选择器
a1.sources.r1.interceptors = i1
#自定义拦截器的全类名
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
#channel选择器选用multiplexing类型
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.hello = c1
a1.sources.r1.selector.mapping.nohello = c2
七.负载均衡和故障转移
# Name the components on this agent
a1.sources = r1
a1.channels = c1
#添加sink组
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#配置为故障转移(failover)
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
#sink组的绑定
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
八.启动flume
#启动脚本 flume的conf目录 agent名字 执行的配置文件
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
#缩写形式
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console