ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

31.电视采集项目流程spark篇通过sparksql处理业务逻辑

2020-02-03 17:00:16  阅读:208  来源: 互联网

标签:String 31 sql sparksql org apache import spark public


新建包

 

 

package com.it19gong.clickproject;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text text = new Text();
    @Override
protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
       String itr[] = value.toString().split(" ");
       if (itr.length < 11)
        {
            return;
        }
        String ip = itr[0];
        String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
        String url = itr[6];
        String upFlow = itr[9];
        
        text.set(ip+","+date+","+url+","+upFlow);
        context.write(text, NullWritable.get());
       
}
}

 

 

package com.it19gong.clickproject;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnalysisNginxTool
{
    private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

    public static String nginxDateStmpToDate(String date)
    {
        String res = "";
        try
        {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            DateToNUM.initMap();
            datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));
            System.out.println(datetmp);
            Date d = df.parse(datetmp);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
            res = sdf.format(d);
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return res;
    }

    public static long nginxDateStmpToDateTime(String date)
    {
        long l = 0;
        try
        {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));

            Date d = df.parse(datetmp);
            l = d.getTime();
        }
        catch (ParseException e)
        {
            logger.error("error:" + date, e);
        }
        return l;
    }
}

 

 

package com.it19gong.clickproject;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Unit test for simple App.
 */
public class AppTest 
    extends TestCase
{
    /**
     * Create the test case
     *
     * @param testName name of the test case
     */
    public AppTest( String testName )
    {
        super( testName );
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite()
    {
        return new TestSuite( AppTest.class );
    }

    /**
     * Rigourous Test :-)
     */
    public void testApp()
    {
        assertTrue( true );
    }
}

 

 

package com.it19gong.clickproject;

import java.util.HashMap;

public class DateToNUM
{
    public static HashMap map = new HashMap();

    public static void initMap()
    {
        map.put("JAN", "01");
        map.put("FEB", "02");
        map.put("MAR", "03");
        map.put("APR", "04");
        map.put("MAY", "05");
        map.put("JUN", "06");
        map.put("JUL", "07");
        map.put("AUG", "08");
        map.put("SEPT", "09");
        map.put("OCT", "10");
        map.put("NOV", "11");
        map.put("DEC", "12");
    }
}

 

 

 

新建AccessLogDriver类

package com.it19gong.clickproject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AccessLogDriver {
    
    public static void main(String[] args) throws Exception {
        
        // 创建SparkConf、JavaSparkContext、SQLContext
                SparkConf conf = new SparkConf()
                        .setMaster("local")  
                        .setAppName("RDD2DataFrameProgrammatically");  
                JavaSparkContext sc = new JavaSparkContext(conf);
                SQLContext sqlContext = new SQLContext(sc);
            
                // 第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
                JavaRDD<String> lines = sc.textFile("E:\\Mycode\\dianshixiangmu\\sparkproject\\data\\access.log");
                
                // 分析一下
                // 它报了一个,不能直接从String转换为Integer的一个类型转换的错误
                // 就说明什么,说明有个数据,给定义成了String类型,结果使用的时候,要用Integer类型来使用
                // 而且,错误报在sql相关的代码中
                // 所以,基本可以断定,就是说,在sql中,用到age<=18的语法,所以就强行就将age转换为Integer来使用
                // 但是,肯定是之前有些步骤,将age定义为了String
                // 所以就往前找,就找到了这里
                // 往Row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下,再塞进去
                JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Row call(String line) throws Exception {
                        String itr[] = line.split(" ");
                           
                            String ip = itr[0];
                            String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
                            String url = itr[6];
                            String upFlow = itr[9];
                        
                        return RowFactory.create(
                                ip,
                                date,
                                url,
                                Integer.valueOf(upFlow)
                                );      
                    }
                    
                });
                
                // 第二步,动态构造元数据
                // 比如说,id、name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql db里
                // 或者是配置文件中,加载出来的,是不固定的
                // 所以特别适合用这种编程的方式,来构造元数据
                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true)); 
                structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));  
                StructType structType = DataTypes.createStructType(structFields);
                
                // 第三步,使用动态构造的元数据,将RDD转换为DataFrame
                DataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType);
            
                // 后面,就可以使用DataFrame了
                studentDF.registerTempTable("log");  
                
                DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc"); 
                
                List<Row> rows = sumFlowDF.javaRDD().collect();
                for(Row row : rows) {
                    System.out.println(row);  
                }
        
    }

}

 

 

运行程序

 

 

 

 

新建DBHelper类

 

 

package com.it19gong.clickproject;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {

    public static final String url ="jdbc:mysql://192.168.86.131:3306/userdb";
    public static final String name="com.mysql.jdbc.Driver";
    public static final String user="sqoop";
    public static final String password="sqoop";
    
    //获取数据库连接
    public Connection conn=null;
    
    public DBHelper(){
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }    
    

    public void close(){
        try {
            this.conn.close();
        } catch (SQLException e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    
}

 

 

 

修改AccessLogDriver类

package com.it19gong.clickproject;



import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;



public class AccessLogDriver {
    static DBHelper db1=null;
    public static void main(String[] args) throws Exception {
        
        // 创建SparkConf、JavaSparkContext、SQLContext
                SparkConf conf = new SparkConf()
                        .setMaster("local")  
                        .setAppName("RDD2DataFrameProgrammatically");  
                JavaSparkContext sc = new JavaSparkContext(conf);
                SQLContext sqlContext = new SQLContext(sc);
            
                // 第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
                JavaRDD<String> lines = sc.textFile("E:\\Mycode\\dianshixiangmu\\sparkproject\\data\\access.log");
                
                // 分析一下
                // 它报了一个,不能直接从String转换为Integer的一个类型转换的错误
                // 就说明什么,说明有个数据,给定义成了String类型,结果使用的时候,要用Integer类型来使用
                // 而且,错误报在sql相关的代码中
                // 所以,基本可以断定,就是说,在sql中,用到age<=18的语法,所以就强行就将age转换为Integer来使用
                // 但是,肯定是之前有些步骤,将age定义为了String
                // 所以就往前找,就找到了这里
                // 往Row中塞数据的时候,要注意,什么格式的数据,就用什么格式转换一下,再塞进去
                JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Row call(String line) throws Exception {
                        String itr[] = line.split(" ");
                           
                            String ip = itr[0];
                            String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
                            String url = itr[6];
                            String upFlow = itr[9];
                        
                        return RowFactory.create(
                                ip,
                                date,
                                url,
                                Integer.valueOf(upFlow)
                                );      
                    }
                    
                });
                
                // 第二步,动态构造元数据
                // 比如说,id、name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql db里
                // 或者是配置文件中,加载出来的,是不固定的
                // 所以特别适合用这种编程的方式,来构造元数据
                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));  
                structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true)); 
                structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));  
                StructType structType = DataTypes.createStructType(structFields);
                
                // 第三步,使用动态构造的元数据,将RDD转换为DataFrame
                DataFrame studentDF = sqlContext.createDataFrame(clickRDD, structType);
            
                // 后面,就可以使用DataFrame了
                studentDF.registerTempTable("log");  
                
                DataFrame sumFlowDF = sqlContext.sql("select ip,sum(upflow) as sum from log group by ip order by sum desc"); 
                
                db1=new DBHelper();
                final String sql="insert into upflow(ip,sum) values(?,?) ";
                sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() {
                    
                    @Override
                    public void call(Row t) throws Exception {
                        // TODO Auto-generated method stub
                        PreparedStatement pt = db1.conn.prepareStatement(sql);
                        pt.setString(1,t.getString(0));
                        pt.setString(2,String.valueOf(t.getLong(1)));
                        pt.executeUpdate();
                    }
                });;
                
        
    }

}

 

 

运行

 

 

 

可以看到mysql数据库里面对了两条数据

 

标签:String,31,sql,sparksql,org,apache,import,spark,public
来源: https://www.cnblogs.com/braveym/p/12256338.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有