ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Netty由浅入深的学习指南(进阶粘包半包)

2021-05-13 09:59:02  阅读:191  来源: 互联网

标签:Netty writeBytes 进阶 bytes ctx new 半包 byte buf


本章节主要介绍粘包半包的解决方法、协议的设计、序列化知识;同时通过实现聊天室案例将这些知识点串联起来。

3.1 粘包半包

  • 粘包半包现象

    • 粘包:多条数据粘连,一次发送给服务器
    • 半包:一条完整消息从某个点断开,发送给服务器的消息不完整

    演示代码

    //服务器演示代码
    public void start(){
        //声明工作线程及主线程
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            //启动配置
            ServerBootstrap server = new ServerBootstrap();
            server.channel(NioServerSocketChannel.class); //非阻塞
            //调整系统的接收缓冲区
            server.option(ChannelOption.SO_RCVBUF,10); //设置接收缓冲区
            server.group(boss,worker);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            });
            ChannelFuture future = server.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("服务器启动错误:{}",e.getMessage());
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
    //客户端演示代码
    static void send(){
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.channel(NioSocketChannel.class);
            client.group(worker);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            for (int i = 0; i < 10; i++) {
                                ByteBuf buf = ctx.alloc().buffer(16);
                                buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                ctx.writeAndFlush(buf);
                            }
    					}
                    });
                }
            });
            ChannelFuture future = client.connect("127.0.0.1", 8080).sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("客户端错误:{}",e.getMessage());
        }finally {
            worker.shutdownGracefully();
        }
    }
    

    日志说明

​ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |

±-------±------------------------------------------------±---------------+
|00000000| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000010| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000020| 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 04 05 |…|
|00000030| 06 07 |… |
±-------±------------------------------------------------±---------------+

通过日志可以看出,服务器前三次接收的属于粘包现象,最后一次属于半包现象。

  • 粘包半包的解决方案

    • 短链接

      //客户端
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ByteBuf buf = ctx.alloc().buffer(16);
          buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
          ctx.writeAndFlush(buf);
          ctx.channel().close(); //发完消息接断开连接,可以解决粘包,不能处理半包
      }
      //服务器,模拟半包现象
      //调整系统的接收缓冲区(滑动窗口)
      //server.option(ChannelOption.SO_RCVBUF,10); //设置接收缓冲区
      //调整netty的接收缓冲区(ByteBuf)
      server.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
      
    • 定长解码器

      //服务器端
      protected void initChannel(SocketChannel sc) throws Exception {
          //添加定长解吗器,位于最上端
          sc.pipeline().addLast(new FixedLengthFrameDecoder(10)); 
          sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      //客户端,fill10Bytes不足的补全10个字节
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ByteBuf buf = ctx.alloc().buffer();
          char c = '0';
          Random r = new Random();
          for (int i = 0; i < 10; i++) {
              byte[] bytes = fill10Bytes(c,r.nextInt(10) + 1);
              c++;
              buf.writeBytes(bytes);
          }
          ctx.writeAndFlush(buf);
      }
      //fill10Bytes()方法代码
      public static byte[] fill10Bytes(char c ,int len){
          byte[] bytes = new byte[10];
          Arrays.fill(bytes,(byte) '_');
          for (int i = 0; i < len; i++) {
              bytes[i] = (byte)c;
          }
          System.out.println("内容:" + new String(bytes));
          return bytes;
      }
      
    • 行解码器

      //服务器代码
      protected void initChannel(SocketChannel sc) throws Exception {
          sc.pipeline().addLast(new LineBasedFrameDecoder(1024)); //添加行解吗器
          sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      //客户端代码,makeString给消息添加分隔符‘\n’
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ByteBuf buf = ctx.alloc().buffer();
          char c = '0';
          Random r = new Random();
          for (int i = 0; i < 10; i++) {
              byte[] bytes = makeString(c,r.nextInt(256) + 1);
              c++;
              buf.writeBytes(bytes);
          }
          ctx.writeAndFlush(buf);
      }
      //makeString()方法代码
      public static byte[] makeString(char c ,int len){
          StringBuilder sb = new StringBuilder(len + 2);
          for (int i = 0; i < len; i++) {
              sb.append(c);
          }
          sb.append("\n");
          return sb.toString().getBytes();
      }
      
    • LTC解码器

      public static void main(String[] args) {
              EmbeddedChannel channel = new EmbeddedChannel(
                      new LengthFieldBasedFrameDecoder(1024,0,4,1,4),
                      new LoggingHandler(LogLevel.DEBUG)
              );
              //4个字节的内容长度,实际长度
              ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
              send(buf,"hello, world");
              send(buf,"Hi");
              channel.writeInbound(buf);
      
          }
      
          private static void send(ByteBuf buf,String content){
              byte[] bytes = content.getBytes(); //实际内容
              int len = bytes.length; //实际内容长度
              buf.writeInt(len); //写入内容长度
              buf.writeByte(1); //写入任意数据
              buf.writeBytes(bytes); //写入实际内容
          }
      

      注解:

      • maxFrameLength:发送的数据帧最大长度
      • lengthFieldOffset:定义长度域位于发送的字节数组中的下标
      • lengthFieldLength:用于描述定义的长度域的长度
      • lengthAdjustment:自长度域以后几个字节为内容域
      • initialBytesToStrip:接收到的发送数据包,去除前initialBytesToStrip位

3.2 协议的制定及解析

  • Redis

    final byte[] LINE = {13,10};
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap client = new Bootstrap();
        client.channel(NioSocketChannel.class);
        client.group(worker);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        //set name zhangsan
                        ByteBuf buf = ctx.alloc().buffer();
                        buf.writeBytes("*3".getBytes()); //命令数组长度
                        buf.writeBytes(LINE); //回车换行
                        buf.writeBytes("$3".getBytes()); //命令字段长度 set
                        buf.writeBytes(LINE);
                        buf.writeBytes("set".getBytes()); //指令 => set
                        buf.writeBytes(LINE);
                        buf.writeBytes("$4".getBytes()); //key字段长度 name
                        buf.writeBytes(LINE);
                        buf.writeBytes("name".getBytes()); //key => name
                        buf.writeBytes(LINE);
                        buf.writeBytes("$8".getBytes()); //value字段长度 zhangsan
                        buf.writeBytes(LINE);
                        buf.writeBytes("zhangsan".getBytes()); //value => zhangsan
                        buf.writeBytes(LINE);
                        ctx.writeAndFlush(buf); //写入并发送
                    }
    
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        });
        ChannelFuture future = client.connect("127.0.0.1", 6379).sync();
        future.channel().closeFuture().sync();
    }catch (Exception e){
        log.error("客户端错误:{}",e.getMessage());
    }finally {
        worker.shutdownGracefully();
    }
    
  • Http

    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap client = new ServerBootstrap();
        client.channel(NioServerSocketChannel.class);
        client.group(boss,worker);
        client.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                sc.pipeline().addLast(new HttpServerCodec());
                sc.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){
    
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                        //获取请求
                        log.info(msg.uri());
                        //返回响应
                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                        byte[] bytes = "<h1>Hello, World!</h1>".getBytes();
                        response.headers().setInt(CONTENT_LENGTH,bytes.length);
                        response.content().writeBytes(bytes);
                        //写回响应
                        ctx.writeAndFlush(response);
                    }
                });
            }
        });
        ChannelFuture future = client.bind("127.0.0.1", 8080).sync();
        future.channel().closeFuture().sync();
    }catch (Exception e){
        log.error("客户端错误:{}",e.getMessage());
    }finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
    
  • 自定义协议

    • 要素

      • 魔数:用来在第一时间判定是否是无效的数据包
      • 版本号:可以支持协议的升级
      • 序列化算法:消息正文到底采用哪种序列化方式,可以由此扩展,例如JSON\protobuf\jdk
      • 指令类型:是登录、注册…与业务相关
      • 请求序号:为了双工通信,提供异步能力
      • 正文长度:传递数据的长度
      • 消息正文:传递的数据
    • 编码

      @Override
      protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
          //4字节的魔数
          out.writeBytes(new byte[]{1,2,3,4});
          //1字节的版本
          out.writeByte(1);
          //1字节的序列化方式0:jdk;1:json
          out.writeByte(0);
          //1字节的指令类型
          out.writeByte(msg.getMessageType());
          //4字节序列号
          out.writeByte(msg.getSequenceId());
          //对齐填充
          out.writeByte(0xff);
          //获取字节的内容数组
          ByteArrayOutputStream bos = new ByteArrayOutputStream();
          ObjectOutputStream oos = new ObjectOutputStream(bos);
          oos.writeObject(msg);
          byte[] bytes = bos.toByteArray();
          //内容长度
          out.writeInt(bytes.length);
          //内容写入
          out.writeBytes(bytes);
      }
      
    • 解码

      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          int magicNum = in.readInt();
          byte version = in.readByte();
          byte serializerType = in.readByte();
          byte messageType = in.readByte();
          int sequenceId = in.readInt();
          in.readByte();
          int length = in.readInt();
          byte[] bytes = new byte[length];
          in.readBytes(bytes,0,length);
          ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
          Message message = (Message) ois.readObject();
          log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
          log.debug("{}", message);
          out.add(message);
      }
      
    • 测试

      EmbeddedChannel channel = new EmbeddedChannel(
          //解决粘包半包问题
          new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
          new LoggingHandler(LogLevel.DEBUG),
          new MessageCodec()
      );
      
      LoginRequestMessage request =
          new LoginRequestMessage("zhangsan","123","张三");
      //出站
      channel.writeOutbound(request);
      
      ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
      new MessageCodec().encode(null,request,buf);
      
      //模拟半包现象
      ByteBuf s1 = buf.slice(0, 100);
      ByteBuf s2 = buf.slice(100,buf.readableBytes() - 100);
      //入站
      s1.retain(); //防止内存释放
      channel.writeInbound(s1);
      channel.writeInbound(s2);
      

标签:Netty,writeBytes,进阶,bytes,ctx,new,半包,byte,buf
来源: https://blog.csdn.net/fuu123f/article/details/116737771

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有