ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

小文件转存SequenceFile

2021-10-20 13:32:54  阅读:145  来源: 互联网

标签:文件 String org SequenceFile file new apache import 转存


  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileUtil;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.BytesWritable;
  5 import org.apache.hadoop.io.SequenceFile;
  6 import org.apache.hadoop.io.SequenceFile.Reader;
  7 import org.apache.hadoop.io.SequenceFile.Writer;
  8 import org.apache.hadoop.io.Text;
  9 import org.slf4j.Logger;
 10 import org.slf4j.LoggerFactory;
 11 
 12 import javax.imageio.stream.FileImageOutputStream;
 13 import java.io.File;
 14 import java.io.FileInputStream;
 15 import java.nio.charset.StandardCharsets;
 16 import java.util.ArrayList;
 17 import java.util.List;
 18 
 19 public class MergeSmallFilesToSequenceFile {
 20     private static Logger logger = LoggerFactory.getLogger(MergeSmallFilesToSequenceFile.class);
 21     private static Configuration configuration = new Configuration();
 22     private static List<String> smallFilePaths = new ArrayList<String>();
 23 
 24     /**
 25      *添加路径,读取文件夹下所有的文件绝对路径
 26      * @param inputPath
 27      * @throws Exception
 28      */
 29     public void addInputPath(String inputPath) throws Exception{
 30 
 31         File file = new File(inputPath);
 32 
 33         if(file.isDirectory()){
 34             File[] files = FileUtil.listFiles(file);
 35             for(File sFile:files){
 36                 smallFilePaths.add(sFile.getPath());
 37                 logger.info("添加小文件路径:" + sFile.getPath());
 38             }
 39         }else{
 40             smallFilePaths.add(file.getPath());
 41             logger.info("添加小文件路径:" + file.getPath());
 42         }
 43     }
 44 
 45     /**
 46      *合并小文件序列化存储到hdfs
 47      * @throws Exception
 48      */
 49     public void mergeFile() throws Exception{
 50 
 51         Writer.Option bigFile = Writer.file(new Path("/SequenceFile_Test/test/wangxin.test"));
 52 
 53         Writer.Option keyClass = Writer.keyClass(Text.class);
 54         Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
 55 
 56         Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
 57 
 58         Text key = new Text();
 59         for(String path:smallFilePaths){
 60             File file = new File(path);
 61             long fileSize = file.length();
 62             byte[] fileContent = new byte[(int)fileSize];
 63             FileInputStream inputStream = new FileInputStream(file);
 64             inputStream.read(fileContent, 0, (int)fileSize);
 65 
 66             //String md5Str = DigestUtils.md5Hex(fileContent);
 67 
 68             //logger.info("merge小文件:"+path+",md5:"+md5Str);
 69             key.set(path);
 70 
 71             writer.append(key, new BytesWritable(fileContent));
 72         }
 73         writer.hflush();
 74 
 75         /*for (String path : smallFilePaths) {
 76             File file = new File(path);
 77             file.deleteOnExit();
 78         }*/
 79         writer.close();
 80     }
 81 
 82     /**
 83      * 在hdfs指定位置读取文件,读取k,v
 84      * @throws Exception
 85      */
 86     public void readMergedFile() throws Exception{
 87         MergeSmallFilesToSequenceFile smf = new MergeSmallFilesToSequenceFile();
 88         configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
 89         Reader.Option file = Reader.file(new Path("hdfs://集群名/hdfs路径"));
 90         Reader reader = new Reader(configuration, file);
 91         Text key = new Text();
 92         BytesWritable value = new BytesWritable();
 93         while(reader.next(key, value)){
 94             byte[] bytes = value.copyBytes();
 95             //String md5 = DigestUtils.md5Hex(bytes);
 96             String content = new String(bytes, StandardCharsets.UTF_8);
 97             //logger.info("读取到文件:"+key+",md5:"+md5+",content:"+content);
 98             byte[] keyBytes = key.copyBytes();
 99             String keyStr = new String(keyBytes, StandardCharsets.UTF_8);
100             //判断是不是图片,是图片转存到本地路径
101             if (keyStr.contains(".png")) {
102                 String[] split = keyStr.split("\\\\");
103                 String fileName = split[split.length - 1];
104                 //拼接转存位置
105                 smf.byte2image(bytes,"/linux路径" + fileName);
106             }else {
107                 System.out.println("读取到文件:" + keyStr + ",content:" + content);
108             }
109 
110         }
111     }
112 
113     /**
114      *将byte数组转成图片
115      * @param data
116      * @param path
117      */
118     public  void byte2image(byte[] data,String path){
119         if(data.length<3||path.equals("")) return;
120         try{
121             FileImageOutputStream imageOutput = new FileImageOutputStream(new File(path));
122             imageOutput.write(data, 0, data.length);
123             imageOutput.close();
124             System.out.println("转换图片成功 " + path);
125         } catch(Exception ex) {
126             System.out.println("Exception: " + ex);
127             ex.printStackTrace();
128         }
129     }
130 
131 
132     //测试
133     public static void main(String[] args) throws Exception {
134         MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
135 
136         /*List<String> smallFilePaths = new ArrayList<String>();
137         smallFilePaths = msf.getFiles("/tmp/logs",smallFilePaths);*/
138 
139 /*        msf.addInputPath("C:\\Users\\HR\\Desktop\\bigfile");
140 
141         for (String smallFilePath : smallFilePaths) {
142             System.out.println(smallFilePath);
143         }
144 
145         msf.mergeFile();*/
146 
147         msf.readMergedFile();
148 
149         /*msf.mergeFile();
150 
151         msf.readMergedFile();*/
152     }
153 }

 

标签:文件,String,org,SequenceFile,file,new,apache,import,转存
来源: https://www.cnblogs.com/China2008512/p/15428461.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有