ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

HDFS的API使用

2021-08-19 18:04:29  阅读:247  来源: 互联网

标签:HDFS fs Configuration FileSystem API 使用 new configuration friends


前提:请参考这里,然后再阅读此内容.

1.HDFS文件上传 下载 删除 改名称 文件详情查看 文件和文件夹的判断

package com.lxz.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;


import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsClient {

    @Test
    public void testMkdirs() throws IOException,InterruptedException, URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        //2.配置在集群上运行
        configuration.set("fs.defaultFS","hdfs://hadoop1:9000");
//        FileSystem fs = FileSystem.get(configuration);
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        // 3.1 创建目录
        fs.mkdirs(new Path("//lxz/hdfs/first/tt/oo"));
        //4.关闭资源
        fs.close();
    }

    // HDFS文件上传
    @Test
    public void testCopyFromLocalFile() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication","2");
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.上传文件
        fs.copyFromLocalFile(new Path("D:/input/friends/friends.txt"), new Path("/lxz/friends/friends.txt"));
        //3.关闭资源
        fs.close();
    }

    // HDFS文件下载
    @Test
    public void testCopyToLocalFile() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.执行下载操作
//        boolean delSrc //是否将原文件删除
//        Path src // 要下载的文件路径
//        Path dst // 将文件下载到的路径
//        boolean useRawLocalFileSystem // 是否开启文件校验
        fs.copyToLocalFile(false,new Path("/lxz/friends/friends.txt"), new Path("D:/input"), true);
        // 3.关闭资源
        fs.close();
    }

    // HDFS文件删除
    @Test
    public void testDelete() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.执行删除
        fs.delete(new Path("/input/friends/friends.txt"),true);
        //3.关闭资源
        fs.close();
    }

    // HDFS文件名更改
    @Test
    public void testRename() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.修改文件名称
        fs.rename(new Path("/input/friends/friends.txt"), new Path("/input/friends/friends_two.txt"));
        //3.关闭资源
        fs.close();
    }

    // HDFS文件详情查看
    @Test
    public void testListFiles() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.获取文件详情
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),true);

        while   (listFiles.hasNext()){
            LocatedFileStatus status = listFiles.next();
            //输出详情
            //文件名称
            System.out.println(status.getPath().getName());
            //长度
            System.out.println(status.getLen());
            //权限
            System.out.println(status.getPermission());
            //分组
            System.out.println(status.getGroup());

            //获取存储的块信息
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation:blockLocations){
                //获取块存储的主机节点
                String[] hosts = blockLocation.getHosts();
                for (String host:hosts){
                    System.out.println(host);
                }
            }
            System.out.println("-----运行成功标志-------");

            //3.关闭资源
            fs.close();
        }
    }

    // HDFS文件和文件夹的判断
    @Test
    public void testListStatus() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.判断是文件还是文件夹
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus:listStatus){
            //如果是文件
            if (fileStatus.isFile()){
                System.out.println("f:" + fileStatus.getPath().getName());
            }else   {
                System.out.println("d:" + fileStatus.getPath().getName());
            }
        }
        //3.关闭资源
        fs.close();
    }

}

 

2.HDFS的IO流操作

HDFS文件上传 下载 大文件分块下载

package com.lxz.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;


import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// 通过IO流 处理HDFS API

public class hdfsIoClient {

    //HDFS文件上传
    @Test
    public void putFileToHDFS() throws IOException,InterruptedException, URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.创建输入流
        FileInputStream fileInputStream = new FileInputStream(new File("d:/friends/friends.txt"));
        //3.获取输出流
        FSDataOutputStream fsDataOutputStream = fs.create(new Path("/input/friends/friends.txt"));
        //4.流对拷
        IOUtils.copyBytes(fileInputStream,fsDataOutputStream,configuration);
        //5.关闭资源
        IOUtils.closeStream(fileInputStream);
        IOUtils.closeStream(fsDataOutputStream);
        fs.close();
    }

    //HDFS文件下载
    //需求描述:从HDFS上下载friends.txt文件到本地D:/input/friends目录下
    @Test
    public void getFileFromHDFS() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.获取输入流
        FSDataInputStream fis = fs.open(new Path("/lxz/input/friends/friends.txt"));
        //3.获取输出流
        FileOutputStream fos = new FileOutputStream(new Path("D:/input/friends/friends.txt"));
        //4.流对拷
        IOUtils.copyBytes(fis,fos,configuration);
        //5.关闭资源
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }

    //HDFS下载大文件分块下载
    //需求描述:分块读取HDFS上的大文件,比如根目录下的hadoop-2.7.2.tar.gz

    //1.下载第一块
    @Test
    public void readFileSeek1() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration, "root");
        //2.获取输入流
        FSDataInputStream fis = fs.open(new Path("./hadoop-2.7.2.tar.gz"));
        //3.获取输出流
        FileOutputStream fos = new FileOutputStream(new File("D:/input/friends/hadoop-2.7.2.tar.gz.part1"));
        //4.流对拷
        byte[] buf = new byte[1024];
        for (int i = 0; i < 1024 * 128; i++) {
            fis.read(buf);
            fos.write(buf);
        }
        //5.关闭资源
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }
    //2.下载第二块
    @Test
    public void readFileSeek2() throws IOException,InterruptedException,URISyntaxException{
        //1.获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration"root");
        //2.打开输入流
        FSDataInputStream fis = fs.open(new Path("./hadoop-2.7.2.tar.gz"));
        //3.定位输入数据位置
        fis.seek(1024*1024*128);
        //4.创建输出流
        FileOutputStream fos = new FileOutputStream(new File("D:/input/friends/hadoop-2.7.2.tar.gz.part2"));
        //5.流对拷
        IOUtils.copyBytes(fis,fos,configuration);
        //6.关闭资源
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();

//        合并文件
//        在Window命令窗口中进入到文件输出目录,然后执行如下命令,对数据进行合并
//        type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
//        合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz。解压发现该tar包非常完整。
    }
}

 

标签:HDFS,fs,Configuration,FileSystem,API,使用,new,configuration,friends
来源: https://www.cnblogs.com/lxzcloud/p/15163036.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有