第2节 mapreduce深入学习:15、reduce端的join算法的实现

2020-12-13 02:12

阅读:412

标签:tostring   ast   小米   interrupt   方案   equals   set   system   val   

reduce端的join算法:

例子:

商品表数据 product: 
pid
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000

订单表数据 order: 
           pid
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3

mapReduce可以实现sql语句的功能:select 。。。。。。from product p left join order o on p.pid = o.pid

 

思路:将关联的条件作为map输出的key。

 

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

替代解决方案: map端join实现方式。

 

代码:

ReduceJoinMain:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {

Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());
// job.setJarByClass(ReduceJoinMain.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input"));

job.setMapperClass(ReduceJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(ReduceJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output"));

boolean b = job.waitForCompletion(true);
return b?0:1;
}

public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
System.exit(run);
}

}

ReduceJoinMapper:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
if(line.startsWith("p")){
context.write(new Text(split[0]),value);
}else{
context.write(new Text(split[2]),value);
}
}
}

ReduceJoinReducer:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
String firP = "";
String secP = "";

for(Text text:values){
String value = text.toString();
if(value!=null && !"".equals(value)) {
if(value.startsWith("p"))
secP += value;
else
firP += value+"\t";
}
}

context.write(key,new Text(firP+secP));


}
}

 

第2节 mapreduce深入学习:15、reduce端的join算法的实现

标签:tostring   ast   小米   interrupt   方案   equals   set   system   val   

原文地址:https://www.cnblogs.com/mediocreWorld/p/11028591.html

上一篇:python 实现lda

下一篇:Java SDK下载方法


评论


亲,登录后才可以留言!