Hadoop-Supported File Formats: Parquet




Contents

0x00 What This Article Covers
  1. Row storage vs. column storage
  2. Implementing Parquet reads and writes in code
  3. Bonus

0x01 Row Storage vs. Column Storage
1. Avro and Parquet

a. For the background on row storage vs. column storage, please refer to the earlier article in this series: Hadoop-Supported File Formats: Avro.
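To make the distinction concrete (a simplified illustration, not taken from the referenced article): given several Person records with the four fields used throughout this article, a row-oriented format such as Avro lays the data out record by record, while a column-oriented format such as Parquet stores all the values of one column together, which is what makes column pruning and per-column encoding/compression effective.

    Row-oriented:    (shaonaiyi, 18, 7, red) (someone-else, 20, 3, blue) ...
    Column-oriented: name: shaonaiyi, someone-else, ... | age: 18, 20, ... | favorite_number: 7, 3, ... | favorite_color: red, blue, ...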

0x02 Implementing Parquet Reads and Writes in Code
1. Reading and writing Parquet files in code

a. Add the Parquet dependencies

    <!--添加Parquet依赖-->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-column</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.8.1</version>
    </dependency>
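These snippets assume the Hadoop client libraries (for example hadoop-client, or hadoop-common plus hadoop-mapreduce-client-core) are already on the classpath, as in the earlier articles of this series; only the Parquet-specific dependencies are listed here.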

b. Complete code for writing a Parquet file (to HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:14
 * @Description Writing a Parquet file programmatically
 */
public class ParquetFileWriter {

    public static void main(String[] args) throws IOException {
        // Define the Parquet schema (MessageType) describing a Person record
        MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
                "    required binary name;\n" +
                "    required int32 age;\n" +
                "    required int32 favorite_number;\n" +
                "    required binary favorite_color;\n" +
                "}");

        Configuration configuration = new Configuration();
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        GroupWriteSupport.setSchema(schema, configuration);
        // Build the writer: Snappy compression, default block/page sizes, Parquet v1 format
        ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetProperties.WriterVersion.PARQUET_1_0, configuration);

        GroupFactory groupFactory = new SimpleGroupFactory(schema);
        Group group = groupFactory.newGroup()
                .append("name", "shaonaiyi")
                .append("age", 18)
                .append("favorite_number", 7)
                .append("favorite_color", "red");

        writer.write(group);

        writer.close();
    }

}
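One API note: this long ParquetWriter constructor is deprecated in favor of a builder-style API in more recent parquet-mr releases, but it still works with the 1.8.1 dependency declared above; the bonus section at the end of this article uses the builder form via AvroParquetWriter.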

c. Complete code for reading a Parquet file (from HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:18
 * @Description Reading a Parquet file programmatically
 */
public class ParquetFileReader {

    public static void main(String[] args) throws IOException {


        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/parquet-data.parquet");
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<>(path, readSupport);

        // read() returns the next record, or null once the file is exhausted
        Group result = reader.read();
        System.out.println("name:" + result.getString("name", 0));
        System.out.println("age:" + result.getInteger("age", 0));
        System.out.println("favorite_number:" + result.getInteger("favorite_number", 0));
        System.out.println("favorite_color:" + result.getString("favorite_color", 0));
    }

}
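The reader above fetches a single record, matching the single record we wrote. ParquetReader.read() returns null once the file is exhausted, so scanning a whole file is just a loop; a minimal sketch (same imports and path as the class above) that could replace the body of main:

        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<>(path, readSupport);
        Group record;
        while ((record = reader.read()) != null) {  // null signals end of file
            System.out.println(record.getString("name", 0) + "\t" + record.getInteger("age", 0));
        }
        reader.close();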
2. Checking the read/write results

a. Writing the Parquet file
(screenshot not included)
b. Reading the Parquet file
(screenshot not included)

3. Reading and writing Parquet files via MapReduce and Avro (HDFS)

a. Add the parquet-avro bridge dependency

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.8.1</version>
    </dependency>

From the code above, we can see that defining the schema as a string like this is not very friendly:

    MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
            "    required binary name;\n" +
            "    required int32 age;\n" +
            "    required int32 favorite_number;\n" +
            "    required binary favorite_color;\n" +
            "}");

So we can hook Parquet up to Avro and use the Avro schema directly.
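The Person class referenced below is the Avro-generated class from the Avro article in this series. For reference, an Avro schema along the following lines would produce such a class; this is an assumed sketch reconstructed from the four fields used in this article, not copied from that article:

{
  "namespace": "com.shaonaiyi.hadoop.filetype.avro",
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}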

b. Complete code for writing a Parquet file (HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:47
 * @Description Writing a Parquet file via the MapReduce OutputFormat (HDFS)
 */
public class MRAvroParquetFileWriter {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the job's output properties
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);


        //3 Set the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet"));

        //4 Build the JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

        //5 Build the TaskAttemptContext
        TaskAttemptID attemptId = new TaskAttemptID("attemptId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

        //6 Instantiate the OutputFormat
        OutputFormat format = job.getOutputFormatClass().newInstance();

        //7 Set up the OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);

        //8 Get a RecordWriter, write the data, then close the writer
        RecordWriter<Void, Person> writer = format.getRecordWriter(hadoopAttemptContext);
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");
        writer.write(null, person);
        writer.close(hadoopAttemptContext);

        //9 Commit the task and the job via the committer
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }

}
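After commitTask and commitJob complete, you can sanity-check the result from the command line, for example:

    hdfs dfs -ls /user/hadoop-sny/mr/filetype/avro-parquet

The directory should contain the Parquet part file produced by the record writer.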

c. Complete code for reading a Parquet file (HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetInputFormat;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:52
 * @Description Reading a Parquet file via the MapReduce InputFormat (HDFS)
 */
public class MRAvroParquetFileReader {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 Build a Job instance
        Configuration hadoopConf = new Configuration();

        Job job = Job.getInstance(hadoopConf);

        //2 Set the full path of the files to read
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet");

        //3 Instantiate the input format used to read the files
        AvroParquetInputFormat inputFormat = AvroParquetInputFormat.class.newInstance();

        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
        //AvroJob.setInputKeySchema(job, Person.SCHEMA$);

        //4 Get the split information for the input files
        //4.1 Find out how many splits the input was divided into
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

        //Read the data in each split
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader<NullWritable, Person> reader = null;
                try {
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        System.out.println(reader.getCurrentKey());
                        Person person = reader.getCurrentValue();
                        System.out.println(person);
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

    }

}

4. Checking the MapReduce read/write results (HDFS)

a. Writing the Parquet file (HDFS)
(screenshot not included)
b. Reading the Parquet file (HDFS); the key was never set, so it prints empty
(screenshot not included)

0x03 Bonus
  1. A demo that writes and then reads back a Parquet file
package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 11:11
 * @Description Demo that writes and then reads back a Parquet file
 */
public class AvroParquetDemo {

    public static void main(String[] args) throws IOException {
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");

        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet2");

        // Write the Avro record through the Avro <-> Parquet bridge (builder API)
        ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withSchema(Person.SCHEMA$)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();

        writer.write(person);

        writer.close();

        // Read the record back as an Avro object
        ParquetReader<Object> avroParquetReader = AvroParquetReader.builder(path).build();
        Person record = (Person)avroParquetReader.read();
        System.out.println("name:" + record.getName());
        System.out.println("age:" + record.get("age").toString());
        System.out.println("favorite_number:" + record.get("favorite_number").toString());
        System.out.println("favorite_color:" + record.get("favorite_color"));

    }


}
  2. The console can read the record back
    (screenshot not included)
  3. The data is on HDFS as well
    (screenshot not included)
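One practical note on re-running the demo above: by default ParquetWriter refuses to overwrite an existing file, so a second run against the same path should fail with a "file already exists" style error; delete the avro-parquet2 path on HDFS first, or point the demo at a new path.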
0xFF Summary
  1. How to use these formats in a MapReduce job (a fuller driver sketch follows after this list):
    job.setInputFormatClass(AvroParquetInputFormat.class);
    AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
    
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);
  2. The article 网站用户行为分析项目之会话切割(二) saves its statistics (section "9. 保存统计结果") in exactly this Parquet format.
  3. Other articles in the Hadoop-supported file formats series:
    Hadoop-Supported File Formats: Text
    Hadoop-Supported File Formats: Avro
    Hadoop-Supported File Formats: Parquet
    Hadoop-Supported File Formats: SequenceFile
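As promised in item 1 above, here is a minimal map-only driver that wires these calls into a complete job. It is a hypothetical sketch rather than code from the original article: the class name AvroParquetCopyJob, the identity mapper, and the command-line arguments for the paths are all illustrative, and it assumes the same generated Person class used throughout this article.

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;

/**
 * Hypothetical sketch: a map-only job that copies Person records from one
 * Parquet directory to another using the Avro <-> Parquet bridge.
 */
public class AvroParquetCopyJob {

    // AvroParquetInputFormat delivers (null key, Person value); we pass both through unchanged.
    public static class IdentityMapper extends Mapper<Void, Person, Void, Person> {
        @Override
        protected void map(Void key, Person value, Context context)
                throws IOException, InterruptedException {
            context.write(null, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "avro-parquet-copy");
        job.setJarByClass(AvroParquetCopyJob.class);
        job.setMapperClass(IdentityMapper.class);
        job.setNumReduceTasks(0);  // map-only: mapper output goes straight to the OutputFormat

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);

        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);

        FileInputFormat.setInputPaths(job, new Path(args[0]));   // e.g. the avro-parquet directory written above
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not exist yet

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}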

About the author: 邵奈一 (Shao Naiyi), full-stack engineer, market watcher, and column editor.
Original content by 邵奈一; if you repost, please credit the source.


Source: https://blog.51cto.com/u_12564104/2891702
