ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

04_第四章 Hadoop数据压缩

2022-02-07 17:31:53  阅读:142  来源: 互联网

标签:IntWritable 04 Text Hadoop hadoop job apache org 数据压缩


1. 01 Map输出设置压缩 案例

package ComMapOutPk {

  import java.lang

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec}
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
  import org.apache.hadoop.io.compress.CompressionCodec


  // Mapper 类
  // 每个Mapper类实例 处理一个切片文件
  class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    var text = new Text
    var intWritable = new IntWritable(1)

    // 每行记录调用一次map方法
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      println("map enter .....")
      //1. 获取一行记录
      val line = value.toString

      //2. 切割
      val words = line.split(" ")

      //3. 输出到缓冲区
      words.foreach(
        key1 => {
          text.set(key1);
          context.write(text, intWritable)
        }
      )

    }
  }

  // Reducer 类
  // 所有Mapper实例 执行完毕后 Reducer才会执行
  // Mapper类的输出类型 = Reducer类的输入类型
  class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // 每个key调用一次
    // 张飞 <1,1,1,1,1>
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      println("reduce enter .....")
      var sum: Int = 0

      // 1. 对词频数 求sum
      values.forEach(sum += _.get)

      // 2. 输出结果
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }

  // Driver
  object Driver {
    def main(args: Array[String]): Unit = {
      //1. 获取配置信息以及 获取job对象
      //读取配置文件  Configuration: core-default.xml, core-site.xml
      var configuration = new Configuration

//      configuration.set("mapreduce.map.output.compress","true")
//      configuration.set("mapreduce.map.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec")

      //开启map端输出压缩
      configuration.set("mapreduce.map.output.compress","true")

      //指定map端输出压缩算法
      //configuration.setClass("mapreduce.map.output.compress.codec",classOf[BZip2Codec],classOf[CompressionCodec]);
      configuration.setClass("mapreduce.map.output.compress.codec",classOf[org.apache.hadoop.io.compress.BZip2Codec],classOf[CompressionCodec]);
      //INFO [org.apache.hadoop.io.compress.CodecPool] - Got brand-new compressor [.bz2]、[.gz]

      var job: Job = Job.getInstance(configuration)

      //2. 注册本Driver程序的jar
      job.setJarByClass(this.getClass)

      job.setJobName("compress mr")

      //3. 注册 Mapper 和 Reducer的jar
      job.setMapperClass(classOf[WCMapper])
      job.setReducerClass(classOf[WCReducer])

      //4. 设置Mapper 类输出key-value 数据类型
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      //5. 设置最终输出key-value 数据类型
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      //6. 设置输入输出路径
      FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt")
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))


      //7. 提交job
      val bool: Boolean = job.waitForCompletion(true)
      System.exit(bool match {
        case true => "0".toInt
        case false => "1".toInt
      })

    }


  }


}

 

2. 02 Reduce输出设置压缩 案例

/**
  * @author gaocun
  * @create 2022-01-06 8:10 PM */
package ComReduceOutPk {

  import java.lang

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec}
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
  import org.apache.hadoop.io.compress.CompressionCodec


  // Mapper 类
  // 每个Mapper类实例 处理一个切片文件
  class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    var text = new Text
    var intWritable = new IntWritable(1)

    // 每行记录调用一次map方法
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      println("map enter .....")
      //1. 获取一行记录
      val line = value.toString

      //2. 切割
      val words = line.split(" ")

      //3. 输出到缓冲区
      words.foreach(
        key1 => {
          text.set(key1);
          context.write(text, intWritable)
        }
      )

    }
  }

  // Reducer 类
  // 所有Mapper实例 执行完毕后 Reducer才会执行
  // Mapper类的输出类型 = Reducer类的输入类型
  class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // 每个key调用一次
    // 张飞 <1,1,1,1,1>
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      println("reduce enter .....")
      var sum: Int = 0

      // 1. 对词频数 求sum
      values.forEach(sum += _.get)

      // 2. 输出结果
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }

  // Driver
  object Driver {
    def main(args: Array[String]): Unit = {
      //1. 获取配置信息以及 获取job对象
      //读取配置文件  Configuration: core-default.xml, core-site.xml
      var configuration = new Configuration

      var job: Job = Job.getInstance(configuration)

      //2. 注册本Driver程序的jar
      job.setJarByClass(this.getClass)

      job.setJobName("compress mr")

      //3. 注册 Mapper 和 Reducer的jar
      job.setMapperClass(classOf[WCMapper])
      job.setReducerClass(classOf[WCReducer])

      //4. 设置Mapper 类输出key-value 数据类型
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      //5. 设置最终输出key-value 数据类型
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      //6. 设置输入输出路径
      FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt")
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output"))

      //7. reduce 输出设置压缩

      //开启reduce端输出压缩
      FileOutputFormat.setCompressOutput(job, true)

      //指定reduce端输出压缩算法
      //FileOutputFormat.setOutputCompressorClass(job,classOf[BZip2Codec])
      //FileOutputFormat.setOutputCompressorClass(job,classOf[GzipCodec])


      //7. 提交job
      val bool: Boolean = job.waitForCompletion(true)
      System.exit(bool match {
        case true => "0".toInt
        case false => "1".toInt
      })

    }


  }


}

 

标签:IntWritable,04,Text,Hadoop,hadoop,job,apache,org,数据压缩
来源: https://www.cnblogs.com/bajiaotai/p/15868590.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有