ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java NIO多线程服务器实现

2022-05-14 20:35:51  阅读:180  来源: 互联网

标签:Java NIO java selector 线程 sc import 多线程 channel


模型原理图

BOSS线程(ServerSocketChannel)专门负责建立链接,然后将accept到的SocketChannel分发给多个Worker线程。Worker线程有多个,可以分摊来自多个Client的SocketChannel。Worker线程专门负责read和write。

NIO多线程服务器实现

server端实现
package niomultithreadserver;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;


public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        // 设置boss线程的名称为"boss"
        Thread.currentThread().setName("boss");
        // 创建ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        /**
         * 1. ServerSocketChannel配置为非阻塞模式
         * 2. selector通常都是和非阻塞channel进行搭配
         * 非阻塞channel一旦感兴趣的事件,则可以通过selector.select()方法将事件追加到selector的selectedKeys中
         */
        ssc.configureBlocking(false);
        // 创建boss线程的selector,该selector专门监听accept事件
        Selector boss = Selector.open();
        // 将创建好的ServerSocketChannel和关注的accept事件注册到该selector
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        // ServerSocketChannel和端口进行绑定
        ssc.bind(new InetSocketAddress(8088));
        // 创建一定数量的worker
        int cpuNum = Runtime.getRuntime().availableProcessors();
        System.out.println("cpuNum:" + cpuNum);
        Worker[] workers = new Worker[cpuNum];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
            workers[i].initWorker();
        }

        AtomicInteger count = new AtomicInteger();
        while (true) {
            /**
             * 轮训check查询的时候是否就绪,如果未就绪则select不会返回,只有监听的事件发生select()方法才返回
             * 每个selector中有两个集合:
             * 集合1: interested keys集合,存储的是当前selector感兴趣的channel + 事件类型(accept, connect, read, write)
             * 集合2:selected keys集合,存储的是selector.select()方法调用之后扫描到的新发生的事件
             * 注意每次select()方法调用时,可以理解为是往"selected keys集合"中追加本次select()新扫描的事件,上次select()并且加入的事件不会自动清除。
             * 因此后面迭代器迭代"selected keys集合"时需要及时remove掉,否则下次循环会重复消费处理
             */
            boss.select();
            Iterator<SelectionKey> bossSelectedKeysIter = boss.selectedKeys().iterator();
            while (bossSelectedKeysIter.hasNext()) {
                SelectionKey key = bossSelectedKeysIter.next();
                /**
                 * 注意及时remove掉已经监听到并且马上就要处理的事件
                 * 因为selector的selectedKeys集合不会自动将key清除掉,这会导致下次循环重复处理
                 */
                bossSelectedKeysIter.remove();
                if (key.isAcceptable()) {
                    SelectableChannel keyChannel = key.channel();
                    ServerSocketChannel sscFromSelector = (ServerSocketChannel) keyChannel;
                    System.out.println("sscFromSelector == ssc?" + (sscFromSelector == ssc));

                    SocketChannel sc = sscFromSelector.accept();
                    sc.configureBlocking(false);
                    // 建立链接打印日志
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "connected --- " + sc.getRemoteAddress());
                    // 分配worker
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "before register --- " + sc.getRemoteAddress());
                    workers[count.incrementAndGet() % workers.length].registerChannel(sc);
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "after register --- " + sc.getRemoteAddress());
                }
            }
        }
    }

    static class Worker implements Runnable {
        private Thread thread;
        private Selector workerSelector;
        private String name;
        ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();

        public Worker(String name) {
            this.name = name;
        }

        public void initWorker() throws IOException {
            thread = new Thread(this, name);
            workerSelector = Selector.open();
            thread.start();
        }

        /**
         * 将一个SocketChannel分配给当前worker
         *
         * @param sc
         */
        public void registerChannel(SocketChannel sc) {
            queue.add(() -> {
                try {
                    sc.register(workerSelector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            // 注册到任务队列之后唤醒一下
            workerSelector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    workerSelector.select();
                    /**
                     * 注册分配的channel
                     */
                    Runnable scRegisterTask = queue.poll();
                    if (scRegisterTask != null) {
                        scRegisterTask.run();
                    }

                    Iterator<SelectionKey> workerSelectedKeysIter = workerSelector.selectedKeys().iterator();
                    while (workerSelectedKeysIter.hasNext()) {
                        SelectionKey key = workerSelectedKeysIter.next();
                        workerSelectedKeysIter.remove();
                        if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(16);
                            /**
                             * 将channel中的数据写入buffer
                             * 注意:两种情况需要做好处理:
                             * 1. 如果客户端正常关闭了socket,则read返回的是-1,这时需要将客户端对应的channel从boss selector中cancel
                             * 2. 如果客户端非正常关闭了socket,则需要捕获read方法,并将客户端对应的channel从boss selector中cancel
                             */
                            int readCnt = channel.read(byteBuffer);
                            if (readCnt == -1) {
                                System.out.println("线程:" + Thread.currentThread().getName() + ":" + "客户端关闭链接,取消channel监听...");
                                key.cancel();
                            }
                            System.out.println("线程:" + Thread.currentThread().getName() + ":" + "read data --- " + channel.getRemoteAddress());
                            // 切换buffer为读模式
                            byteBuffer.flip();
                            // 读取
                            System.out.println(Charset.forName("UTF-8").decode(byteBuffer));
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

client端实现
package niomultithreadserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8088));
        sc.write(Charset.forName("UTF-8").encode("123abc"));
        System.in.read();
    }
}

标签:Java,NIO,java,selector,线程,sc,import,多线程,channel
来源: https://www.cnblogs.com/iamswf/p/16271353.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有