ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序

2021-12-15 12:32:58  阅读:132  来源: 互联网

标签:Netty java NIO TCP io new import public channel


文章目录


前言

java 1.4版本推出了一种新型的IO API,与原来的IO具有相同的作用和目的;可代替标准java IO,只是实现的方式不一样,NIO面向缓冲区、基于通道的IO操作;通过NIO可以提高对文件的读写操作。基于这种优势,现在使用NIO的场景越来愈多,很多主流行的框架都使用到了NIO技术,如Tomcat、Netty、Jetty等;所以学习和掌握NIO技术已经是一个java开发的必备技能了。


一、IO与NIO

1.面向流与面向缓冲区

Java IO中读取数据和写入数据是**面向流(Stream)**的,这表示当我们从流中读取数据,写入数据时也将其写入流,流的含义在于没有缓存 ,就好像我们站在流水线前,所有的数据沿着流水线依次到达我们的面前,我们只能读取当前的数据。如果需要获取某个数据的前一项或后一项数据那就必须自己缓存数据,而不能直接从流中获取。

而在Java NIO中数据的读写是面向**缓冲区(Buffer)**的,读取时可以将整块的数据读取到缓冲区中,在写入时则可以将整个缓冲区中的数据一起写入。这就好像是将流水线传输变成了卡车运送,面向流的数据读写只提供了一个数据流切面,而面向缓冲区的IO则使我们能够看到数据的上下文,也就是说在缓冲区中获取某项数据的前一项数据或者是后一项数据十分方便。这种便利是有代价的,因为我们必须管理好缓冲区,这包括不能让新的数据覆盖了缓冲区中还没有被处理的有用数据;将缓冲区中的数据正确的分块,分清哪些被处理过哪些还没有等等。

2.阻塞与非阻塞

传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

Java NIO非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

二、TCP聊天程序

1.基于IO

IO服务端

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建socket服务,监听8081端口
        ServerSocket server=new ServerSocket(8081);
        System.out.println("服务器启动!");
        int count=0;
        while(true){
            //获取一个套接字(阻塞)
            final Socket socket = server.accept();
            System.out.println("欢迎第"+(++count)+"个同学");
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //业务处理
                    handler(socket);
                }
            });

        }
    }

  
 //读取数据
    
    public static void handler(Socket socket){
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();

            while(true){
                //读取数据(阻塞)
                int read = inputStream.read(bytes);
                if(read != -1){
                    System.out.println(new String(bytes, 0, read));
                }else{
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                System.out.println("socket关闭");

                socket.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

IO客户端

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class IOClient {
    public static void main(String[] args) throws IOException {
        //发送十次
        for (int i=0;i<10;i++){
            Socket socket=new Socket("127.0.0.1", 8081);
            //写数据
            OutputStream os=socket.getOutputStream();
            os.write(("御神楽"+i).getBytes());
            //释放资源
            socket.close();
        }

    }

}

效果:
在这里插入图片描述

2.基于NIO

NIO服务端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    // 通道管理器
    private Selector selector;


     //启动服务端测试

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8081);
        server.listen();
    }



     // 获得一个ServerSocket通道,并对该通道做一些初始化的工作

    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }


     //采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

    public void listen() throws IOException {
        System.out.println("服务端启动成功!");
        // 轮询访问selector
        while (true) {
            // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            selector.select();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();

                handler(key);
            }
        }
    }


     //处理请求

    public void handler(SelectionKey key) throws IOException {

        // 客户端请求连接事件
        if (key.isAcceptable()) {
            handlerAccept(key);
            // 获得了可读的事件
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }


     // 处理连接请求

    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获得和客户端连接的通道
        SocketChannel channel = server.accept();
        // 设置成非阻塞
        channel.configureBlocking(false);

        // 在这里可以给客户端发送信息哦
        System.out.println("检测到新客户连接");
        // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
        channel.register(this.selector, SelectionKey.OP_READ);
    }


     // 处理读的事件

    public void handelerRead(SelectionKey key) throws IOException {
        // 服务器可读取消息:得到事件发生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if(read > 0){
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("用户名为:" + msg);

            //回写数据
            ByteBuffer outBuffer = ByteBuffer.wrap("服务器已接收".getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        }else{
            System.out.println("客户端关闭");
            key.cancel();
        }
    }
}

NIO客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        final int count[]=new int[1];
        count[0]=1;
        for(int i=0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    SocketChannel socketChannel = null;
                    //发送的数据
                    String str = "御神楽"+count[0]++;
                    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());

                    //接受的数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    try {
                        //建立连接
                        socketChannel = SocketChannel.open();
                        socketChannel.configureBlocking(false);
                        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) {
                            //等待连接
                            while (!socketChannel.finishConnect()) {
                            }
                        }
                        //写入数据
                        socketChannel.write(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    //10s后自动断开连接
                    int time=1;
                    while (time<10){
                        time++;
                        try {
                            //读取数据
                            int read=socketChannel.read(buffer);
                            if(read > 0) {
                                byte[] data = buffer.array();
                                String msg = new String(data).trim();
                                System.out.println("客户端收到信息:" + msg);
                            }
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread.sleep(100);
        }
    }

}

测试效果:
服务器:
在这里插入图片描述
客户端:
在这里插入图片描述

3.基于Netty

Netty服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class NettyServer {
    public static void main(String[] args) {
        //用于处理服务器端接收客户端连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //进行网络通信(读写)
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //辅助工具类,用于服务器通道的一系列配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            //绑定两个线程组
            bootstrap.group(bossGroup,workerGroup)
                    //设置boss selector建立channel使用的对象
                    .channel(NioServerSocketChannel.class)
                    //boss 等待连接的 队列长度
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //处理消息对象
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //创建管道
                            ChannelPipeline pipeline = ch.pipeline();
                            //解码方式
                            pipeline.addLast("decoder",new StringDecoder());
                            //编码方式
                            pipeline.addLast("encoder",new StringEncoder());
                            //自定义处理消息对象
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            System.out.println("服务器正在启动");
            //绑定端口号
            ChannelFuture cf = bootstrap.bind(8083).sync();

            cf.addListener(cd->{
                if(cd.isSuccess()){
                    System.out.println("启动成功");
                }else{
                    System.out.println("启动失败");
                }
            });
            //服务端给所有客户端发信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                ServerHandler.sendAll(msg);
            }
            //阻塞当前线程
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }



}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+" == " +msg);
        channelGroup.forEach(ch->{
            if (channel!=ch) {
                ch.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + "发送了消息 : " + msg + "\n");
            }else{
                ch.writeAndFlush("[ 客户 ] 发送了消息: " + msg + "\n");
            }
        });

    }
    //用于服务端发信息给所有客户端
    public static void sendAll(String msg){
        channelGroup.forEach(channel -> {
            channel.writeAndFlush("服务器: "+msg+"\n");
        });
    }


     // 当有新的用户连接触发

    public void channelActive(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端]"+channel.remoteAddress()+" 已连接 "+sf.format(new Date())+"\n");
        //把新来的连接加入
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress()+" 上线了" + "\n");
    }


     //当用户断开连接触发

    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端 ] " +channel.remoteAddress()+ " 断开连接"+"\n");
        System.out.println(channel.remoteAddress()+" 下线了.\n");
        System.out.println("channelGroup size = "+ channelGroup.size());
    }


}

Netty客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.ArrayList;
import java.util.List;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>();
        try {
            Bootstrap bootstrap = new Bootstrap();
            //服务器可以主动断开连接
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            //地址复用
            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            final int count[] =new int[1];
            count[0]=0;
            for(int i=0;i<3;i++){
                //添加连接
                channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync());
                //新建线程模拟多用户
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int index=count[0]++;
                        //获取对应管道
                        Channel channel = channelFutures.get(index).channel();
                        System.out.println( "======"+channel.localAddress()+"======");
                        int time=0;
                        while (time++<3){
                            //发送数据
                            String msg =" 御神楽 "+(index)+": "+time;
                            channel.writeAndFlush(msg);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        //关闭连接
                        channel.close();

                    }
                }).start();

            }

            //阻塞主线程,否则会直接执行finally关闭EventLoopGroup
            int time=0;
            while (time++<5){
                Thread.sleep(1000);
            }

        } finally {
            //关闭EventLoopGroup
            group.shutdownGracefully();
        }

    }

}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }

}

演示
服务端:
在这里插入图片描述
客户端
**加粗样式**


参考

https://blog.csdn.net/linjpg/article/details/80962453
https://blog.csdn.net/qq_47281915/article/details/121802536

标签:Netty,java,NIO,TCP,io,new,import,public,channel
来源: https://blog.csdn.net/YuKaguraNe/article/details/121940407

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有