ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MR之排序

2021-09-20 18:59:37  阅读:144  来源: 互联网

标签:MR hadoop job org apache import 排序 public


1、MR 中的排序

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑.上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

  • MapTask它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,溢写完毕后,它会对磁盘上所有文件进行归并排序。
  • ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排序。
  1. 部分排序.
    MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
  2. 全排序
    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置- -个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件完全丧失了MapReduce所提供的并行架构。
  3. 辅助排序: ( GroupingComparator分组)
    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
  4. 二次排序.
    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

2、WritableComparable

Bean对象如果作为Map输出的key时,需要实WritableComparable接口并重写compareTo方法指定排序规则

2.1 全排序

数据

01	    a00df6s	 kar	 120.196.100.99	    384	       33	                       200
日志id   设备id  厂商id    ip           自有内容时长(秒) 第三方内容时长(秒)     网络状态码

需求

每台设备按照总播放时长(自由时长+第三方时长)  降序排序

bean

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SpeakCompareBean implements WritableComparable<SpeakCompareBean> {
    private  String deviceId;
    private long selfDuration;  //  自有播放时长
    private long thirdPartDuration;  //  第三方播放时长
    private long sumDuration;   //   总时长

    public SpeakCompareBean() {
    }

    public SpeakCompareBean(String deviceId, long selfDuration, long thirdPartDuration) {
        this.deviceId = deviceId;
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.sumDuration = selfDuration + thirdPartDuration;
    }

  
    @Override
    public int compareTo(SpeakCompareBean o) {
        long sumDuration =  o.sumDuration;
        int  result ;
        if ( this.sumDuration > sumDuration){
            result = 1;
        }  else if (this.sumDuration < sumDuration){
            result  = -1;
        } else {
            result = 0;
        }
        return  result;

    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(deviceId);
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuration);
        dataOutput.writeLong(sumDuration);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.deviceId= dataInput.readUTF();
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuration = dataInput.readLong();
        this.sumDuration = dataInput.readLong();
    }

    @Override
    public String toString() {
        return   deviceId + '\t' +
                 selfDuration +  "\t" +
                 thirdPartDuration +  "\t" +
                 sumDuration ;
    }
}

map

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SpeakCompareBeanMapper extends Mapper<LongWritable, Text,SpeakCompareBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //super.map(key, value, context);
        String[] fields = value.toString().trim().split("\t");
        String deviceId = fields[1];
        long selfDuration  = Long.parseLong(fields[4]);
        long thirdPartDuration = Long.parseLong(fields[5]);
        SpeakCompareBean speakCompareBean =  new SpeakCompareBean(deviceId,selfDuration,thirdPartDuration);
        NullWritable  e = NullWritable.get();
        context.write(speakCompareBean, e);                                                                                
    }
}

reduce

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakCompareBeanReducer extends Reducer<SpeakCompareBean, NullWritable, NullWritable,SpeakCompareBean> {
    @Override
    protected void reduce(SpeakCompareBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for(NullWritable value : values){
            context.write(value,key);
        }
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class SpeakCompareBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SpeakCompareBeanDriver.class);

        job.setMapperClass(SpeakCompareBeanMapper.class);
        job.setReducerClass(SpeakCompareBeanReducer.class);

        job.setMapOutputKeyClass(SpeakCompareBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SpeakCompareBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean result =job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}

运行结果
在这里插入图片描述

2.2 GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。

数据
在这里插入图片描述
需求

需要求出每一个订单中成交金额最大的一笔交易

实现思路

  • Mapper
    读取一行文本数据,切分出每个字段;订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单Id相等再按照金额降序排;map()方法输出kv;key–>bean对象 ,value–>NullWritable.get();

  • Shuffle
    指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)

  • 指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;

  • Reduce
    每个reduce()方法写出一组key的第一个

bean

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private  String  orderId;
    private Double price;
    public OrderBean(){}

    public OrderBean(String  orderId,Double price){
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean o) {
        int  i = orderId.compareTo(o.getOrderId());
        if (i == 0){
            return - price.compareTo(o.getPrice());
        }
        return - i;
    }

    @Override
    public String toString() {
        return
                "orderId='" + orderId + '\'' +
                ", price=" + price ;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }
}

partitioner

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        //自定义分区,将相同订单id的数据发送到同一个reduce里面去
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % i;
    }
}

WritableComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator  extends WritableComparator {
    //将我们自定义的OrderBean注册到我们自定义的CustomGroupIngCompactor当中来
    //表示我们的分组器在分组的时候,对OrderBean这一种类型的数据进行分组
    //传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
    public OrderGroupingComparator() {
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        final int i = first.getOrderId().compareTo(second.getOrderId());
        if (i == 0) {
            System.out.println(first.getOrderId() + "----" +
                    second.getOrderId());
        }
        return i;
    }
}

map

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //super.map(key, value, context);
        String[]  fields = value.toString().trim().split("\t");
        String orderId = fields[0];
        Double price = Double.parseDouble(fields[2]);
        context.write(new OrderBean(orderId,price),NullWritable.get());
    }
}

reduce

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer  extends Reducer<OrderBean, NullWritable,OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //每个reduce里面第一个就已经是金额最大的了;
        //
        context.write(key, NullWritable.get());
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
       // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(OrderDriver.class);
        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6 指定job的输入原始文件所在目录

        FileInputFormat.setInputPaths(job, new Path("E:\\hdfs_test_dir\\groupingComparator.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hdfs_test_dir\\order_output"));
        // 7 指定分区器,指定分组比较器,设置reducetask数量
        job.setPartitionerClass(OrderPartitioner.class);
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        job.setNumReduceTasks(2);
        // 8 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

运行结果
在这里插入图片描述
在这里插入图片描述

标签:MR,hadoop,job,org,apache,import,排序,public
来源: https://blog.csdn.net/ly13607255628/article/details/120391703

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有