ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

一文学会MapReduce编程

2021-01-04 17:02:40  阅读:230  来源: 互联网

标签:IntWritable Text 编程 hadoop MapReduce 文学 job class


MapReduce编程模型,相对于初学者来说,会有一些门槛,没关系,这一篇让你学会使用MapReduce进行分布式处理。

基础知识

MapReduce 框架只对 <key, value> 形式的键值对进行处理。MapReduce会将任务的输入当成一组 <key, value> 键值对,最后也会生成一组 <key, value> 键值对作为结果。常见的输入为文件,此时读取的行偏移量会作为Key,文件内容作为Value。

key 和 value 的类必须由框架来完成序列化,所以需要实现其中的可写接口(Writable)。如果需要进行数据排序,还必须实现 WritableComparable 接口。MapReduce已经提供了基本数据类型的Writable实现类,自定义类需要自行实现接口。

常见的基本数据类型的Writable有IntWritable、LongWritable、Text等等。

MapReduce任务由Map和Reduce两个过程,所以需要分别进行编写。Map的实现需要继承Mapper类,实现map方法完成数据处理;Reduce则要继承Reduer类,实现reduce方法完成数据聚合。

/*
 * KEYIN:输入kv数据对中key的数据类型
 * VALUEIN:输入kv数据对中value的数据类型
 * KEYOUT:输出kv数据对中key的数据类型
 * VALUEOUT:输出kv数据对中value的数据类型
 * 数据类型为Writable类型
 */
public static class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{
    // Context为MapReduce上下文,在Map中通常用于将数据处理结果输出
    public void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
		// Map功能的实现
    } 
}

public static class MyReducer extends Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
	// 这里reduce方法的输入的Value值是可迭代Iterable类型,因为Reduce阶段会将Key值相同的数据放置在一起
    public void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException {
    	// Reduce功能的实现
    }
  }

除了MapReduce,为了提高Shuffle效率,减少Shuffle过程中传输的数据量,在Map端可以提前对数据进行聚合:将Key相同的数据进行处理合并,这个过程称为Combiner。Combiner需要在Job中进行指定,一般指定为Reducer的实现类。

Map和Reduce的功能编写完成之后,在main函数中创建MapReduce的Job实例,填写MapReduce作业运行所必要的配置信息,并指定Map和Reduce的实现类,用于作业的创建。

 public static void main(String[] args) throws Exception {
 	// 配置类
    Configuration conf = new Configuration();
    // 创建MapReduce Job实例
    Job job = Job.getInstance(conf, "Job Name");
    // 为MapReduce作业设置必要的配置
    // 设置main函数所在的入口类
    job.setJarByClass(WordCount.class);
    // 设置Map和Reduce实现类,并指定Combiner
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // 设置结果数据的输出类
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // 设置结果数据的输入和输出路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 作业运行,并输出结束标志
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

除了基本的设置外,还可以指定Reduce的个数

job.setNumReduceTasks(int)

MapReduce提供的常见类,除Mapper、Reduer之外,还有Partitioner和Counter。其中Partitioner可以自定义Map中间结果输出时对Key的Partition分区,其目的是为了优化并减少计算量;如果不做自定义实现,HashPartitioner 是 MapReduce 使用的默认分区程序。

Counter (计数器)是 MapReduce 应用程序报告统计数据的一种工具。在 Mapper 和 Reducer 的具体实现中,可以利用 Counter 来报告统计信息。

WordCount

接下来,实现最经典的入门案例,词频统计。编写MapReduce程序,统计单词出现的次数。

数据样例:

首先准备数据,并上传到HDFS中:

// 在HDFS中创建作业输入目录
hadoop fs -mkdir -p /tmp/mr/data/wc_input
// 为目录赋权
hadoop fs -chmod 777 /tmp/mr/data/wc_input
// 在本地创建词频统计文件
echo -e "hello hadoop\nhello hdfs\nhello yarn\nhello mapreduce" > wordcount.txt
// 将wordcount.txt上传到作业输入目录
hadoop fs -put wordcount.txt /tmp/mr/data/wc_input

在linux本地创建WordCount.java文件,编辑MapReduce程序,完成词频统计功能:

注意:使用vim打开WordCount.java,进行复制时,可能会出现格式问题,最好使用vi。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  /*
 * 实现Mapper,文件的每一行数据会执行一次map运算逻辑
 * 因为输入是文件,会将处理数据的行数作为Key,这里应为LongWritable,设置为Object也可以;Value类型为Text:每一行的文件内容
 * Mapper处理逻辑是将文件中的每一行切分为单词后,将单词作为Key,而Value则设置为1,<Word,1>
 * 因此输出类型为Text,IntWritable
 */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
	
	// 事先定义好Value的值,它是IntWritable,值为1
    private final static IntWritable one = new IntWritable(1);
    // 事先定义好Text对象word,用于存储提取出来的每个单词
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // 将文件内容的每一行数据按照空格拆分为单词
      StringTokenizer itr = new StringTokenizer(value.toString());
      // 遍历单词,处理为<word,1>的Key-Value形式,并输出(这里会调用上下文输出到buffer缓冲区)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  /*
 * 实现Reducer
 * 接收Mapper的输出,所以Key类型为Text,Value类型为IntWritable
 * Reducer的运算逻辑是Key相同的单词,对Value进行累加
 * 因此输出类型为Text,IntWritable,只不过IntWritable不再是1,而是最终累加结果
 */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    // 预先定义IntWritable对象result用于存储词频结果
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // 遍历key相同单词的value值,进行累加
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      // 将结果输出
      context.write(key, result);
    }
  }

  // 实现Main方法
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

接下来将代码编译为jar包:

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class

当然也可以使用IDE进行编译打包。

打包完成之后,便可以提交作业了,在main函数中,定义了两个参数:输入路径和输出路径,所以调用作业时需要指定参数。

hadoop jar wc.jar WordCount /tmp/mr/data/wc_input /tmp/mr/data/wc_output

运行结束后,查看运行结果是否正确:

hadoop fs -cat /tmp/mr/data/wc_output/part-r-*

结束语

如果有帮助的,记得点赞、关注。在公众号《数舟》中,可以免费获取专栏《数据仓库》配套的视频课程、大数据集群自动安装脚本,并获取进群交流的途径。

我所有的大数据技术内容也会优先发布到公众号中。如果对某些大数据技术有兴趣,但没有充足的时间,在群里提出,我为大家安排分享。

公众号自取:

公众号

标签:IntWritable,Text,编程,hadoop,MapReduce,文学,job,class
来源: https://blog.csdn.net/qq_33876553/article/details/112186934

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有