ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark将数据写入ES(ElasticSearch)终极总结

2019-02-01 10:48:42  阅读:6504  来源: 互联网

标签:String val 写入 elasticsearch es ElasticSearch spark ES


简介

spark接入ES可以使用多种方式,常见类型如下。

本文主要介绍将case class 类对象写入ElasticSearch:也就是获取数据然后使用case class封装数据,然后在case class中选取一个字段当做 id,但是这个字段一定数据不能重复 要唯一。不指定ES自己也会生成id。

准备工作

第一步:

使用Apache Spark将数据写入ElasticSearch中。本文使用的是类库是elasticsearch-hadoop,其从2.1版本开始提供了内置支持Apache Spark的功能,使用elasticsearch-hadoop之前,我们需要引入依赖:本文使用的版本是:6.3.2

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.2</version>
</dependency> 
<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.2</version>
</dependency>
<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.2</version>
</dependency>

第二步:

提前创建好索引和type:索引-->sql_command   type-->  sql_info

在spark代码中设置ES的相关参数:ES的nodes节点 ip和端口,以及自动创建索引参数

val session: SparkSession = SparkSession.builder().config(conf)
      .config("es.index.auto.create", "true")
      .config("es.nodes", "ip1:9200,ip2:9200,ip2:9200")
      .getOrCreate()

第三步:

调用写入ES的api: saveToEs(),并导入ES的相关类:import org.elasticsearch.spark._,这将使得所有的RDD拥有saveToEs方法。下面我将一一介绍将不同类型的数据写入ElasticSearch中。调用 saveToEs()时可以指定id也可以不指定,不指定ES会自动生成,看自己需求。自己指定时需要提供一个不唯一的字段,如果没有自己可以生成一个,但是一定不能重复 。

//数据写入es
    import org.elasticsearch.spark._
    rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))

案例代码

object Data2Es {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    if (args.length != 5) {
      println(
        """          	  |cn.missfresh.SparkStreaming2Es
          				  |参数不合法,请传递正确的参数个数:
          				  |brokers
          				  |topic
          				  |groupId
          				  |seconds
                          |offtype
        				""".stripMargin)
      sys.exit()
    }
    val Array(brokers, topic, groupId, seconds, offtype) = args
    val conf = new SparkConf().setAppName("RTC_data2es_wangzh")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.kafka.consumer.poll.ms", "60000")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
    //设置es的相关参数
    val session: SparkSession = SparkSession.builder().config(conf)
      .config("es.index.auto.create", "true")
      .config("es.nodes", "ip1:9200,iP2:9200,ip3:9200")
      .getOrCreate()
    val sc = session.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(seconds.toLong))
    //设置kafka的相关参数
    val topics = Array(topic)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offtype, // latest
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //创建Kafka数据流
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(topics, kafkaParams)
    )
")
    kafkaStream.foreachRDD(rdd=>{
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val fliterData = rdd.filter(_.value().split("\\{").length==2)
      val rddSource= fliterData.map(x => {
        val split = x.value().split("\\{")
        //获得ip
        val ip = IpUtil.getIp(x.value())
        val jsonStr = "{" + split(1)
       //解析json字符串,使用case class封装数据
        val behavior= JsonParse.jsonParse(jsonStr, ip)
      })
      //数据写入es
    import org.elasticsearch.spark._
    rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
    // 提交偏移量
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    println("=============》  任务结束 ================》")
  })
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
  *创建对应字段的case  class
  */
case class UserBehavior ( md5id          :String,
                          application_id :String,
                         session_id     :String,
                         user_ip_address:String,
                         logger_type    :String,
                         logger_location:String,
                         command        :String,
                         command_clean  :String,
                         query_string   :String,
                         current_time   :String,
                         blg_user_name  :String,
                         user_name      :String,
                         ret            :String,
                         mode_type      :String,
                         processor_name :String,
                         last_command :String,
                         mryxblg_authorization_nabled :String,
                         mryxblg_command_monitoring:String)

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

 

标签:String,val,写入,elasticsearch,es,ElasticSearch,spark,ES
来源: https://blog.csdn.net/aA518189/article/details/86140210

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有