ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Netty通信技术

2022-01-06 18:03:58  阅读:191  来源: 互联网

标签:Netty Reactor 通信 技术 线程 new channel 客户端


一、简介

一、概述

        Netty是JBOSS提供的一个开源框架, 本质为网络应用程序框架, 提供了非阻塞的、事件驱动的网络应用程序框架和工具,便于快速开发出高性能、高可靠性的网络服务端以及客户端。

二、核心架构

        

上图是netty的核心架构,从官网上截取的, 有兴趣的可以去看看

核心:

  1. 可拓展的事件模型
  2. 统一的通信API,简化了通信编码
  3. 零拷贝和丰富的字节缓冲区

传输服务:

  1. 支持Socket和Datagram(数据报)
  2. HTTP传输
  3. In-VM Pipe(管道协议,JVM的一种进程)

协议支持:

  1. HTTP以及WebSocket
  2. SSL安全套接字协议支持
  3. Google Protobuf(序列化框架)
  4. 支持zlib、gzip压缩
  5. 支持大文件的传输
  6. RTSP(实时流传输协议, 是TCP/IP协议体系中的一个应用层协议)
  7. 支持二进制协议并且提供完整的单元测试

三、为什么使用Netty不使用Java原生NIO

Netty主要采用的也是NIO,并且是基于JDK中的NIO做了一些实现以及升级

主要有四点:

  1. Netty的API对开发者更友好,JDK中的API功能薄弱且复杂(如:ByteBuffer改为ByteBuf)
  2. Netty中采用了Reactor线程模型,能够保证自身的线程安全
  3. Netty能实现高可用,解决了一些传输的问题,如粘包、半包、断路重连等
  4. 解决了Bug,如JDK的NIO中epoll bug

四、在使用Netty的项目

  • 数据库: Cassandra
  • 大数据处理: Spark、Hadoop
  • Message Queue:RocketMQ
  • 检索: Elasticsearch
  • 框架:gRPC、Apache Dubbo、Spring5(响应式编程WebFlux)
  • 分布式协调器:ZooKeeper
  • 工具类: async-http-client

二、Reactor模型

Reactor线程模型是一种思想,不属于Java,也不属于Netty,其定义了三种角色

  • Reactor:用于监听和分配事件,将I/O事件分给对应的Handler。新的事件包括建立连接就绪、读就绪、写就绪。
  • Acceptor:处理客户端连接,并分配到处理器链中(可暂时简单理解为ServerSocketChannel)
  • Handler:将自身与事件绑定,执行非阻塞的读和写任务,从channel中读入,完成处理业务逻辑, 再将结果写入channel

通过这三种角色定义出三种NIO的模式

单Reactor-单线程模式 

 

        所有的接收请求以及处理数据都是有一个线程执行的,所以在一定的数量之后,性能就会下降。

单Reactor-多线程模式

 

 

         基于刚刚的单Reactor单线程模式,我们将消耗时间较长的编解码、业务计算抽取出去,建立一个线程池进行处理,这样能提升性能,但还不是最优解。

主从Reactor-多线程模型

 

        这次我们再次基于上面的模型进行调整,将专门负责接受连接的ServerSocketChannel另开一个Reactor进行调用,这个Reactor称为主;主Reactor将Channel建立在专门负责读写的从Reactor上,这就是所谓的1+N+M(1个监听线程,负责监听新的socket;N个IO线程,负责对socket进行读写;M个worker线程,负责处理数据)。

工作流程:

  1. Reactor主线程MainReactor对象通过selector监听到客户端的连接请求,通过Acceptor处理客户端连接事件。
  2. Acceptor跟客户端建立好socket连接之后,MainReactor会将连接分配给SubReactor。
  3. SubReactor将连接注册到自己的Selector的队列中进行监听,并创建对应的Handler对各种事件进行处理。
  4. 当连接上有新的事件发生时,SubReactor会调用对应的Handler进行处理。
  5. Handler通过read从channel和缓冲区上读取请求数据,然后将分发给Worker进行处理。
  6. Worker处理完数据会将结果再返还给Handler,Handler再通过send请求将数据发给客户端。
  7. 一个MainReactor可以对应多个SubReactor。

优势所在:

  1. 各个线程职责简单且明确,MainReactor只需要负责注册连接,SubReactor负责后续业务处理。
  2. MainReactor和SubReactor交互简单,主只需将连接交给从,从也无需返回数据。
  3. 多个SubReactor可以处理高并发业务。

三、Netty对Reactor的实现

 在Netty这个部分当中可以看到用了很多的池化思想

工作流程:

  1. Netty提供了两个线程池,一个BossGroup、一个WorkerGroup,每个池中都有EventLoop(相当于一个线程,可以是NIO,BIO,AIO)
  2. 每个EventLoop中包含Selector和TaskQueue
  3. 每个BossEventLoop负责以下三件事
  4. ①select:轮询注册ServerSocketChannel上的accept事件
  5. ②processSeleckedKeys:与客户端进行连接,并创建SocketChannel,将它注册到某一WorkerEventLoop上
  6. ③runAllTasks:继续处理其他事件
  7. 每个WorkerEventLoop也负责一下三件事
  8. ①select:轮询注册SocketChannel上的read/write事件
  9. ②processSeleckedKeys:在对应的SocketChannel上的PipeLine上处理相对应的数据
  10. ③runAllTasks:继续处理队列中的其他事件

ChannelPipeline和ChannelHandler

         如图所示,ChannelPipeline为ChannelHandler的容器,每个SocketChannel都会与一个ChannelPipeLine绑定。假如这是服务端程序,读取处理我们称数据是入站的,需要经过一系列Handler处理后;如果服务器想向客户端写回数据,也需要经过一系列的Handler处理,我们称之为出站。

        ChannelHandler则分为出站和入站的处理器,还有混合型的既能处理出站也能处理入站的

 四、Netty使用的示例代码

服务端

public class NettyServer {
    public static void main(String[] args) {
        NettyServer server = new NettyServer();
        server.start(8888);
    }

    private void start(int port) {
        // 创建主线程池
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // 创建从线程池
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // 创建服务端的引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    // 配置服务端主的channel
                    .channel(NioServerSocketChannel.class)
                    // 配置服务端handler
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 配置从的handler,也就是服务端连接的
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ServerInboundHandler1());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

客户端

public class NettyClient {
    public static void main(String[] args) {
        NettyClient client = new NettyClient();
        client.start("127.0.0.1",8888);
    }

    private void start(String host, int port) {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                        }
                    });
            // 连接服务器
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 客户端向服务端发送数据
            Channel channel = future.channel();
            String msg= "我是Netty客户端, 你收到了吗?";
            ByteBuf buffer = channel.alloc().buffer();
            buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
            channel.writeAndFlush(buffer);

            // 等待关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}

入站处理器

@Slf4j
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("ServerInboundHandler1 channelActive 执行了");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] dst=new byte[buf.readableBytes()];
        buf.readBytes(dst);
        String s = new String(dst, Charset.defaultCharset());
        System.out.println(s);
        log.info("读取到从客户端的数据"+s);
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

标签:Netty,Reactor,通信,技术,线程,new,channel,客户端
来源: https://blog.csdn.net/m0_51464746/article/details/122324297

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有