ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

5、Storm集成Kafka

2019-04-26 15:50:07  阅读:174  来源: 互联网

标签:集成 Storm conf kafka storm new apache org Kafka


1、pom文件依赖

<!--storm相关jar  -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--排除相关依赖  -->
            <exclusions>
                <!--<exclusion>-->
                    <!--<groupId>io.dropwizard.metrics</groupId>-->
                    <!--<artifactId>metrics-core</artifactId>-->
                <!--</exclusion>-->
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-1.2-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-web</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>ring-cors</artifactId>
                    <groupId>ring-cors</groupId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
            <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
                        
        </dependency>

        <!--注:老版本使用的storm-kafka依赖已经被废弃,建议在以后使用storm-kafka-client依赖进行开发,老版本的storm-kafka依赖为:-->
        <!--    <dependency> -->
        <!--        <groupId>org.apache.storm</groupId> -->
        <!--        <artifactId>storm-kafka</artifactId> -->
        <!--        <version>1.2.2</version> -->
        <!--    </dependency> -->

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>

2、Topology(拓扑中配置Spout,简单的配置可以不用单独的写KafkaSpout)

本打算用spring-kafka的监听器去完成kafka的消费,实现KafkaSpout感觉有点绕,后面再研究,有相关经验的请告知更好的方式

@Component
public class KafkaStormSpoutWordCountTopology {

    public static void main(String[] args) {

        KafkaSpoutConfig.Builder<String,String> builder =
                KafkaSpoutConfig.builder(
                        "192.168.8.101:9092,192.168.8.102:9092,192.168.8.103:9092",
                        "topic");

        builder.setGroupId("storm_group");

        KafkaSpoutConfig<String, String> kafkaSpoutConfig= builder.build();
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("WordCountKafkaSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);

        topologyBuilder.setBolt("ReadKafkaSpoutBolt",new ReadKafkaSpoutBolt()).shuffleGrouping("WordCountKafkaSpout");
        Config config = new Config();

        System.out.println("准备启动kafkaStromTopo");
        LocalCluster cluster= new LocalCluster();
        cluster.submitTopology("kafkaStromTopo", config, topologyBuilder.createTopology());



//        //启动topology的配置信息
//        Config conf = new Config();
//        //TOPOLOGY_DEBUG(setDebug),当他被设置成true的话,storm会记录下每个组件所发射的每条消息
//        //这在本地环境调试topology很有用。但是在线上这么做的话,会影响性能
//        conf.setDebug(false);
//
//        //storm的运行模式有两种:本地模式和分布式模式
//        if(args != null || args.length>0){
//            conf.setNumWorkers(3);
//            //向集群提交topology
//            try {
//                StormSubmitter.submitTopologyWithProgressBar(args[0],conf,topologyBuilder.createTopology());
//            } catch (AlreadyAliveException e) {
//                e.printStackTrace();
//            } catch (InvalidTopologyException e) {
//                e.printStackTrace();
//            } catch (AuthorizationException e) {
//                e.printStackTrace();
//            }
//        }
//        else{
//
//
//            conf.setMaxTaskParallelism(3);
//
//            LocalCluster cluster = new LocalCluster();
//            cluster.submitTopology("word-count",conf,builder.createTopology());
//        }
    }
}

3、Bolt, 负责拓扑请跟根据自己的业务

public class ReadKafkaSpoutBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {

        System.out.println(input.getValues().get(4)+"消息接受bolt");
        /*
        input 获取到的值

        0索引代表kafka的topic
        1索引代表kafka的分区
        2索引代表kafka的偏移量
        3索引代表kafka的key值
        4索引代表kafka的value值
        */
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

标签:集成,Storm,conf,kafka,storm,new,apache,org,Kafka
来源: https://www.cnblogs.com/xidianzxm/p/10774655.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有