Data Processing with Hadoop MapReduce in Java
2020-12-13 05:51
Tags: context, throw, writable, configuration, content, dependency, i++, test file, XML

Contents:
1. Import the POM dependencies
2. Basic usage
   a. Create a test1.txt file for the word count
   b. Create the Mapper
   c. Create the Reducer
   d. Usage
   e. Note: if the run fails with "HADOOP_HOME and hadoop.home.dir are unset.", the client machine also needs the HADOOP_HOME environment variable configured, followed by a restart
3. Data partitioning
   a. Create a test2.txt file to aggregate
   b. Create the expense entity class
   c. Create the Mapper
   d. Create the Reducer
   e. Create the Partitioner
   f. Usage
4. Data sorting
   a. Create a test3.txt file to sort
   b. Create the expense entity class, implementing WritableComparable and overriding compareTo
   c. Create the Mapper
   d. Create the Reducer
   e. Usage
5. Data aggregation (Combiner)
   a. Execution order: Mapper -> Combiner -> Reducer
   b. Purpose: perform a preliminary aggregation inside each map task to reduce the amount of data reaching the reduce phase, which can improve efficiency considerably
   c. Add the ansj Chinese word-segmentation dependency, used for splitting text into words (this dependency is unrelated to the Combiner)
   d. Create the Mapper
   e. Create the Reducer
   f. Create the Combiner
   g. Usage
6. Reference: https://www.cnblogs.com/zhuxiaojie/p/7224772.html

Original post: https://www.cnblogs.com/vettel0329/p/11151770.html

1. Import the POM dependencies

<properties>
    ......
    <hadoop.version>3.1.2</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    ......
</dependencies>
2. Basic usage

a. Create a test1.txt file for the word count:

hello zhangsan
lisi nihao
hi zhangsan
nihao lisi
x xiaoming
b. Create the Mapper

/**
 * The input here is read in automatically by MapReduce.
 * This is a simple count of how many times each word appears.
 * KEYIN    by default, the starting byte offset of the line of text MapReduce has read; a Long, which Hadoop serializes with its own LongWritable class
 * VALUEIN  by default, the content of the line of text MapReduce has read; Hadoop's serialized type is Text
 * KEYOUT   the key output after the user-defined logic has run; here it is the word, a String
 * VALUEOUT the value output by the user-defined logic; here it is the number of occurrences, a Long
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
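    // The map body below is a minimal sketch consistent with the javadoc above:
    // split each line on spaces (which matches test1.txt) and emit (word, 1).
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}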
c. Create the Reducer

/**
 * First Text: the word passed in from the Mapper.
 * Second LongWritable: how many times that word appeared; MapReduce gathers this, e.g. "hello" appeared 11 times.
 * Third Text: the word being output; this is what gets written to the result file.
 * Fourth LongWritable: the count shown in the output; this is also written to the result file.
 */
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
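    // A minimal reduce body matching the javadoc above: sum the counts collected
    // for each word and write (word, total). The body is a sketch of the usual
    // word-count reducer.
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}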
d. Usage

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
public class TestMain {
private static final String HDFS_PREFIX = "hdfs://localhost:9527";
private static FileSystem fs;
/**
 * Upload the test file to the HDFS file system
 */
public static void doBeforeWork(String sourceFilePath, String inFile, String outDir) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_PREFIX); // must match the port configured in core-site.xml
// Get a FileSystem instance for HDFS and set its user (because of Windows permission issues, "zwj" must be replaced with an administrator account)
fs = FileSystem.get(new URI(HDFS_PREFIX),conf,"zwj");
FSDataOutputStream fout = fs.create(new Path(inFile), true);
InputStream in = new FileInputStream(sourceFilePath);
IOUtils.copyBytes(in, fout, 1024,true);
// delete the output directory
fs.delete(new Path(outDir), true);
}
/**
 * Print the result
 */
public static void doAfterWork(String outFilePath) throws Exception {
FSDataInputStream fin = fs.open(new Path(outFilePath));
IOUtils.copyBytes(fin, System.out, 1024,true);
}
/**
 * Run the job
 */
public static void run(String inFilePath, String outFilePath) throws Exception{
Configuration conf = new Configuration();
// If the job is packaged and run on Linux, the following two lines are not needed
// // run the job on YARN
// conf.set("mapreduce.framework.name", "yarn");
// // host name of the ResourceManager
// conf.set("yarn.resourcemanager.hostname", "localhost");
Job job = Job.getInstance(conf);
// let Hadoop locate the jar by this class's package
job.setJarByClass(TestMain.class);
// specify the Mapper class
job.setMapperClass(WordCountMapper.class);
// specify the Reducer class
job.setReducerClass(WordCountReduce.class);
// set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// set the final output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// specify the input file location
FileInputFormat.setInputPaths(job, new Path(inFilePath));
// specify the output location
FileOutputFormat.setOutputPath(job, new Path(outFilePath));
// submit the job's parameters to YARN to run
// job.submit();
try {
job.waitForCompletion(true);
// passing true prints the execution result
} catch (ClassNotFoundException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
String sourceFilePath = "E:/tmp/test1.txt";
String inFile = "/mydir/test1.txt";
String inFileUrl = HDFS_PREFIX + inFile;
String outDir = "/outdir";
String outDirUrl = HDFS_PREFIX + outDir;
String outFileUrl = outDirUrl + "/part-r-00000";
doBeforeWork(sourceFilePath, inFile, outDir);
run(inFileUrl, outDirUrl);
doAfterWork(outFileUrl);
}catch (Exception e){
e.printStackTrace();
}
}
}
3. Data partitioning

a. Create a test2.txt file to aggregate:

张三 江西 打车 200
李四 广东 住宿 600
王五 北京 伙食 320
张三 江西 话费 50
张三 湖南 打车 900
周六 上海 采购 3000
李四 西藏 旅游 1000
王五 北京 借款 500
李四 上海 话费 50
周六 北京 打车 600
张三 广东 租房 3050
b. Create the expense entity class

public class SpendBean implements Writable {
private Text userName;
private IntWritable money;
private Text province;
public SpendBean(Text userName, IntWritable money, Text province) {
this.userName = userName;
this.money = money;
this.province = province;
}
/**
 * Deserialization requires a no-argument constructor
 */
public SpendBean(){}
/**
 * Serialization code
 * @param out
 * @throws IOException
 */
@Override
public void write(DataOutput out) throws IOException {
userName.write(out);
money.write(out);
province.write(out);
}
/**
 * Deserialization code
 * @param in
 * @throws IOException
 */
@Override
public void readFields(DataInput in) throws IOException {
userName = new Text();
userName.readFields(in);
money = new IntWritable();
money.readFields(in);
province = new Text();
province.readFields(in);
}
public Text getUserName() {
return userName;
}
public void setUserName(Text userName) {
this.userName = userName;
}
public IntWritable getMoney() {
return money;
}
public void setMoney(IntWritable money) {
this.money = money;
}
public Text getProvince() {
return province;
}
public void setProvince(Text province) {
this.province = province;
}
@Override
public String toString() {
return "[SpendBean]: userName[" + userName + "], money[" + money + "], province[" + province + "]";
}
}
c. Create the Mapper

public class GroupUserMapper extends Mapper<LongWritable, Text, Text, SpendBean> {
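    // A sketch of the map body: each line of test2.txt is "name province item amount",
    // so parse the fields, wrap them in a SpendBean and emit it keyed by the user name.
    // Keying by user name is an assumption consistent with the class name.
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        String userName = fields[0];
        String province = fields[1];
        int money = Integer.parseInt(fields[3]);
        context.write(new Text(userName),
                new SpendBean(new Text(userName), new IntWritable(money), new Text(province)));
    }
}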
d. Create the Reducer

public class GroupUserReducer extends Reducer<Text, SpendBean, Text, SpendBean> {
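    // A sketch of the reduce body: sum one user's spending within the partition and
    // write a summary SpendBean. Emitting a SpendBean (rather than a plain number)
    // is an assumption; it keeps the output self-describing via SpendBean.toString().
    @Override
    protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        String province = "";
        for (SpendBean bean : values) {
            total += bean.getMoney().get();
            province = bean.getProvince().toString();
        }
        context.write(key, new SpendBean(new Text(key.toString()), new IntWritable(total), new Text(province)));
    }
}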
e. Create the Partitioner

public class ProvincePartitioner extends Partitioner<Text, SpendBean> {
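    // A sketch of the partition logic: route each record to a reduce task based on the
    // province carried in the SpendBean. The table below simply lists the provinces that
    // appear in test2.txt; the exact mapping and the fallback partition are assumptions.
    @Override
    public int getPartition(Text key, SpendBean value, int numPartitions) {
        String province = value.getProvince().toString();
        switch (province) {
            case "江西": return 0;
            case "广东": return 1;
            case "北京": return 2;
            case "湖南": return 3;
            case "上海": return 4;
            case "西藏": return 5;
            default:     return 0; // unknown provinces fall back to partition 0
        }
    }
}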
f. Usage

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
public class TestMain {
private static final String HDFS_PREFIX = "hdfs://localhost:9527";
private static FileSystem fs;
/**
 * Upload the test file to the HDFS file system
 */
public static void doBeforeWork(String sourceFilePath, String inFile, String outDir) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_PREFIX); // must match the port configured in core-site.xml
// Get a FileSystem instance for HDFS and set its user (because of Windows permission issues, "zwj" must be replaced with an administrator account)
fs = FileSystem.get(new URI(HDFS_PREFIX),conf,"zwj");
FSDataOutputStream fout = fs.create(new Path(inFile), true);
InputStream in = new FileInputStream(sourceFilePath);
IOUtils.copyBytes(in, fout, 1024,true);
// delete the output directory
fs.delete(new Path(outDir), true);
}
/**
 * Print the result
 */
public static void doAfterWork(List<String> outFilePaths) throws Exception {
    // with multiple reduce tasks there is one part-r-xxxxx file per partition, so print each one
    for (String outFilePath : outFilePaths) {
        FSDataInputStream fin = fs.open(new Path(outFilePath));
        IOUtils.copyBytes(fin, System.out, 1024, true);
    }
}
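/**
 * Run the job. The rest of this class follows the same shape as the word-count TestMain;
 * the sketch below shows the two partition-specific settings, setPartitionerClass and
 * setNumReduceTasks. The reduce-task count of 6 matches the six provinces in test2.txt;
 * that count, the output types, and the file paths in main() are assumptions.
 */
public static void run(String inFilePath, String outFilePath) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TestMain.class);
    job.setMapperClass(GroupUserMapper.class);
    job.setReducerClass(GroupUserReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SpendBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(SpendBean.class);
    // the lines that differ from the basic example: register the custom Partitioner
    // and start one reduce task per partition so each province gets its own output file
    job.setPartitionerClass(ProvincePartitioner.class);
    job.setNumReduceTasks(6);
    FileInputFormat.setInputPaths(job, new Path(inFilePath));
    FileOutputFormat.setOutputPath(job, new Path(outFilePath));
    job.waitForCompletion(true);
}

public static void main(String[] args) {
    try {
        String sourceFilePath = "E:/tmp/test2.txt";
        String inFile = "/mydir/test2.txt";
        String inFileUrl = HDFS_PREFIX + inFile;
        String outDir = "/outdir";
        String outDirUrl = HDFS_PREFIX + outDir;
        doBeforeWork(sourceFilePath, inFile, outDir);
        run(inFileUrl, outDirUrl);
        // with 6 reduce tasks the results land in part-r-00000 through part-r-00005
        List<String> outFileUrls = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            outFileUrls.add(outDirUrl + String.format("/part-r-%05d", i));
        }
        doAfterWork(outFileUrls);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}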
4. Data sorting

a. Create a test3.txt file to sort:

张三 2980
李四 8965
王五 1987
小黑 6530
小陈 2963
小梅 980
b. Create the expense entity class, implementing WritableComparable and overriding compareTo

public class Spend implements WritableComparable<Spend> {
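    // A sketch of the entity: a single money field is assumed here (each line of
    // test3.txt is "name total"), with compareTo sorting from the largest amount to
    // the smallest. The sort direction is an assumption.
    private IntWritable money;

    public Spend() {}

    public Spend(IntWritable money) {
        this.money = money;
    }

    @Override
    public int compareTo(Spend other) {
        // descending order by amount spent
        return other.money.get() - this.money.get();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        money.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        money = new IntWritable();
        money.readFields(in);
    }

    public IntWritable getMoney() {
        return money;
    }

    @Override
    public String toString() {
        return String.valueOf(money.get());
    }
}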
c. Create the Mapper

public class SortMapper extends Mapper<LongWritable, Text, Spend, Text> {
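    // A sketch of the map body: each line of test3.txt is "name total", so the amount
    // becomes the key (a Spend, which is what MapReduce sorts by) and the name the value.
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        String userName = fields[0];
        int money = Integer.parseInt(fields[1]);
        context.write(new Spend(new IntWritable(money)), new Text(userName));
    }
}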
d. Create the Reducer

public class SortReducer extends Reducer<Spend, Text, Text, Spend> {
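    // A sketch of the reduce body: keys arrive already sorted by Spend.compareTo, so the
    // reducer only needs to flip each (spend, name) pair into the (name, spend) output
    // expected by the job configuration below.
    @Override
    protected void reduce(Spend key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text userName : values) {
            context.write(userName, key);
        }
    }
}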
e. Usage

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
public class TestMain {
private static final String HDFS_PREFIX = "hdfs://localhost:9527";
private static FileSystem fs;
/**
 * Upload the test file to the HDFS file system
 */
public static void doBeforeWork(String sourceFilePath, String inFile, String outDir) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_PREFIX); // must match the port configured in core-site.xml
// Get a FileSystem instance for HDFS and set its user (because of Windows permission issues, "zwj" must be replaced with an administrator account)
fs = FileSystem.get(new URI(HDFS_PREFIX),conf,"zwj");
FSDataOutputStream fout = fs.create(new Path(inFile), true);
InputStream in = new FileInputStream(sourceFilePath);
IOUtils.copyBytes(in, fout, 1024,true);
// delete the output directory
fs.delete(new Path(outDir), true);
}
/**
 * Print the result
 */
public static void doAfterWork(String outFilePath) throws Exception {
FSDataInputStream fin = fs.open(new Path(outFilePath));
IOUtils.copyBytes(fin, System.out, 1024,true);
}
/**
 * Run the job
 */
public static void run(String inFilePath, String outFilePath) throws Exception {
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setJarByClass(TestMain.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setMapOutputKeyClass(Spend.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Spend.class);
FileInputFormat.setInputPaths(job,new Path(inFilePath));
FileOutputFormat.setOutputPath(job,new Path(outFilePath));
boolean b = job.waitForCompletion(true);
if(b){
//success
}
}
public static void main(String[] args) {
try {
String sourceFilePath = "E:/tmp/test3.txt";
String inFile = "/mydir/test3.txt";
String inFileUrl = HDFS_PREFIX + inFile;
String outDir = "/outdir";
String outDirUrl = HDFS_PREFIX + outDir;
String outFileUrl = outDirUrl + "/part-r-00000";
doBeforeWork(sourceFilePath, inFile, outDir);
run(inFileUrl, outDirUrl);
doAfterWork(outFileUrl);
}catch (Exception e){
e.printStackTrace();
}
}
}
5. Data aggregation (Combiner)

a. Execution order: Mapper -> Combiner -> Reducer

b. Purpose: the Combiner performs a preliminary aggregation inside each map task, which reduces the amount of data sent to the reduce phase and can improve efficiency considerably.

c. Add the ansj Chinese word-segmentation dependency, used to split the text into words (this dependency is unrelated to the Combiner itself):

<dependency>
    <groupId>org.ansj</groupId>
    <artifactId>ansj_seg</artifactId>
    <version>5.1.1</version>
</dependency>
d. Create the Mapper

public class StoryMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
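    // A sketch of the map body: use ansj's standard analyzer (org.ansj.splitWord.analysis.ToAnalysis,
    // terms of type org.ansj.domain.Term) to split each line of the story into Chinese terms and
    // emit (term, 1). Skipping blank terms is an assumption.
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        List<Term> terms = ToAnalysis.parse(value.toString()).getTerms();
        for (Term term : terms) {
            String word = term.getName().trim();
            if (word.length() > 0) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
}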
e. Create the Reducer

public class StoryReducer extends Reducer<Text, LongWritable, LongWritable, Text> {
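    // A sketch of the reduce body: sum the occurrences of each term. Note that the job
    // below declares LongWritable as the output key and Text as the output value, so the
    // pair is written as (count, word) rather than (word, count).
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(new LongWritable(count), key);
    }
}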
f. Create the Combiner

public class StoryCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
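    // A sketch of the combine body: pre-sum the counts emitted by a single map task.
    // A Combiner's input and output types must both match the map output types
    // (Text, LongWritable), which is why this differs from StoryReducer above.
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}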
g. Usage

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
public class TestMain {
private static final String HDFS_PREFIX = "hdfs://localhost:9527";
private static FileSystem fs;
/**
 * Upload the test file to the HDFS file system
 */
public static void doBeforeWork(String sourceFilePath, String inFile, String outDir) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", HDFS_PREFIX); // must match the port configured in core-site.xml
// Get a FileSystem instance for HDFS and set its user (because of Windows permission issues, "zwj" must be replaced with an administrator account)
fs = FileSystem.get(new URI(HDFS_PREFIX),conf,"zwj");
FSDataOutputStream fout = fs.create(new Path(inFile), true);
InputStream in = new FileInputStream(sourceFilePath);
IOUtils.copyBytes(in, fout, 1024,true);
// delete the output directory
fs.delete(new Path(outDir), true);
}
/**
 * Print the result
 */
public static void doAfterWork(String outFilePath) throws Exception {
FSDataInputStream fin = fs.open(new Path(outFilePath));
IOUtils.copyBytes(fin, System.out, 1024,true);
}
/**
 * Run the job
 */
public static void run(String inFilePath, String outFilePath) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(TestMain.class);
job.setMapperClass(StoryMapper.class);
job.setReducerClass(StoryReducer.class);
job.setCombinerClass(StoryCombiner.class); // set the Combiner
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path(inFilePath));
FileOutputFormat.setOutputPath(job,new Path(outFilePath));
boolean b = job.waitForCompletion(true);
if(b){
//success
}
}
public static void main(String[] args) {
try {
String sourceFilePath = "E:/tmp/test4.txt";
String inFile = "/mydir/test4.txt";
String inFileUrl = HDFS_PREFIX + inFile;
String outDir = "/outdir";
String outDirUrl = HDFS_PREFIX + outDir;
String outFileUrl = outDirUrl + "/part-r-00000";
doBeforeWork(sourceFilePath, inFile, outDir);
run(inFileUrl, outDirUrl);
doAfterWork(outFileUrl);
}catch (Exception e){
e.printStackTrace();
}
}
}