学习文档参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html Flume简单概括就是一个收集日志的工具,它可以通过调用接口,RPC,还有网页的一些操作进行日志的收集。它是一个分布式开源的Java编写的由Apache维护的项目。 搭建前提条件 崇尚授人以渔的思想,我说给大家怎么下载就行了,就不直接放连接了,大家可以直接输入官网地址 http://flume.apache.org ,一般在官网的上方或者左边都会有Download按钮,这个在左侧,然后点进去下载想要的版本即可。 这个会有点慢,如果嫌弃的化,可以通过相关镜像网站进行下载,可以百度搜索软件镜像,就能搜到很多镜像网站,在里面就可以下载,如果你下载的东西属于Apache旗下的,可以看的有专门的一个Apache目录,里面存的都是Apache旗下相关产品。 可以先本地下载,然后通过ftp上传,也可以直接在服务器下载。 我这里下载好后,解压到了服务器/opt 目录下面,并修改了下目录名称为flume(你也可以不改,配置环境变量的时候按照实际情况来就行了。) 进入到 conf目录下面,对flume-env.sh进行编辑,将export JAVA_HOME修改为实际路径。 cd /opt/flume/conf/ vim flume-env.sh 环境变量存放的是软件的具体位置,运行程序命令会根据配置的变量找到软件并执行,否则会报错。(通过手动下载并上传到Linux服务器的都需要配置环境变量。) vim /etc/profile source /etc/profile 直接控制台运行 显示Flume 1.6.0 就好了,如果显示了Error什么报错信息先不用管。 日志采集系统: 请参考菜鸟教程: https://www.runoob.com/linux/nginx-install-setup.html 按照上述步骤安装完后,需要对nginx配置下访问日志格式: 编辑nginx.conf,默认安装路径在/etc/nginx下 cd /etc/nginx vim nginx.conf 在http模块下面添加: 解析:(以^A为日志分隔符,remote_addr代表远程地址,msec代表访问时间,http_host代表访问主机名,request_uri代表访问资源) 在server模块下面添加: 解析:(访问地址 域名/log.gif,请求格式是image,存放地址是/opt/data/access.log ) 这样访问nginx的时候就会生成类似下面的内容: 代码思路: 代码连接: https://gitee.com/shuai7boy/BIG_DATA_LOG 下载 去官网下载,然后上传到服务器并进行解压。 在flume配置文件中配置java环境变量 找到flume-env.sh进行配置,什么?没有?没关系,将flume-env.sh.template重命名下就行了。 配置flume环境变量 vim /etc/profile 验证是否配置成功 flume-ng version 当弹出以下内容,则说明配置成功。 编写flume代码,参考官方案例: http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 在安装Linux的服务器上,创建一个监控文件(姑且取名optionHdfs.conf): # example.conf: A single-node Flume configuration # Name the components on this agent # Describe/configure the source # Describe the sink # Use a channel which buffers events in memory # Bind the source and sink to the channel 运行flume,同样参考官方来就行:flume-ng agent --conf conf --conf-file optionHdfs.conf --name a1 -Dflume.root.logger=INFO,console 运行后就按照文件规则将日志里面的内容导入hdfs里了。 代码思路: 代码连接:https://gitee.com/shuai7boy/BIG_DATA_ETL 数据和维度: 主要拿用户时间活跃量和用户某段时间基于某个浏览器的活跃量来讨论。 代码思路: 定义来源: 在Runner类里面定义来源为HBase。 定义维度 将基于用户活跃度,基于某平台的用户活跃度,新增用户数,基于某平台的新增用户数等维度进行设定类。 进行Map: 继承TableMapper 根据查询的数据来源,映射成 维度+用户信息。 进行Reduce: 继承Reducer 计算成 维度+用户去重。 进行To MySql: 继承OutputFormat重写getRecordWriter,checkOutputSpecs,getOutputCommitter 继承RecordWriter重写write和close 继承IDimensionConverter重写getDimensionIdByValue,executeSql 在Runner里面定义写入MySql。SQL语句都定义在了配置文件里面,根据维度进行调用。 首先判断各个维度是否存在,不存在先写入维度信息。 然后就是写入更新统计信息(每映射10条更新一次。) 代码链接: https://gitee.com/shuai7boy/BIG_DATA_TOMYSQL Sqoop是一款导入导出数据的工具,可以将MySql,Oracel等关系型数据据导入到HDFS,Hive,HBase里面。 官方文档: http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html 1.在Linux服务器对Sqoop进行解压 我这里下载的是1.4.6版本。**请注意下载的时候一定要安装sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 压缩包,不要下载 sqoop-1.4.6.tar.gz 压缩包,因为这个压缩包少东西。** 下载地址 http://archive.apache.org/dist/sqoop/1.4.6/ , 下载好后使用ftp工具将文件上传到Linux服务器上(我上传到了/opt/sqoop下面) 然后使用tar -xvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 进行解压。 2.修改配置文件名字 进入到conf文件夹下, mv sqoop-env-template.sh sqoop-env.sh 3.配置环境变量 vim /etc/profile 添加: export SQOOP_HOME=/opt/sqoop/sqoop-1.4.6 path=$SQOOP_HOME/bin 4.校验是否安装成功 sqoop --version (如果出现异常,请根据异常修改bin下面的configure-sqoop) HBase表结构:Row,Name(列族+限定符),timestamp,Value hive和hbase表关联官方文档: https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration CREATE EXTERNAL TABLE event_logs( CREATE TABLE stats_view_depth ( CREATE TABLE stats_view_depth_tmp(pl string, date string, col string, ct bigint); 注:要继承udf 将编写的内容进行打包上传到linux服务器 create function date_convert as 'shuai7boy.vip.transformer.hive.DateDimensionUDF' using jar 'hdfs://tuge2:9000/transform/transform-0.0.1.jar'; 其中一开始没加端口号报错:java.lang.IllegalArgumentException: java.net.UnknownHostException: transform ,然后参考博文 https://blog.csdn.net/heming621/article/details/53317562 解决了。 上面我们将HDFS数据导入到了HBase里面,并且做了Hive表和HBase表同步,又因为Hive表支持HQL语句。所在在Hive里面使用HQL语句就能进行分析。 计算用户的浏览深度 from ( --将行转列 with tmp as select 'all' as pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all 执行可能出现异常:Failed to recognize predicate 'xxx'. Failed rule: 'identifier' in column specification。 解决方案:主要原因是使用了date关键字导致的,弃用保留关键字即可。 在hive-site.xml里面添加如下命令: 参考博客: https://blog.csdn.net/sjf0115/article/details/73244762 退出hive命令,执行以下语句: sqoop export --connect jdbc:mysql://tuge1:3306/result_db --username root --password 123456 --table stats_view_depth --export-dir /user/hive/warehouse/stats_view_depth/* --input-fields-terminated-by "\t" --update-mode allowinsert --update-key platform_dimension_id,data_dimension_id,kpi_dimension_id; 然后就能在MySql里面看到数据了,以后的事情就是把数据在平台渲染下,这里就不演示了。
export JAVA_HOME=/opt/java/jdk1.8.0_221
export FLUME_HOME=/opt/flume
export FLUME_CONF_DIR=/opt/flume/conf
2.4通过flume-ng version验证是否配置成功
flume-ng version
log_format my_format ‘$remote_addr^A$msec^A$http_host^A$request_uri‘;
location =/log.gif { default_type image/gif; access_log /opt/data/access.log my_format; }^A1577365502.563^Atuge1^A/log.gif?en=e_crt&oid=123456&on=%E6%B5%8B%E8%AF%95%E8%AE%A2%E5%8D%95123456&cua=524.01&cut=RMB&pt=alipay&ver=1&pl=website&sdk=js&u_ud=039F6588-ED65-4187-87CF-9DBBC9F19645&u_mid=zhangsan&u_sd=605DECAA-93C0-46B7-AC47-7B1898DBD6BC&c_time=1577365502881&l=zh-CN&b_iev=Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20WOW64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F78.0.3904.97%20Safari%2F537.36&b_rst=1536*86
export FLUME_HOME=/opt/flume export FLUME_CONF_DIR=/opt/flume/conf
Flume 1.6.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080 Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015 From source with checksum b29e416802ce9ece3269d34233baf43f
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log --监控文件路径,新增加内容就会往hdfs里面写。
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://tuge2:9000/flume/webdata/%Y-%m-%d --填写active NameNode
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c13.5编写ETL代码,将HDFS内容导入到HBase里面
key string, pl string, en string, s_time bigint, p_url string, u_ud string, u_sd string
) ROW FORMAT SERDE ‘org.apache.hadoop.hive.hbase.HBaseSerDe‘
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler‘
with serdeproperties(‘hbase.columns.mapping‘=‘:key,log:pl,log:en,log:s_time,log:p_url,log:u_ud,log:u_sd‘)
platform_dimension_id bigint ,
data_dimension_id bigint ,
kpi_dimension_id bigint ,
pv1 bigint ,
pv2 bigint ,
pv3 bigint ,
pv4 bigint ,
pv5_10 bigint ,
pv10_30 bigint ,
pv30_60 bigint ,
pv60_plus bigint ,
created string
) row format delimited fields terminated by ‘\t‘;5.1.2创建一个临时表(PS:存放中间结果)
public class DateDimensionUDF extends UDF {
IDimensionConverter dimension=new DimensionConverImpl();
public IntWritable evaluate(Text txt) {
try {
DateDimension dateDimension=DateDimension.buildDate(TimeUtil.parseString2Long(txt.toString()), DateEnum.DAY);
int id= dimension.getDimensionIdByValue(dateDimension);
return new IntWritable(1);
}catch(IOException ex){
throw new RuntimeException("获取datedimension id异常");
pl, from_unixtime(cast(s_time/1000 as bigint),‘yyyy-MM-dd‘) as day, u_ud,
(case when count(p_url) = 1 then "pv1"
when count(p_url) = 2 then "pv2"
when count(p_url) = 3 then "pv3"
when count(p_url) = 4 then "pv4"
when count(p_url) >= 5 and count(p_url)
when count(p_url) >= 10 and count(p_url)
when count(p_url) >=30 and count(p_url)
else ‘pv60_plus‘ end) as pv
from event_logs
and p_url is not null
and pl is not null
and s_time >= unix_timestamp(‘2016-06-08‘,‘yyyy-MM-dd‘)1000
and s_time 1000
group by
pl, from_unixtime(cast(s_time/1000 as bigint),‘yyyy-MM-dd‘), u_ud
) as tmp
insert overwrite table stats_view_depth_tmp
select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv;
select pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv1‘ union all
select pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv2‘ union all
select pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv3‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv4‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv5_10‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv10_30‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv30_60‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col=‘pv60_plus‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv2‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv3‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv4‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv5_10‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv10_30‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv30_60‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col=‘pv60_plus‘
from tmp
insert overwrite table stats_view_depth
select 2,date_convert(date1),6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),‘2017-01-10‘ group by pl,date1;5.5使用Sqoop将hive中的数据同步到MySql里面