ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

netty长(短)连接,心跳检测

2021-06-29 23:31:51  阅读:271  来源: 互联网

标签:netty void 连接 client 心跳 new public 服务端



title: netty长连接,短连接,心跳检测
date: 2018/4/23 11:12:55
tags: [netty,长连接,短连接,心跳检测]
categories:

  • 开发
  • java

https://www.cnblogs.com/superfj/p/9153776.html

短连接

定义

client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。

这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。

短连接的优缺点

管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

使用场景

通常浏览器访问服务器的时候就是短连接。

对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的

如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。

所以对于并发量大,请求频率低的,建议使用短连接。

长连接

什么是长连接

client向server发起连接,server接受client连接,双方建立连接。

Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。

生命周期

  • 正常情况下,一条TCP长连接建立后,只要双不提出关闭请求&不出现异常情况,这条连接是一直存在.

  • 操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。

  • 所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。客户端和服务单可一直使用该连接进行数据通信。

优缺点

优点

  • 长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,

  • 当发生错误时,可以在不关闭连接的情况下进行提示,

  • 减少CPU及内存的使用,因为不需要经常的建立及关闭连接。

缺点

  • 连接数过多时,影响服务端的性能和并发数量

使用场景

  • 数据库的连接就是采用TCP长连接.
  • RPC一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。

特殊的长连接模式

一种比较特殊的长连接模式,在一段时间内,“数据中心”与设备之间数据交互频繁,但是过了这段时间,很长一段时间内,都没有任何数据需要交互的情况下,最好的办法就是:在交互频繁的那段时间,保持长连接 ,一旦过了那段时间,立马断开连接,下次需要交互时,再获取连接即可,这种方式,主要可以为“数据中心”服务器节省资源的浪费。

第三种情况的使用,需要考虑两个问题:

  • 如果客户端或者“数据中心”,连接超时,无数据交互后,如何去断开连接?
  • 客户端主动断开连接,很好办,下次需要再连接即可,那么如果是“数据中心”主动断开连接了,客户端应该如何去与“数据中心”再次建立连接?

长连接模式下,维护连接的方案:

如果是长连接的情况下,一般我们都需要做连接的维护工作,方案主要有以下两种:

1、客户端间隔5分钟,向服务器发起一次“心跳”报文,如果服务器正常回应,那就无所谓,如果不回应,一般就直接断开“连接”,然后重新向服务器再次申请新的连接即可。

2、客户端或者服务端开启一个定时任务,间隔5分钟,判断在这5分钟内,是否有向服务器交互数据,如果有交互,那么就继续维护这个连接,如果没有交互,那么就直接断开连接即可,下次再需要交互时,再向服务器申请新的连接即可。这样做的好处,是给服务器减压

客户端如果主动自己断开,这个一般不需要做特殊处理,直接在下次连接时,申请新的连接即可。

服务器如果自己因为什么原因,断开了,那么客户端,需要定义一个定时任务,间隔10分钟,或者多少时间,去不断的尝试服务器是否恢复。

长连接的实现

心跳机制

应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。

TCP . SO_KEEPALIVE。系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。

Why心跳机制?

  • 网络的不可靠性
    • 有可能在 TCP 保持长连接的过程中, 由于某些突发情况,(网线被拔出, 突然断电等),会造成连接中断
    • 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的.心跳机制即可解决此类问题。

TCP协议的KeepAlive机制

默认KeepAlive状态是不打开的。

需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,

并且可以设置三个参数:

tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl

分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。

很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。

典型做法是LRU,把最久没有数据的连接给T掉。

通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。

如何实现心跳机制

两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.
  • 应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议, 并且是默认关闭的.
  2. TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  3. TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,

Netty心跳和断线重连

https://github.com/JayTange/LearnTCP

IdleStateHandler

//构造函数
public IdleStateHandler(
    long readerIdleTime, long writerIdleTime, long allIdleTime,
    TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

//HeartBeatServer.java(服务端)

serverBootstrap.childHandler(new HeartBeatServerInitializer());

private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
    
      @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 监听读操作,读超时时间为5秒,超过5秒关闭channel;
            pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
}

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发 IdleStateEvent 事件(READER_IDLE)
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个IdleStateEvent 事件(WRITE_IDLE)
  • allIdleTimeSeconds, 读和写都超时. 即当在指定的时间间隔内没有读并且写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
// HeartBeatServerHandler.java extends SimpleChannelInboundHandler<T>
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state()==IdleState.READER_IDLE){
            // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
            if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
                System.out.println("===>服务端===(读超时,关闭chanel)");
                // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                ctx.close();
            } else {
                // 失败计数器加1
                unRecPingTimes++;
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    System.out.println("一个客户端已连接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    System.out.println("一个客户端已断开连接");
}

客户端HeartBeatClient

// 客户端
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args[]) throws Exception {
        HeartBeatClient client = new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送数据
     * @throws Exception
     */
    public void sendData() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd = in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /**
     * 连接服务端
     */
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }

}

HeartBeatClientHandler

客户端handler

public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服务端回复:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date()));
        // 定时线程 断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }

}

长连接优化

https://www.jianshu.com/p/54f9bfcd054b

物联网(IoT ) 时代

internet of things 飞速发展,对服务端的协议接入和处理能力要求极高,而且IoT设备接入它有以下特点

  1. 使用移动网络(Wifi),网络质量不稳定
  2. 海量接入,而且是长连接,服务端压力大
  3. 不稳定,消息丢失,重发,延迟,过期发送时有发生
  4. 协议不统一,私有协议 ,开发测试成本高

长连接海量接入优化方向

以下这些调优,都属于小方法,小技巧,如果系统对性能要求很高,最优的还是采用分布式集群的方式来提升整个服务端的处理能力。

要想实现海量设备的长连接接入,需要对

  • 操作系统相关参数
  • Netty框架、JVM GC参数
  • 业务代码针对性的优化
  • 甚至各种优化要素互相影响
操作系统层面
文件描述符

在 /etc/sysctl.conf 插入 fs.file-max = 1000000

设置单进程打开的最大句柄数

TCP/IP相关参数
多网卡队列软中断
Netty调优
线程优化
  • boss线程池优化
    对于Netty服务端,通常只需要启动一个监听端口用于端侧设备接入,但是如果集群实例较少,甚至是单机部署,那么在短时间内大量设备接入时,需要对服务端的监听方式和线程模型做优化,即服务端监听多个端口,利用主从Reactor线程模型。由于同时监听了多个端口,每个ServerSocketChannel都对应一个独立的Acceptor线程,这样就能并行处理,加速端侧设备的接人速度,减少端侧设备的连接超时失败率,提高单节点服务端的处理性能。
  • work线程池优化(I/O工作线程池)
    对于I/O工作线程池的优化,可以先采用系统默认值(cpu内核数*2)进行性能测试,在性能测试过程中采集I/O线程的CPU占用大小,看是否存在瓶颈,具体策略如下:
心跳优化

心跳检测周期通常不要超过60s,心跳检测超时通常为心跳检测周期的2倍

接发缓冲区调优

.childOption(channelOption.SO_RCVBUF, 8*1024)

内存池

java对于缓冲区Buffer的分配和回收是一个耗时的工作(特别是堆外直接内存)。

为了尽量重用缓冲区,netty提供了基于内存池的缓冲区重用机制。

内存池实现上可以分为两类:堆外直接内存和堆内存。

DirectBypeBuf的创建成本高,因为需要配合内存池使用

netty默认的io读写操作采用的都是内存池的堆外直接内存模式,如果用户需要额外使用ByteBuf,建议也采用内存池方式 ; 如果不涉及网络io操作(单纯的内存操作)可以使用堆内存池

// void initchannel(SocketChannel channel)
channel.config().setAllocator(UnpooledByteBufAllocator.DEAFAULT);
io线程与业务线程隔离
JVM调优
GC指标

吞吐量,延迟,内存占用

GC优化原则
  1. Minor GC (每次新生代回收尽可能多的内存,以减小FULL GC)

  2. GC内存最大化原则:GC使用的内存越大,效率越高

  3. 大多数IoT应用,吞吐量优先

-XX printGC

visual GC工具

标签:netty,void,连接,client,心跳,new,public,服务端
来源: https://www.cnblogs.com/skystarry/p/14952493.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有