Why use a combiner
- It reduces the number of key-value pairs sent through the shuffle
- It mitigates data skew
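Designing a combiner
A combiner must be equivalent to the reducer as a data transformation: Hadoop may apply it zero, one, or several times to a map task's output, so the job's final result has to be the same in every case.
- If the reducer only computes a distributive function (max, min, sum, count), the reducer class itself can be used as the combiner
- Otherwise, design a separate combiner and reducer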
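To make the distinction concrete, here is a small sketch in plain Java (no Hadoop involved) that compares a distributive function with a non-distributive one. The class name `DistributiveCheck`, its methods, and the sample values are all made up for illustration. It pre-aggregates the values of one key per input split, as a combiner would, and then aggregates the partial results again.

```java
import java.util.Arrays;
import java.util.List;

// Toy illustration only: the names and sample values are hypothetical.
final class DistributiveCheck {

    static double sum(List<Double> xs) {
        double s = 0;
        for (double x : xs) {
            s += x;
        }
        return s;
    }

    static double mean(List<Double> xs) {
        return sum(xs) / xs.size();
    }

    // Sum applied per split, then applied again to the partial sums.
    static double sumOfSums(List<Double> a, List<Double> b) {
        return sum(Arrays.asList(sum(a), sum(b)));
    }

    // Mean applied per split, then applied again to the partial means.
    static double meanOfMeans(List<Double> a, List<Double> b) {
        return mean(Arrays.asList(mean(a), mean(b)));
    }

    public static void main(String[] args) {
        // Values of one key as seen by two different map tasks.
        List<Double> split1 = Arrays.asList(1.0, 2.0, 3.0);
        List<Double> split2 = Arrays.asList(10.0);
        List<Double> all = Arrays.asList(1.0, 2.0, 3.0, 10.0);

        System.out.println("sum direct    = " + sum(all));
        System.out.println("sum of sums   = " + sumOfSums(split1, split2));
        System.out.println("mean direct   = " + mean(all));
        System.out.println("mean of means = " + meanOfMeans(split1, split2));
    }
}
```

With these values the two sums agree (16.0), while the mean of the per-split means (6.0) differs from the true mean (4.0). This is why a reducer that computes a mean cannot simply be reused as the combiner.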
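Example: a combiner for computing the mean
The mapper output carries an extra count column, which turns the mean into two sums: one over value and one over count. Summation is distributive, so the combiner can reuse the reducer's aggregation logic with a slightly different output: it emits the partial sum and count instead of their quotient.
- Mapper output: (key: [value, count])
- Combiner output: (key: [sum(value), sum(count)])
- Reducer output: (key: sum(value) / sum(count))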
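The (sum, count) scheme can be checked outside Hadoop with a short sketch. The class `SumCountMerge` and its methods are hypothetical helpers, not part of the job; they only mimic the "value,count" string encoding that the job passes between mapper, combiner and reducer.

```java
import java.util.Arrays;
import java.util.List;

// Toy illustration only: the names and sample values are hypothetical.
final class SumCountMerge {

    // Merges "value,count" records into a single "sum,count" record,
    // which is what the combiner does for one key.
    static String merge(List<String> records) {
        double sum = 0;
        long count = 0;
        for (String record : records) {
            String[] fields = record.split(",");
            sum += Double.parseDouble(fields[0]);
            count += Long.parseLong(fields[1]);
        }
        return sum + "," + count;
    }

    // Merges the records and divides, which is what the reducer does.
    static double average(List<String> records) {
        String[] fields = merge(records).split(",");
        return Double.parseDouble(fields[0]) / Long.parseLong(fields[1]);
    }

    public static void main(String[] args) {
        // Mapper output for one key, split across two map tasks.
        List<String> task1 = Arrays.asList("1,1", "2,1", "3,1");
        List<String> task2 = Arrays.asList("10,1");

        String combined1 = merge(task1);
        String combined2 = merge(task2);

        System.out.println("combiner output, task 1: " + combined1);
        System.out.println("combiner output, task 2: " + combined2);
        System.out.println("average with combiner:   "
                + average(Arrays.asList(combined1, combined2)));
        System.out.println("average without:         "
                + average(Arrays.asList("1,1", "2,1", "3,1", "10,1")));
    }
}
```

Both paths yield 4.0 here. Because `merge` reads and writes the same format, applying it to its own output again leaves the final average unchanged, which matters since the number of combiner runs is not guaranteed.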
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

// Example: computing a per-key mean with a combiner
public class AverageByAttributeWithCombiner extends Configured implements Tool {

    // Emits (country, "numClaims,1") for every record that has a claims value.
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A negative limit keeps trailing empty fields.
            String[] fields = value.toString().split(",", -1);
            if (fields.length <= 8) {
                return; // skip lines too short to hold both columns
            }
            String country = fields[4];
            String numClaims = fields[8];
            // Skip empty values and quoted ones (for example a quoted header row).
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                context.write(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    // Adds up the partial sums and counts, then emits the mean for each key.
    public static class ReduceClass extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text val : values) {
                String[] fields = val.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Long.parseLong(fields[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    // Same aggregation as the reducer, but emits "sum,count" so that its output
    // has the same format as its input and can be merged again.
    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text val : values) {
                String[] fields = val.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Long.parseLong(fields[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Job.getInstance replaces the deprecated Job constructor (Hadoop 2.x and later).
        Job job = Job.getInstance(conf, "AverageByAttributeWithCombiner");
        job.setJarByClass(AverageByAttributeWithCombiner.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // The map output value type (Text) differs from the final one (DoubleWritable).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Return the status and let main() call System.exit.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new AverageByAttributeWithCombiner(), args);
        System.exit(exitCode);
    }
}

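Checking the combiner's effect
Compare two counters in the job's counter output:
- Map output records: the number of records emitted by the mappers
- Reduce input records: the number of records received by the reducers
Without a combiner the two counts are equal; when the combiner is effective, Reduce input records is the smaller of the two.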
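How these two counters relate can be sketched with a toy model in plain Java. It does not read real Hadoop counters; `CombineEffectDemo` and its methods are hypothetical, and the model assumes the combiner runs exactly once over each map task's entire output, which Hadoop does not guarantee.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Toy model only: the names and sample keys are hypothetical.
final class CombineEffectDemo {

    // Total records emitted by all map tasks ("Map output records").
    static long mapOutputRecords(List<List<String>> keysPerMapTask) {
        long records = 0;
        for (List<String> keys : keysPerMapTask) {
            records += keys.size();
        }
        return records;
    }

    // Records that reach the reducers ("Reduce input records").
    // Assumption: one combiner pass per map task, leaving one record
    // per distinct key in that task's output.
    static long reduceInputRecords(List<List<String>> keysPerMapTask, boolean useCombiner) {
        long records = 0;
        for (List<String> keys : keysPerMapTask) {
            records += useCombiner ? new HashSet<String>(keys).size() : keys.size();
        }
        return records;
    }

    public static void main(String[] args) {
        // Keys emitted by two map tasks.
        List<List<String>> keysPerMapTask = Arrays.asList(
                Arrays.asList("US", "US", "US", "JP"),
                Arrays.asList("US", "JP", "JP"));

        System.out.println("Map output records:                  "
                + mapOutputRecords(keysPerMapTask));
        System.out.println("Reduce input records, no combiner:   "
                + reduceInputRecords(keysPerMapTask, false));
        System.out.println("Reduce input records, with combiner: "
                + reduceInputRecords(keysPerMapTask, true));
    }
}
```

In this model 7 map output records shrink to 4 reduce input records once the combiner is enabled. The fewer distinct keys each map task sees relative to its record count, the larger the saving.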
Source: https://www.cnblogs.com/vvlj/p/14101265.html