ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Netty:Reactor Pattern 与 Dubbo 底层传输中的 NettyServer

2020-04-27 23:01:46  阅读:262  来源: 互联网

标签:Dubbo ch NIO Netty OP 线程 客户端 channel Reactor


首先,我们需要了解Reactor模式的三种线程模型:

1)单线程模型

  Reactor 单线程模型,指的是所有的 IO 操作都在同一个 NIO 线程上面完成,NIO 线程的职责如下:

  1. 作为 NIO 服务端,接收客户端的 TCP 连接;
  2. 作为 NIO 客户端,向服务端发起 TCP 连接;
  3. 读取通信对端的请求或者应答消息;
  4. 向通信对端发送消息请求或者应答消息。

  Reactor 单线程模型示意图如下所示:

 

  由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO 操作都不会导致阻塞,理论上一个线程可以独立处理所有 IO 相关的操作。从架构层面看,一个 NIO 线程确实可以完成其承担的职责。例如,通过 Acceptor 类接收客户端的 TCP 连接请求消息,链路建立成功之后,通过 Dispatch 将对应的 ByteBuffer 派发到指定的 Handler 上进行消息解码。用户线程可以通过消息编码通过 NIO 线程将消息发送给客户端。

  对于一些小容量的应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适主要原因如下:

  • 一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 负荷达到 100%,也无法满足海量消息的编码、解码、读取和发送;
  • 当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
  • 可靠性问题:一旦 NIO 线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

2)多线程模型

  为了解决单线程在其他应用场景的不足,演进除了Rector 多线程模型。Reactor 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO 操作,它的原理图如下:

  

  Reactor 多线程模型的特点:

  1. 有专门一个 NIO 线程 Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求;
  2. 网络 IO 操作 —— 读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送;
  3. 1 个 NIO 线程可以同时处理 N 条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发操作问题。

  在绝大多数场景下,Reactor 多线程模型都可以满足性能需求;但是,在极个别特殊场景中,一个 NIO 线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万级别客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生了第三种 Reactor 线程模型 - 主从 Reactor 多线程模型。

3)主从多线程模型

  主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是个 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。

  利用主从 NIO 线程模型,可以解决 1 个服务端监听线程无法有效处理所有客户端连接的性能不足问题。

  它的工作流程总结如下:

  1. 从主线程池中随机选择一个 Reactor 线程作为 Acceptor 线程,用于绑定监听端口,接收客户端连接;

  2. Acceptor 线程接收客户端连接请求之后创建新的 SocketChannel,将其注册到主线程池的其它 Reactor 线程上,由其负责接入认证、IP 黑白名单过滤、握手等操作;

  3. 步骤 2 完成之后,业务层的链路正式建立,将 SocketChannel 从主线程池的 Reactor 线程的多路复用器上摘除,重新注册到 Sub 线程池的线程上,用于处理 I/O 的读写操作。

 

Netty 的线程模型与上面介绍的三种 Reactor 线程模型相似

  举个Dubbo底层传输时使用Netty4.x服务器端的例子:

1 org.apache.dubbo.remoting.transport.netty4.NettyServer.class

  第一步:其中打开服务端的时候实例化了 2 个 EventLoopGroup,1 个 EventLoopGroup 实际就是一个 EventLoop 线程组,负责管理 EventLoop 的申请和释放。第一步:其中打开服务端的时候实例化了 2 个 EventLoopGroup,1 个 EventLoopGroup 实际就是一个 EventLoop 线程组,负责管理 EventLoop 的申请和释放。

 1    @Override
 2     protected void doOpen() throws Throwable {
 3         bootstrap = new ServerBootstrap();
 4 
 5         bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
 6         workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
 7                 new DefaultThreadFactory("NettyServerWorker", true));
 8 
 9         final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
10         channels = nettyServerHandler.getChannels();
11 
12         bootstrap.group(bossGroup, workerGroup)
13                 .channel(NioServerSocketChannel.class)
14                 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
15                 .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
16                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
17                 .childHandler(new ChannelInitializer<NioSocketChannel>() {
18                     @Override
19                     protected void initChannel(NioSocketChannel ch) throws Exception {
20                         // FIXME: should we use getTimeout()?
21                         int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
22                         NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
23                         ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
24                                 .addLast("decoder", adapter.getDecoder())
25                                 .addLast("encoder", adapter.getEncoder())
26                                 .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
27                                 .addLast("handler", nettyServerHandler);
28                     }
29                 });
30         // bind
31         ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
32         channelFuture.syncUninterruptibly();
33         channel = channelFuture.channel();
34 
35     }

  bossGroup 线程组实际就是 Acceptor 线程池,负责处理客户端的 TCP 连接请求,如果系统只有一个服务端端口需要监听,则建议 bossGroup 线程组线程数设置为 1。这个boss线程组里面只设置了一个EventLoop线程。

  workerGroup 是真正负责 I/O 读写操作的线程组,通过 ServerBootstrap 的 group 方法进行设置,用于后续的 Channel 绑定。

  第二步:通过ServerBootStrap引导类bossgroup线程绑定监听端口,启动 NIO 服务端,相关代码如下:AbstractBootstrap#initAndRegister

 1 final ChannelFuture initAndRegister() {
 2         Channel channel = null;
 3         try {
 4             channel = channelFactory.newChannel();
 5             init(channel);
 6         } catch (Throwable t) {
 7             if (channel != null) {
 8                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
 9                 channel.unsafe().closeForcibly();
10                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
11                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
12             }
13             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
14             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
15         }
16 
17         ChannelFuture regFuture = config().group().register(channel); // 注册
18         if (regFuture.cause() != null) {
19             if (channel.isRegistered()) {
20                 channel.close();
21             } else {
22                 channel.unsafe().closeForcibly();
23             }
24         }
25 
26         // If we are here and the promise is not failed, it's one of the following cases:
27         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
28         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
29         // 2) If we attempted registration from the other thread, the registration request has been successfully
30         //    added to the event loop's task queue for later execution.
31         //    i.e. It's safe to attempt bind() or connect() now:
32         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
33         //         because register(), bind(), and connect() are all bound to the same thread.
34 
35         return regFuture;
36     }

  服务端 Channel 创建完成之后,将其注册到多路复用器 Selector 上,用于接收客户端的 TCP 连接,核心代码如下:AbstractNioChannel#doRegister

 1     @Override
 2     protected void doRegister() throws Exception {
 3         boolean selected = false;
 4         for (;;) {
 5             try {
 6                 // 取出NioEventLoop中关联的Selector并注册这个NioChannel的相关操作, 0值表示注册, 没有其他操作
 7                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
 8                 return;
 9             } catch (CancelledKeyException e) {
10                 if (!selected) {
11                     // Force the Selector to select now as the "canceled" SelectionKey may still be
12                     // cached and not removed because no Select.select(..) operation was called yet.
13                     eventLoop().selectNow();
14                     selected = true;
15                 } else {
16                     // We forced a select operation on the selector before but the SelectionKey is still cached
17                     // for whatever reason. JDK bug ?
18                     throw e;
19                 }
20             }
21         }
22     }

  第三步,如果监听到客户端连接,则创建客户端 SocketChannel 连接,重新注册到 workerGroup 的 IO 线程上。首先看 Acceptor 线程如何处理客户端的接入:NioEventLoop#processSelectedKey

 1       try {
 2             int readyOps = k.readyOps();
 3             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
 4             // the NIO JDK channel implementation may throw a NotYetConnectedException.
 5             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
 6                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
 7                 // See https://github.com/netty/netty/issues/924
 8                 int ops = k.interestOps();
 9                 ops &= ~SelectionKey.OP_CONNECT;
10                 k.interestOps(ops);
11 
12                 unsafe.finishConnect();
13             }
14 
15             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
16             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
17                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
18                 ch.unsafe().forceFlush();
19             }
20 
21             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
22             // to a spin loop
23             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
24                 unsafe.read();
25             }
26         } catch (CancelledKeyException ignored) {
27             unsafe.close(unsafe.voidPromise());
28         }

  上面调用了 unsafe 的 read()方法,对于 NioServerSocketChannel,它调用了 NioMessageUnsafe 的 read() 方法,代码如下:

 1             try {
 2                 try {
 3                     do {
 4                         int localRead = doReadMessages(readBuf);
 5                         if (localRead == 0) {
 6                             break;
 7                         }
 8                         if (localRead < 0) {
 9                             closed = true;
10                             break;
11                         }
12 
13                         allocHandle.incMessagesRead(localRead);
14                     } while (allocHandle.continueReading());
15                 } catch (Throwable t) {
16                     exception = t;
17                 }
18                 ...
19              }

  它最终会调用 NioServerSocketChannel 的 doReadMessages 方法,为接受的套接字创建新的子通道,代码如下:

 1     @Override
 2     protected int doReadMessages(List<Object> buf) throws Exception {
 3         SocketChannel ch = SocketUtils.accept(javaChannel());
 4 
 5         try {
 6             if (ch != null) {
 7                 buf.add(new NioSocketChannel(this, ch));
 8                 return 1;
 9             }
10         } catch (Throwable t) {
11             logger.warn("Failed to create a new channel from an accepted socket.", t);
12 
13             try {
14                 ch.close();
15             } catch (Throwable t2) {
16                 logger.warn("Failed to close a socket.", t2);
17             }
18         }
19 
20         return 0;
21     }

  ServerSocketChannel有阻塞非阻塞两种模式:

  1. 阻塞模式:ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。

  2. 非阻塞模式: accept()  方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。

  在NioServerSocketChannel的构造函数分析中,它的抽象父类AbstractNioChannel在构造的时候就设置 ch.configureBlocking(false); 无阻塞模式,所以会立刻返回,NioMessageUnsafe#read()方法会不断的循环读取客户端的接入。

  对于doReadMessages() 方法中创建了NioSocketChannel实例,NioSocketChannelConfig作用是配置这个Channel和JavaSocket的对应关系,主要是设置或获取一些参数,比如说 TCP_NODELAY ,设置发送缓冲区大小等。

1     public NioSocketChannel(Channel parent, SocketChannel socket) {
2         super(parent, socket);
3         config = new NioSocketChannelConfig(this, socket.socket());
4     }

  其中的NioSocketChannel存储了这个这个子通道和Java Socket之间

  第四步,将 SocketChannel 将 SocketChannel 注册到ServerSocketChannel的多路复用器上,监听 READ 操作。向上一直到 AbstractChannel ,为这个Channel创建一个Unsafe(与Channel相关,数据读取方面)和PipeLine。

1     protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
2         super(parent, ch, SelectionKey.OP_READ);
3     }

  第五步,处理网络的 I/O 读写事件,主要代码看第三步中的代码,也可以看以下的处理SelectedKey的方法

 1     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
 3         if (!k.isValid()) {
 4             final EventLoop eventLoop;
 5             try {
 6                 eventLoop = ch.eventLoop();
 7             } catch (Throwable ignored) {
 8                 // If the channel implementation throws an exception because there is no event loop, we ignore this
 9                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
10                 // to close ch.
11                 return;
12             }
13             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
14             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
15             // still healthy and should not be closed.
16             // See https://github.com/netty/netty/issues/5125
17             if (eventLoop != this || eventLoop == null) {
18                 return;
19             }
20             // close the channel if the key is not valid anymore
21             unsafe.close(unsafe.voidPromise());
22             return;
23         }
24 
25         try {
26             int readyOps = k.readyOps();
27             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
28             // the NIO JDK channel implementation may throw a NotYetConnectedException.
29             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
30                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
31                 // See https://github.com/netty/netty/issues/924
32                 int ops = k.interestOps();
33                 ops &= ~SelectionKey.OP_CONNECT;
34                 k.interestOps(ops);
35 
36                 unsafe.finishConnect();
37             }
38 
39             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
40             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
41                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
42                 ch.unsafe().forceFlush();
43             }
44 
45             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
46             // to a spin loop
47             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
48                 unsafe.read();
49             }
50         } catch (CancelledKeyException ignored) {
51             unsafe.close(unsafe.voidPromise());
52         }
53     }
View Code

参考: Netty 系列之 Netty 线程模型 ,Netty源码分析 (五)----- 数据如何在 pipeline 中流动

标签:Dubbo,ch,NIO,Netty,OP,线程,客户端,channel,Reactor
来源: https://www.cnblogs.com/magic-sea/p/12790862.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有