Hadoop HDFS (Client Operations): 2 HDFS API Operations, 3 HDFS I/O Stream Operations
2021-02-08 14:16
Original article: https://www.cnblogs.com/Diyo/p/11355695.html
Overview: (1) write the source code; (2) copy hdfs-site.xml into the project's root directory so it sits on the classpath; (3) understand parameter priority. Priority order, highest first: (1) values set in the client code > (2) user-defined configuration files on the classpath > (3) the server's default configuration.
The sections below cover three tasks: uploading a local file to the HDFS root directory; downloading the file banhua.txt from HDFS to the local E: drive; and reading a large file on HDFS block by block (for example /hadoop-2.7.2.tar.gz under the root directory): download the first block, download the second block, then merge the parts (see 3.3).
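For step (2), the hdfs-site.xml dropped onto the classpath might look like the following minimal sketch; the dfs.replication value of 1 here is an assumed example for exercising the priority rules, not a value taken from the article.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Assumed example: overrides the server's default replication factor -->
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>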
2 HDFS API Operations
2.1 HDFS File Upload (testing parameter priority)
// File upload
@Test
public void testPut() throws Exception {
    Configuration conf = new Configuration();
    conf.set("dfs.replication", "2");
    // 1. Get the FileSystem object
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    // 2. Call the upload API
    fs.copyFromLocalFile(new Path("D:\\Ztest\\yema.png"), new Path("/diyo/dashen/dengzhiyong/yema3.png"));
    // 3. Close the resource
    fs.close();
    System.out.println("upload done");
}
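To confirm which setting won, you can read back the replication factor of the uploaded file before calling fs.close(). A minimal sketch under the same cluster and path assumptions as above:

// Expect 2: the conf.set("dfs.replication", "2") in client code outranks
// both the classpath hdfs-site.xml and the server's default value.
short replication = fs.getFileStatus(
        new Path("/diyo/dashen/dengzhiyong/yema3.png")).getReplication();
System.out.println("effective replication = " + replication);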
2.2 HDFS File Download
// File download
@Test
public void testGet() throws Exception {
    // 1. Get the file system
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    // 2. Perform the download
    // fs.copyToLocalFile(new Path("/diyo/dashen/dengzhiyong/yema3.png"), new Path("D:\\Ztest\\yema2.png"));
    // Parameters: delSrc (whether to delete the source), source path, destination path,
    // useRawLocalFileSystem (true skips the local checksum, so no .crc file is produced)
    fs.copyToLocalFile(false, new Path("/diyo/dashen/dengzhiyong/yema3.png"), new Path("D:\\Ztest\\yema3.png"),
            true);
    // 3. Close the resource
    fs.close();
    System.out.println("download done");
}
2.3 HDFS File/Folder Deletion
// Delete a file or folder
@Test
public void testRmdir() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    // Delete (recursive = true enables recursive deletion, required for non-empty directories)
    fs.delete(new Path("/diyo/dashen/dengzhiyong/yema3.png"), true);
    fs.close();
    System.out.println("delete done");
}
2.4 HDFS File Rename
// Rename a file
@Test
public void testReName() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    fs.rename(new Path("/diyo/dashen/dengzhiyong/yema2.png"), new Path("/diyo/dashen/dengzhiyong/yema3.png"));
    fs.close();
    System.out.println("rename done");
}
2.5 Viewing HDFS File Details
// View file details: name, permission, length, block info
@Test
public void testListFile() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    while (listFiles.hasNext()) {
        LocatedFileStatus status = listFiles.next();
        System.out.println(status.getPath().getName()); // name
        System.out.println(status.getPermission());     // permission
        System.out.println(status.getLen());            // length
        // block info: which hosts store each block of the file
        for (BlockLocation blockLocation : status.getBlockLocations()) {
            for (String host : blockLocation.getHosts()) {
                System.out.println(host);
            }
        }
        System.out.println("-----------");
    }
    fs.close();
}
2.6 Distinguishing HDFS Files and Folders
// Distinguish files from folders
@Test
public void testListStatus() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        if (fileStatus.isFile()) {
            System.out.println("File: " + fileStatus.getPath().getName());
        }
        if (fileStatus.isDirectory()) {
            System.out.println("Folder: /" + fileStatus.getPath().getName());
        }
    }
    fs.close();
    /*
     * Note: fs.listFiles() returns a RemoteIterator<LocatedFileStatus> over files only
     * (recursing into subdirectories), while fs.listStatus() returns a FileStatus[]
     * of the immediate children, both files and directories.
     */
}
2.7 Viewing HDFS File Content and Directory Structure
// View file content
@Test
public void testCatFileContext() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    FSDataInputStream fdis = fs.open(new Path("/xsync"));
    // Read byte by byte; the char cast is fine for single-byte (ASCII) text
    int len = 0;
    while ((len = fdis.read()) != -1) {
        System.out.print((char) len);
    }
    fdis.close();
    fs.close();
}
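Reading one byte at a time and casting to char mangles multi-byte characters. A shorter sketch that streams the same /xsync file straight to standard output using Hadoop's IOUtils:

// Copy the HDFS file to stdout with a 4 KB buffer; 'false' keeps the streams open
FSDataInputStream in = fs.open(new Path("/xsync"));
IOUtils.copyBytes(in, System.out, 4096, false);
IOUtils.closeStream(in);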
// View the directory structure
@Test
public void showTree() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus sta : listStatus) {
        if (sta.isFile() && sta.getLen() > 0) {
            showDetail(sta);
            // System.out.println("------------");
        } else if (sta.isDirectory()) {
            showDetail(sta);
        }
    }
    fs.close();
}

private void showDetail(FileStatus sta) {
    System.out.println(sta.getPath() + "\t"
            + sta.getLen() + "\t"
            + sta.getOwner() + "\t"
            + sta.getAccessTime());
}
3 HDFS I/O Stream Operations
3.1 HDFS File Upload
@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    // 1. Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");
    // 2. Create the input stream
    FileInputStream fis = new FileInputStream(new File("e:/banhua.txt"));
    // 3. Get the output stream
    FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));
    // 4. Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);
    // 5. Close resources
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
3.2 HDFS File Download
// File download
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
    // 1. Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");
    // 2. Get the input stream
    FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
    // 3. Get the output stream
    FileOutputStream fos = new FileOutputStream(new File("e:/banhua.txt"));
    // 4. Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);
    // 5. Close resources
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
3.3 Seek-Based File Reading
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
    // 1. Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");
    // 2. Get the input stream
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
    // 3. Create the output stream
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
    // 4. Copy the stream: 1024 * 128 reads of 1 KB = the first 128 MB block
    byte[] buf = new byte[1024];
    for (int i = 0; i < 1024 * 128; i++) {
        fis.read(buf);
        fos.write(buf);
    }
    // 5. Close resources
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
    fs.close();
}
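The manual loop assumes every read() fills the buffer completely, which the stream does not guarantee. A terser sketch that copies exactly the first 128 MB, assuming the same fis/fos streams as above and using IOUtils' count-limited overload:

// Copy exactly 128 MB from fis to fos; 'false' leaves both streams open
IOUtils.copyBytes(fis, fos, 1024L * 1024 * 128, false);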
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
    // 1. Get the file system
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");
    // 2. Open the input stream
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
    // 3. Seek to the start of the second block (128 MB in)
    fis.seek(1024 * 1024 * 128);
    // 4. Create the output stream
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
    // 5. Copy the stream
    IOUtils.copyBytes(fis, fos, configuration);
    // 6. Close resources
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
    fs.close();
}
Merging the parts: in a Windows command prompt, change to directory E:\ and run the following command to merge the data:
type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
After merging, rename hadoop-2.7.2.tar.gz.part1 to hadoop-2.7.2.tar.gz. Unpacking it confirms the merged tar archive is intact.
Personal code:
package com.diyo.hdfs;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class HDFSIO {
    // Upload from local disk to HDFS
    @Test
    public void testputFileToHDFS() throws Exception {
        // 1. Get the object
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        // 2. Get the input stream
        FileInputStream fis = new FileInputStream("D:/Ztest/yema.png");
        // 3. Get the output stream
        FSDataOutputStream fos = fs.create(new Path("/newyama.png"));
        // 4. Copy the stream
        IOUtils.copyBytes(fis, fos, conf);
        // 5. Close resources
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }
    // Download from HDFS to local disk
    @Test
    public void testgetFileFromHDFS() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        FSDataInputStream fis = fs.open(new Path("/newyama.png"));
        FileOutputStream fos = new FileOutputStream("d:/Ztest/newyema.png");
        IOUtils.copyBytes(fis, fos, conf);
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }
    // Seek-based read (download the first block)
    @Test
    public void testReadFileSeek1() throws Exception {
        Configuration conf = new Configuration();
        // Get the object
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        // Get the input stream
        FSDataInputStream fis = fs.open(new Path("/hadoop-3.1.0.tar.gz"));
        // Get the output stream
        FileOutputStream fos = new FileOutputStream("d:/Ztest/hadoop-3.1.0.tar.gz.part1");
        // Copy the stream: 1024 * 128 reads of 1 KB = the first 128 MB block
        byte[] buf = new byte[1024];
        for (int i = 0; i < 1024 * 128; i++) {
            fis.read(buf);
            fos.write(buf);
        }
        // Close resources
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }
    // Seek-based read (download the second block)
    @Test
    public void testReadFileSeek2() throws Exception {
        Configuration conf = new Configuration();
        // Get the object
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        // Get the input stream
        FSDataInputStream fis = fs.open(new Path("/hadoop-3.1.0.tar.gz"));
        // Set the read start position (skip the first 128 MB block)
        fis.seek(1024 * 1024 * 128);
        // Get the output stream
        FileOutputStream fos = new FileOutputStream("d:/Ztest/hadoop-3.1.0.tar.gz.part2");
        // Copy the stream
        IOUtils.copyBytes(fis, fos, conf);
        // Close resources
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }
}