ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

日活需求

2022-05-05 09:01:12  阅读:150  来源: 互联网

标签:需求 String val redis kafka 日活 import org


1、需求分析&实现思路

1.1、用户首次登录趋势图

从项目的日志中获取用户的启动日志,如果是当日第一次启动,纳入统计。将统计结果保存到ES中,利用Kibana进行分析展示

1.2、实现思路

第一步:SparkStreaming 消费Kafka数据:Kafka作为数据来源,从kafka中获取日志,kafka中的日志类型有两种,启动和事件,我们这里统计日活,只获取启动日志即可;

第二步:使用redis 对以及完成首次登录的数据进行剔重:每个用户每天可能启动多次。要想计算日活,我们只需要把当前用户每天的第一次启动日志获取即可,所以要对启动日志进行去重,相当于做了一次清洗。

第三步:对剔重Jon过后的明细数据保存到ES中

第四步、利用 Kibana 进行展示

2、功能实现

2.1、创建maven 工程,导入相关依赖

 <properties>
        <spark.version>3.0.0</spark.version>
        <scala.version>2.12.11</scala.version>
        <kafka.version>2.4.1</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>

        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.16</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
View Code

相关工具类参见之前的文章

2.2、SparkStreaming 消费kafka 数据

  • 模拟日志程序运行生成启动和事件日志
  • 请求交给Nginx进行处理
  • Nginx反向代理三台处理日志的服务器
  • 日志处理服务将日志写到Kafka的主题中
  • 编写基本业务类,使用SparkStreming从Kafka主题中消费数据
  • 目前只做打印输出

代码实现

import java.lang
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.wdh01.gmall.realtime.bean.DauInfo
import org.wdh01.gmall.realtime.util.{MyESutil, MyKafkaUtil, MyRedisUtil, OffsetManagerUtil}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

/**
 * 日活
 */
object DauAPP {
  def main(args: Array[String]): Unit = {
    //使用 SparkStreaming 消费 kafka 数据
    val conf: SparkConf = new SparkConf().setAppName("DauAPP").setMaster("local[4]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    var topic = "gmall_start_0423"
    var groupId = "gmall_dau_0423"
    //从 redis 获取跑偏移量
    val offsetMap: Map[TopicPartition, Long] = OffsetManagerUtil.getOffset(topic, groupId)
    var recordDStream: InputDStream[ConsumerRecord[String, String]] = null
    if (offsetMap != null && offsetMap.size > 0) {
      // //offsetMap!=null && offsetMap.size>0 表示非首次消费
      //redis 存在当前消费者组的偏移量信息,那么从指定偏移量位置开始消费
      recordDStream = MyKafkaUtil.getKafkaStream(topic, ssc, offsetMap, groupId)
    } else {
      // 如果redsi 没有存放偏移量信息,从开始最新位置开始消费
      recordDStream = MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
    }
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    //获取采集周期消费 kafka 的起始偏移量和结束偏移量
    val offsetDStream: DStream[ConsumerRecord[String, String]] = recordDStream.transform {
      rdd => {
        // recordDStream 底层封装的是 KafkaRDD,混入了 HasOffsetRange 特质,
        // 其底层提供了 可以获取偏移量范围的方法
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    }

    // jsonStr.print()
    //对读取的数据进行处理
    val jsonDStream: DStream[JSONObject] = offsetDStream.map {
      record => {
        val str: String = record.value()
        //转换为对象
        val jsonObject: JSONObject = JSON.parseObject(str)
        //获取时间戳
        val ts: lang.Long = jsonObject.getLong("ts")
        //将 ts 转换为日期 小时
        val dateStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts))
        //切分日期 和 小时
        val date: Array[String] = dateStr.split(" ")
        val dt: String = date(0)
        val hr: String = date(1)
        //向原有 json 增加两个字段,即 日期 & 小时
        jsonObject.put("dt", dt)
        jsonObject.put("hr", hr)
        //返回新的 json
        jsonObject
      }
    }

测试:启动 zk,kafka,Nginx,log 处理jar 程序,运行idea,在启动 模拟日志即可。

2.3、使用Redis进行剔重

  • 利用Redis保存今天访问过系统的用户清单 即SparkStreaming从Kafka中读取到用户的启动日志之后,将用户的启动日志保存到Redis中,进行去重
  • 根据保存反馈得到用户是否已存在 Redis的五大数据类型中,String和Set都可以完成去重功能,但是String管理不适合整体操作,比如设置失效时间或者获取当天用户等操作,所以我们项目中使用的是Set类型,处理批量管理以外,还可以根据saddAPI的返回结果判断用户是否已经存在

Key

Value

dau:2019-01-22

设备id

  /**
     * 对采集到的数据进行剔重 方式 1 :此处使用 redis Set进行天然剔重
     * Redis 使用 set key:dau:2022-04-23 value:mid expire:3600*24
     * 方式1 缺点 :采集周期每一条数据都要获取 jedis 连接,浪费资源
     */
    /* val filterDStream: DStream[JSONObject] = jsonDStream.filter {
       jsonObj => {
         //获取登录日期
         val dt: String = jsonObj.getString("dt")
         //获取设备ID
         val mid: String = jsonObj.getJSONObject("common").getString("mid")
         //拼接 key
         val dauKey: String = "dau:" + dt
         //链接 redis 获取jedis
         val jedis: Jedis = MyRedisUtil.getJedisClient()
         //判断是否已经存在 :isFirst 表示是否添加成功:成功 1 表示第一次登录 失败 0 非第一次登录
         val isFirst: lang.Long = jedis.sadd(dauKey, mid)
         if (jedis.ttl(dauKey) < 0) { //jedis.ttl(dauKey)<0 key 永久有效
           //设置有效时间 一天
           jedis.expire(dauKey, 3600 * 24)
         }
         //关闭链接
         jedis.close()
         if (isFirst == 1L) {
           true
         } else {
           false
         }
       }
     }*/
    /**
     * 对采集到的数据进行剔重 方式 2 :此处使用 redis Set进行天然剔重
     * * Redis 使用 set key:dau:2022-04-23 value:mid expire:3600*24
     * * 方式2 使用 mapPartition ,每一个分区获取一个 jedis 连接
     */
    val filterDStream1: DStream[JSONObject] = jsonDStream.mapPartitions {
      jsonObjItr => {
        //以分区为单位处理数据
        val jedis: Jedis = MyRedisUtil.getJedisClient()
        //声明一个集合;存放当前分区首次登录的数据
        val listBuffer: ListBuffer[JSONObject] = new ListBuffer[JSONObject]
        //对分区数据进行遍历
        for (jsonObj <- jsonObjItr) {
          //获取 jsonObj  相关属性
          //获取日期
          val dt: String = jsonObj.getString("dt")
          //获取设备ID
          val mid: String = jsonObj.getJSONObject("common").getString("mid")
          //拼接key
          val dauKey: String = "dau:" + dt
          //判断是否首次登录 :isFirst 表示是否添加成功:成功 1 表示第一次登录 失败 0 非第一次登录
          val isFirst: lang.Long = jedis.sadd(dauKey, mid)
          if (jedis.ttl(dauKey) < 0) { //jedis.ttl(dauKey)<0 key 永久有效
            //设置有效时间 一天
            jedis.expire(dauKey, 3600 * 24)
          }
          if (isFirst == 1L) {
            listBuffer.append(jsonObj)
          }
        }
        //关闭链接
        jedis.close()
        listBuffer.toIterator
      }
    }

测试时需要在启动 redis 即可,此处建议使用方案2实现,必将频频繁创建redis 连接,过多消耗资源。

2.4、批量保存 ES

将去重后的结果保存的ElasticSearch中,以便后续业务操作

首先创建 ES 模板 

PUT   _template/gmall0423_dau_info_template
{
  "index_patterns": ["gmall0423_dau_info*"],                  
  "settings": {                                               
    "number_of_shards": 3
  },
  "aliases" : { 
    "{index}-query": {},
    "gmall0423_dau_info-query":{}
  },
   "mappings": {
     "_doc":{  
       "properties":{
         "mid":{
           "type":"keyword"
         },
         "uid":{
           "type":"keyword"
         },
         "ar":{
           "type":"keyword"
         },
         "ch":{
           "type":"keyword"
         },
         "vc":{
           "type":"keyword"
         },
          "dt":{
           "type":"keyword"
         },
          "hr":{
           "type":"keyword"
         },
          "mi":{
           "type":"keyword"
         },
         "ts":{
           "type":"date"
         }  
       }
     }
   }
}

封装实体类

package org.wdh01.gmall.realtime.bean

/**
 * 样例类
 * @param mid
 * @param uid
 * @param ar
 * @param ch
 * @param vc
 * @param dt
 * @param hr
 * @param mi
 * @param ts
 */
case class DauInfo(
                    mid: String, //设备id
                    uid: String, //用户id
                    ar: String, //地区
                    ch: String, //渠道
                    vc: String, //版本
                    var dt: String, //日期
                    var hr: String, //小时
                    var mi: String, //分钟
                    ts: Long //时间戳
                  ) {}

保存ES

    //将数据批量保存 ES
    filterDStream1.foreachRDD {
      rdd => {
        //以分区为单位对数据进行处理
        rdd.foreachPartition {
          jsonObjItr => {
            val dauInfolist: List[(String, DauInfo)] = jsonObjItr.map {
              jsonObj => {
                //每次处理的是一个json对象   将json对象封装为样例类
                val commonJsonObj: JSONObject = jsonObj.getJSONObject("common")
                val dauInfo: DauInfo = DauInfo(
                  commonJsonObj.getString("mid"),
                  commonJsonObj.getString("uid"),
                  commonJsonObj.getString("ar"),
                  commonJsonObj.getString("ch"),
                  commonJsonObj.getString("vc"),
                  jsonObj.getString("dt"),
                  jsonObj.getString("hr"),
                  "00", //分钟我们前面没有转换,默认00
                  jsonObj.getLong("ts")
                )
                (dauInfo.mid, dauInfo)
              }
            }.toList

            //批量保存ES
            val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
            MyESutil.bulkInsert(dauInfolist, "gmall0423_dau_info_" + dt)
          }
        }
        //提交偏移量到 Redis
        OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)
      }
    }
    filterDStream1.count().print()
    ssc.start()
    ssc.awaitTermination()

  }
}

/**
 * 查看所有topic
 * bin/kafka-topic.sh --bootstrap-server hadoop201:9092 --list
 * 消费数据测试
 * bin/kafka-console-consumer.sh --bootstrap-server hadoop201:9092 --topic gmall_start_0423
 * jsonObject 样例数据
 * {"dt":"2022-04-23","common":{"ar":"230000","uid":"149","os":"Android 10.0","ch":"xiaomi","md":"Xiaomi 9","mid":"mid_50","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":5519,"loading_time":1737,"open_ad_id":20},"hr":"03","ts":1650656885000}
 * redis
 * /usr/local/bin/redis-server /etc/redis.conf
 * /usr/local/bin/redis-cli
 * 127.0.0.1:6379> keys *
 * 1) "dau:2022-04-23"
 * 127.0.0.1:6379> Smembers dau:2022-04-23
 * redis 格式化所有数据
 * flushall
 * ES 查看所有模板
 * #查看所有idnex
 * GET /_cat/indices
 * #查看指定 idnex 内容
 * GET /gmall0423_dau_info_2022-05-01/_search
 *
 */

启动 es & kiban 进行测试即可。

标签:需求,String,val,redis,kafka,日活,import,org
来源: https://www.cnblogs.com/wdh01/p/16217871.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有