ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Hadoop1.*版本 统计文件中字符串出现的数量 或收集 《未完待续》

2021-09-28 12:05:06  阅读:214  来源: 互联网

标签:org args hadoop 未完待续 apache conf Hadoop1 字符串 import


入门级项目,实践一下,分析并统计服务器运行日志中调用量最多的SQL语句,把它进行缓存

pom.xml 引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>test.hadoop</groupId>
	<artifactId>WordCount</artifactId>
	<version>0.0.1-acute</version>
	<packaging>jar</packaging>

	<name>WordCount</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>0.23.11</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>0.23.11</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>0.23.11</version>
		</dependency>
	</dependencies>
</project>

总涉及3个类,一个是程序启动类及两个执行不同统计的功能类

package test.hadoop.line;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class LineMatcherStarter {

	public static void main(String[] args) throws IOException {
        // 根据参数调用不同功能
		int key = -1;
		try {
			key = Integer.valueOf(args[0]);
		} catch (NumberFormatException e) {
			e.printStackTrace();
			key = 0;
			return;
		}
		switch (key) {
		case 1:
			countJob(key, args); // 计数任务
			break;
		case 2:
			collectJob(key, args); // 收集任务
			break;
		default:
			printUsage();
		}
	}

	private static void printUsage() {
		System.out.println("Usage: java [-options] -jar jarfile class [args...]");
		System.out.println("  class a.b.c.Starter");
		System.out.println("  args[0] 1=count 2=line");
		System.out.println("  args[1] source");
		System.out.println("  args[2] destination");
	}

	private static void collectJob(int key, String[] args) throws IOException {
		if (args.length < 4) {
			printUsage();
			System.out.println("  args[3] expression");
			System.out.println("  args[4] rule=[starts|contains|ends]");
			System.out.println("  args[5] max line  default=9999");
			return;
		}
        // Hadoop任务的初始化操作,版本不同写法不同
		JobConf conf = new JobConf(LineMatcherStarter.class);
		conf.setJobName("LineCollect");
		conf.setMapperClass(TextWithLineMapperReducer.class);
		conf.setCombinerClass(TextWithLineReducer.class);
		conf.setReducerClass(TextWithLineReducer.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(LineArrayWritable.class);
		conf.setOutputFormat(TextWithLineOutputFormat.class);
        // 指定文件输入路径 和 输出路径
		FileInputFormat.setInputPaths(conf, args[1]);
		FileOutputFormat.setOutputPath(conf, new Path(args[2]));	
        // 自定义属性,用于搜索的 字符串 和 匹配规则(开头,包含,结尾)
		conf.set("TEXTWITHLINE.search", args[3]);
		conf.set("TEXTWITHLINE.rule", args[4]);	
		if (args.length == 6) {
            // 每个任务在分布式机器上的最大统计行数
            // 根据内存估算,不然有可能会引发OOM异常,别问我是怎么知道的
			conf.set("TEXTWITHLINE.maxLine", args[5]);	
		}
        // 执行任务
		JobClient.runJob(conf);
	}

	private static void countJob(int key, String[] args) throws IOException {
		if (args.length < 4) {
			printUsage();
			System.out.println("  args[3] expression");
			System.out.println("  args[4] rule=[starts|contains|ends]");
			return;
		}
		JobConf conf = new JobConf(LineMatcherStarter.class);
		conf.setJobName("LineCount");
		conf.setMapperClass(LineCountMapperReducer.class);
		conf.setCombinerClass(LineCountReducer.class);
		conf.setReducerClass(LineCountReducer.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(conf, args[1]);
		FileOutputFormat.setOutputPath(conf, new Path(args[2]));	
		conf.set("TEXTWITHLINE.search", args[3]);
		conf.set("TEXTWITHLINE.rule", args[4]);	
		JobClient.runJob(conf);
	}

}
package test.hadoop.line;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class TextWithLineMapperReducer extends MapReduceBase implements Mapper<LongWritable,Text,Text,LineArrayWritable>{
	private Text keyText;
	private String search;
	public TextWithLineMapperReducer() throws FileNotFoundException, IOException {
	}
	public void configure(JobConf job) {
		search = job.get("TEXTWITHLINE.search");
	}
	public void map(LongWritable k,Text v,OutputCollector<Text,LineArrayWritable> o,Reporter r)throws IOException{
		if (search == null || keyText == null) {
			keyText = new Text(search);
			if (search.contentEquals("") || keyText == null) {
				throw new RuntimeException("Search is empty!");
			}
		}
		String line = v.toString();
		if (line.indexOf(search) >= 0) {
			o.collect(keyText, new LineArrayWritable(new Text[]{v}));
		}
	}
}

class TextWithLineReducer extends MapReduceBase implements Reducer<Text,LineArrayWritable,Text,LineArrayWritable>{
	private int max = Integer.MAX_VALUE;
	public void configure(JobConf job) {
		max = Integer.valueOf(job.get("TEXTWITHLINE.maxLine", "9999"));
	}
	public void reduce(Text k,Iterator<LineArrayWritable> v,OutputCollector<Text,LineArrayWritable> o,Reporter r)throws IOException{
		List<Text> list = new ArrayList<>();
		int i = 0;
		while (v.hasNext()) {
			String[] ss = v.next().toStrings();
			for (String s : ss) {
				if (i++ < max)
					list.add(new Text(s));
			}
		}
		o.collect(k, new LineArrayWritable(list.toArray(new Text[0])));
	}
}

class TextWithLineWriter implements RecordWriter<Text, LineArrayWritable> {
	private static final byte[] newline = getBytes("\r\n");
    static {
      
    }
    private static byte[] getBytes(String s) {
    	try {
            return s.getBytes("UTF-8");
    	} catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + "UTF-8" + " encoding");
        }
    }
    protected DataOutputStream out;
    public TextWithLineWriter(DataOutputStream s) {
    	out = s;
    }
	public synchronized void write(Text key, LineArrayWritable value) throws IOException {
		out.write(getBytes("----->" + key.toString()));
        out.write(newline);
        writeArray(value);
        out.write(newline);
	}
    private void writeArray(LineArrayWritable aw) throws IOException {
    	int i = 0;
    	for (String s : aw.toStrings()) {
    		out.write(getBytes("-->" + (i++) + "->" + s));
            out.write(newline);
    	}
    }
	public void close(Reporter reporter) throws IOException {
		out.close();
	}
}

class TextWithLineOutputFormat extends TextOutputFormat<Text,LineArrayWritable>{
	public RecordWriter<Text,LineArrayWritable> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable p)throws IOException{
		boolean isCompressed = getCompressOutput(job);
	    if (!isCompressed) {
	      Path file = FileOutputFormat.getTaskOutputPath(job, name);
	      FileSystem fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, p);
	      return new TextWithLineWriter(fileOut);
	    } else {
	      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
	      // create the named codec
	      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
	      // build the filename including the extension
	      Path file = FileOutputFormat.getTaskOutputPath(job,name+codec.getDefaultExtension());
	      FileSystem fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, p);
	      return new TextWithLineWriter(fileOut);
	    }
	}
}

class LineArrayWritable extends ArrayWritable {
	public LineArrayWritable() {
		super(Text.class);
	}
	public LineArrayWritable(Text[] array) {
		super(Text.class);
		Text[] texts = new Text[array.length];
		for (int i = 0; i < array.length; ++i) {
			texts[i] = new Text(array[i]);
		}
		set(texts);
	}
}
package test.hadoop.line;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Predicate;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineCountMapperReducer extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
	private static final IntWritable ONE = new IntWritable(1);
	private String search;
	private Text key;
	private Predicate<String> rule;
	private Predicate<String> starts = s -> s.startsWith(search);
	private Predicate<String> contains = s -> s.contains(search);
	private Predicate<String> ends = s -> s.endsWith(search);
	public void configure(JobConf job) {
		search = job.get("TEXTWITHLINE.search");
		key = new Text(search);
		switch (job.get("TEXTWITHLINE.rule")) {
			case "starts":
				rule = starts;
				break;
			case "ends":
				rule = ends;
				break;
			case "contains":
			default:
				rule = contains;
		}
	}
	public void map(LongWritable k,Text v,OutputCollector<Text,IntWritable> o,Reporter r)throws IOException{
		String line = v.toString();
		if (rule.test(line)) {
			o.collect(key, ONE);
		}
	}
}

class LineCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text k,Iterator<IntWritable> v,OutputCollector<Text,IntWritable> o,Reporter r)throws IOException{
		int sum = 0;
		while (v.hasNext()) {
			sum += v.next().get();
		}
		o.collect(k, new IntWritable(sum));
	}
}

本人使用了虚拟机,安装,克隆,修改主机名和用户,参考准备工作的文章 Ubuntu 14.04 上实现 更改用户名 用户组 域名 主机名  和  Ubuntu 14.04 上实现 SSH 无密码访问

《未完待续》

标签:org,args,hadoop,未完待续,apache,conf,Hadoop1,字符串,import
来源: https://blog.csdn.net/u011225581/article/details/120526199

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有