Section 2, MapReduce in depth: 15. Implementing the reduce-side join algorithm
2020-12-13 02:12
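Reduce-side join algorithm

Example: two inputs, the product table data (product) and the order table data (order). The sample rows are listed after this summary.

MapReduce can implement the functionality of the SQL statement: select ... from product p left join order o on p.pid = o.pid

Approach: use the join condition (pid) as the key of the map output, so that product records and order records with the same pid arrive in the same reduce call.

Drawback: with this approach the join is performed in the reduce phase. The reduce side carries most of the processing load while the map nodes do very little work, so resource utilization is poor, and data skew occurs very easily in the reduce phase.

Alternative: implement the join on the map side (map-side join).

Original article: https://www.cnblogs.com/mediocreWorld/p/11028591.html

The sample data and the code follow.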
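The approach described above (join condition as the map output key, join performed in the reduce step) can be tried without a Hadoop installation. The following is only a minimal sketch under stated assumptions: the class ReduceJoinSketch and its methods joinKey, shuffle, reduce and join are hypothetical names introduced for illustration, the grouping is simulated with an in-memory TreeMap instead of a real shuffle, and the input rows are the sample rows from this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of a reduce-side join (illustration only).
class ReduceJoinSketch {

    // "Map" step: pick the join key (pid) for one input line.
    // Product lines start with "p" and carry pid in field 0;
    // order lines carry pid in field 2.
    static String joinKey(String line) {
        String[] fields = line.split(",");
        return line.startsWith("p") ? fields[0] : fields[2];
    }

    // "Shuffle" step: group all lines by join key, sorted by key.
    static Map<String, List<String>> shuffle(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            groups.computeIfAbsent(joinKey(line), k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    // "Reduce" step: order lines first (each followed by a tab), then the product line.
    static String reduce(List<String> values) {
        StringBuilder orders = new StringBuilder();
        StringBuilder product = new StringBuilder();
        for (String value : values) {
            if (value.startsWith("p")) {
                product.append(value);
            } else {
                orders.append(value).append("\t");
            }
        }
        return orders.append(product).toString();
    }

    // Runs map, shuffle and reduce over all lines; returns pid -> joined record.
    static Map<String, String> join(List<String> lines) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : shuffle(lines).entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "p0001,小米5,1000,2000",
                "p0002,锤子T1,1000,3000",
                "1001,20150710,p0001,2",
                "1002,20150710,p0002,3",
                "1002,20150710,p0003,3");
        join(lines).forEach((pid, joined) -> System.out.println(pid + "\t" + joined));
    }
}
```

In this sketch a key such as p0003, which has an order line but no product line, still produces an output record containing only the order. Every key that reaches the reduce step is written out, whether or not both sides are present.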
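Product table data (product); the join field pid is the first column:
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000

Order table data (order); the join field pid is the third column:
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3

ReduceJoinMain: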
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());
        // Left commented out as in the original; needed when the job is submitted to a cluster as a jar.
        // job.setJarByClass(ReduceJoinMain.class);

        // Input: the product and order files are both read from this one input path.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input"));

        // Map phase: emits (pid, original line).
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce phase: joins all lines that share a pid.
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
        System.exit(run);
    }
}

ReduceJoinMapper:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        if (line.startsWith("p")) {
            // Product line: pid is the first field.
            context.write(new Text(split[0]), value);
        } else {
            // Order line: pid is the third field.
            context.write(new Text(split[2]), value);
        }
    }
}

ReduceJoinReducer:
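package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String firP = "";   // order lines for this pid, each followed by a tab
        String secP = "";   // product line for this pid (starts with "p")
        for (Text text : values) {
            String value = text.toString();
            if (value != null && !"".equals(value)) {
                if (value.startsWith("p"))
                    secP += value;
                else
                    firP += value + "\t";
            }
        }
        // Output: pid as key, order lines followed by the product line as value.
        context.write(key, new Text(firP + secP));
    }
}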
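The map-side join named above as the alternative can be sketched in the same Hadoop-free style. This is a hedged illustration, not the implementation from the original series: the class MapJoinSketch and its methods loadProducts, joinOrder and joinAll are hypothetical, the small product table is assumed to fit in memory, and unmatched orders are dropped here as an assumption. In a real Hadoop job the small table would first have to be made available to every map task, a step this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free sketch of a map-side join (illustration only).
class MapJoinSketch {

    // Build a pid -> product line lookup from the (small) product table.
    static Map<String, String> loadProducts(List<String> productLines) {
        Map<String, String> products = new HashMap<>();
        for (String line : productLines) {
            products.put(line.split(",")[0], line);
        }
        return products;
    }

    // "Map" step: join one order line against the in-memory product table.
    // Returns null when no product matches (assumption: such orders are dropped).
    static String joinOrder(String orderLine, Map<String, String> products) {
        String pid = orderLine.split(",")[2];
        String product = products.get(pid);
        return product == null ? null : orderLine + "\t" + product;
    }

    // Join every order line; no shuffle or reduce step is involved.
    static List<String> joinAll(List<String> orderLines, Map<String, String> products) {
        List<String> joined = new ArrayList<>();
        for (String order : orderLines) {
            String row = joinOrder(order, products);
            if (row != null) {
                joined.add(row);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> products = loadProducts(Arrays.asList(
                "p0001,小米5,1000,2000",
                "p0002,锤子T1,1000,3000"));
        List<String> orders = Arrays.asList(
                "1001,20150710,p0001,2",
                "1002,20150710,p0002,3",
                "1002,20150710,p0003,3");
        for (String row : joinAll(orders, products)) {
            System.out.println(row);
        }
    }
}
```

Because each order line is joined as soon as it is read, no records have to be grouped by pid on the reduce side. This is why the approach avoids the reduce-phase load and skew described in the drawback.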
Article link: http://soscw.com/essay/25113.html