ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

javaIO模型-Socket实现一个简单的客服聊天功能的改造(二)

2021-03-29 18:59:07  阅读:158  来源: 互联网

标签:java Socket 客服 端口 javaIO import port 服务端 客户端


功能改进-线程池

当然,先不考虑改端口合不合实际场景
我们想达到的流程是这样的:
在这里插入图片描述
再加上线程池,来在一个进程中启动多个服务端和多个客户端(当然因为控制台是唯一的,所以没办法模拟通信了)。
话不多说,代码如下:
服务端,做了部分改造:

  1. if判断改成了switch
package com.test.sf.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Objects;

/**
 * 服务端
 * @author luhui
 */
public class SocketServerDemo {

    private Socket socket = null;
    private ServerSocket serverSocket = null;
    /**
     * 来自客户端的输入流
     */
    private BufferedReader inFromClient = null;
    /**
     * 发向客户端的输出流
     */
    private PrintWriter outForClient = null;
    /**
     * 服务端关闭标志
     */
    public final static String CLOSE_FLAG = "close";

    SocketServerDemo(int port) {
        // 1.0 服务端监听port端口
        // 如果是本地多网卡的情况,最好手动指定一个ip,不然会随机取一个ip绑定
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("客服系统运行中...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void startServer() {
        try {
            waitClient();
            listenClient();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                // 关闭资源
                outForClient.close();
                inFromClient.close();
                serverSocket.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void waitClient() throws IOException {

        // 2.0 服务端开始接受消息,没客户端连接之前一直阻塞在这里
        socket = serverSocket.accept();

        // 3.0 代码到这里,说明有客户端建立了连接,也就是TCP三次握手成功了,二者可以正常通信
        //-- 3.1 从socket获取输入流,用来接受客户端输入的信息
        inFromClient = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        //-- 3.2 从socket获取输出流,用来向客户端输出信息
        outForClient = new PrintWriter(socket.getOutputStream(), true);

        // 4.0 向客户端输出一条自动回复消息
        outForClient.println("智能客服:您好!我是智能客服小V,有什么需要帮助的么");
    }

    public void listenClient() throws IOException {
        // 5.0 监听等待客户端传来的输入流
        String readline = inFromClient.readLine();
        // 只要客户不输入'close',服务端就一直循环接受并处理客户端的输入流
        while (!CLOSE_FLAG.equals(readline)) {
            if (Objects.isNull(readline)) {
                continue;
            }
            // 简单模拟服务端的处理逻辑
            switch (readline) {
                case "咨询":
                    outForClient.println("智能客服:需要咨询请拨打我们的热线12345");
                    break;
                case "客服":
                    outForClient.println("智能客服:我们请不起人工客服");
                    break;
                case "bye":
                    outForClient.println("再见!!");
                    socket.close();
                    this.waitClient();
                    break;
                default:
                    outForClient.println("智能客服:你的输入不合法,请重新输入");
            }
            readline = inFromClient.readLine();
        }

        // 跳出了循环,说明客户端输入了close,服务端关闭
        outForClient.println("智能客服系统关闭");
    }

}

客户端

  1. 由于换成了线程池启动,所以只保留一个类即可。之所以不用之前的方式,启动好几个进程,是因为这样没法动态的控制端口这个共享资源。进程间通信就不是java多线程能解决的问题了。
package com.test.sf.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 客户端
 * @author luhui
 */
public class SocketClientDemo {
    private Socket socket = null;
    /**
     * 来自控制台的输入流
     */
    private BufferedReader inFromControler = null;
    /**
     * 来自服务器的输入流
     */
    private BufferedReader inFromServer = null;
    /**
     * 发向服务器的输出流
     */
    private PrintWriter outForServer = null;
    /**
     * 客户端关闭标志
     */
    public final static String BYE_FLAG = "bye";

    public void startClient() {
        try {
            connectServer();
            chatWithServer();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                outForServer.close();
                inFromServer.close();
                inFromControler.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    SocketClientDemo(String host, int port) {
        try {
            // 客户端请求ip地址为host的地址,端口为port的应用,也就是服务端,如果服务端没启动,执行这行代码会直接报错,connect refuse
            socket = new Socket(host, port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void connectServer() throws IOException {
        inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        outForServer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
        inFromControler = new BufferedReader(new InputStreamReader(System.in));
        // 接收到服务器传来的欢迎消息,同理服务器没发送消息的话也会一直阻塞
        System.out.println(inFromServer.readLine());
    }

    public void chatWithServer() throws IOException {

        // 准备向服务器发送数据,数据来自控制台
        String readline = inFromControler.readLine();
        while (!BYE_FLAG.equals(readline) && !SocketServerDemo.CLOSE_FLAG.equals(readline)) {
            // 只要客户端输入的不是'bye'或者'close',就循环向服务端发送
            outForServer.println(readline);
            // 输出服务端发回的消息
            System.out.println(inFromServer.readLine());
            // 继续等待客户端的输入
            readline = inFromControler.readLine();
        }

        // 将bye发送给服务器
        outForServer.println(readline);
        // 打印出服务端的结束语
        System.out.println(inFromServer.readLine());
    }
}

端口管理类,用来管理端口这个共享资源

  1. 服务端可用端口集合用ConcurrentHashMap,可以满足存储端口-可用性,且保证线程安全
  2. 客户端可用端口集合用LinkedBlockingQueue,是因为在客户端没有查询到可用端口,会去启动一个服务端,等到这个服务端启动之后才会返回可用端口,这个过程需要时间,而且是多线程操作,所以用阻塞队列正合适
package com.test.sf.socket;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 端口管理类
 * @author luhui
 */
public class PortManager {

    /**
     * 维护一个服务端可用端口的键值对,这是一个所有线程共用的资源
     */
    private static final ConcurrentHashMap<Integer, Boolean> SERVER_PORT_MAP = new ConcurrentHashMap<>();

    /**
     * 维护一个客户端可用端口的阻塞队列
     */
    private static final LinkedBlockingQueue<Integer> CLIENT_PORT_LINK = new LinkedBlockingQueue<>(10);

    /**
     * 服务端端口初始化标志
     */
    private static Boolean initFlag = false;

    PortManager() {
        initPortMap();
    }

    /**
     * 初始化端口
     */
    private static void initPortMap() {
        SERVER_PORT_MAP.put(8070, true);
        SERVER_PORT_MAP.put(8080, true);
        SERVER_PORT_MAP.put(8090, true);
        initFlag = true;
    }

    /**
     * 获取并占用服务端可用的端口
     */
    public static Integer getAvailableServerPort() {
        if (!initFlag) {
            initPortMap();
        }
        Set<Map.Entry<Integer, Boolean>> entrySet = SERVER_PORT_MAP.entrySet();
        for (Map.Entry<Integer, Boolean> entry : entrySet) {
            if (entry.getValue()) {
                entry.setValue(false);
                return entry.getKey();
            }
        }
        throw new RuntimeException("系统端口已被全部占用!");
    }

    /**
     * 获取并占用客户端可用的端口
     */
    public static Integer getAvailableClientPort() throws InterruptedException {
        // 非阻塞
        Integer port = CLIENT_PORT_LINK.poll();
        if(Objects.isNull(port)){
            ServerProxy.newServer(getAvailableServerPort());
            // 阻塞等待
            port = CLIENT_PORT_LINK.take();
        }
        return port;
    }

    /**
     * 使指定的客户端端口生效
     */
    public static void addClientPort(Integer port) {
        CLIENT_PORT_LINK.add(port);
        System.out.println(port + "端口添加到客户端可用队列");
    }

    /**
     * 使指定的客户端端口失效
     */
    public static void occupyClientPort(Integer port) {
        CLIENT_PORT_LINK.remove(port);
        System.out.println(port + "端口已被客户端消费!");
    }

    /**
     * 归还服务端被占用的端口,失效对于客户端来说可访问的端口
     */
    public static void releaseServerPort(Integer port) {
        SERVER_PORT_MAP.put(port, true);
        CLIENT_PORT_LINK.remove(port);
        System.out.println("服务端" + port + "端口释放!");
        System.out.println("客户端可用端口" + port + "端口移除!");
    }
}

服务端代理类,用来存放服务端多线程启动的逻辑,与业务逻辑解耦

  1. 其实createNewServer最好也解耦出来,这里为了类少点,方便阅读,就放一起了
package com.test.sf.socket;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 服务端代理类(其实也可以说是委派类,怎么理解都行)
 * @author luhui
 */
public class ServerProxy {

    /**
     * 线程池工厂,用于指定线程名字(便于调试)
     */
    private final static ThreadFactory SERVER_THREAD_FACTORY = new BasicThreadFactory.Builder().namingPattern("serverThreadFactory-%d").build();
    /**
     * IO密集型操作,线程池可以适当设的大一点
     */
    private static final ExecutorService SERVER_EXECUTOR = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), SERVER_THREAD_FACTORY);

    public static void newServer(Integer port) {
        SERVER_EXECUTOR.execute(() -> createNewServer(port));
    }

    public static void createNewServer(Integer port) {
        // 用端口再次设置线程名字
        Thread.currentThread().setName("ServerThread-" + port);
        // 服务端启动
        SocketServerDemo socketServerDemo = new SocketServerDemo(port);
        // 此端口进入客户端可用队列
        PortManager.addClientPort(port);
        socketServerDemo.startServer();
        // 服务端停止,释放端口
        PortManager.releaseServerPort(port);
    }
}

客户端代理类

package com.test.sf.socket;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 客户端代理类
 *
 * @author luhui
 */
public class ClientProxy {

    private final static ThreadFactory CLIENT_THREAD_FACTORY = new BasicThreadFactory.Builder().namingPattern("clientThreadFactory-%d").build();

    private static final ExecutorService CLIENT_EXECUTOR = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), CLIENT_THREAD_FACTORY);

    public static void startOneClient() {
        CLIENT_EXECUTOR.execute(() -> {
            try {
                newClient();
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void newClient() throws ExecutionException, InterruptedException {
        // 获取客户端可用端口
        Integer availableClientPort = PortManager.getAvailableClientPort();
        // 设置当前线程名字
        Thread.currentThread().setName("clientThread-" + availableClientPort);
        // 启动客户端
        SocketClientDemo socketClientDemo = new SocketClientDemo("127.0.0.1", availableClientPort);
        // 消费此端口
        PortManager.occupyClientPort(availableClientPort);
        // 连接服务端
        socketClientDemo.startClient();
        // 释放此端口
        PortManager.addClientPort(availableClientPort);
    }
}

启动类

提示,可以使用多线程的调试模式来查看一些参数

package com.test.sf.socket;

import java.util.concurrent.ExecutionException;

public class SocketMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ClientProxy.startOneClient();
        ClientProxy.startOneClient();
        ClientProxy.startOneClient();
    }
}

IO的两大概念

首先,我们这里的IO一般指网络通信中的IO,不同的应用彼此建立联系。

阻塞与非阻塞

阻塞与非阻塞是一种应用访问数据时,根据返回结果而采取的行动。
直白点讲,就是一种读取或者写入操作的实现方式。
比如上面的BufferReader.readline就是阻塞的,PrintWriter.println就是非阻塞的
再比如上面的阻塞队列中,take方法就是阻塞的,如果取不到值就一直等待,轮询去查
poll方法就是非阻塞的,如果取不到值就返回一个null。

同步与异步

  • 同步就是发起一个调用后,被调用者在处理完返回最后的结果以前,被调用者需要一直去查询,是不是有结果了是不是有结果了。
  • 异步就是发起一个调用后,被调用者立马返回一个结果,表示我收到请求了,你等我消息,被调用者不需要去查询是否有结果,被调用者会通过回调函数或者事件来通知调用者。

二者区别

同步异步是对于应用程序与内核交互而言的,阻塞与非阻塞是看是否立即返回结果,是应用程序自己的处理方式。
他们的场景可能一样,但是针对的层面不一样。
阻塞与非阻塞对于用户来说是感受很明显的,比如阻塞了,线程就直接挂起了,啥也干不了。
但同步与异步的时候,线程都可以做其他事情,只不过同步的时候,后台的cpu需要不断的去主动查询,比较耗资源。

java对IO模型的封装

有了上面的实例,简单介绍一下,深度学习留到之后的文章,本文不再延申

BIO的缺点

其实上面聊天室这种模式就是早期的BIO模式(Blocking I/O),同步阻塞的IO,jdk1.4之前,都是靠这个建立网络通信的。
缺点也很明显了,动不动就阻塞,所以我们尝试用多线程改善。
但是,线程也要占用内存占用资源的,是有限的,比起庞大的连接数来说,线程数量太少了。
所以为了解决这种情况,出现了NIO

NIO的特性

NIO(New I/O)是同步非阻塞的,基于通道(Channel)、缓冲区(Buffer)、选择器(Selector)三大核心组件。

  • Buffer:java中的IO是面向流的,注定了它的阻塞,NIO是面向缓冲区的,数据的读写操作都是在缓存区中操作的
  • Channel:java中的读写是单向的,因为流是单向的,Channel是双向的,结合缓冲区,可以使得数据读写的时候实现非阻塞(eg:读数据的时候,让它读到缓冲区中,线程去做其他事情,期间轮询去查有没有读好,读好以后用通道把它从缓冲区拿出来)。Socket->SocketChannel,Socket->ServerSocketChannel
  • Selector: 也可以叫多路复用器,减少线程的切换,它不再是一个连接就占用一个线程,所有的连接建立都使用一个或者少量线程来执行,建立好以后把连接注册在Selector上面。等到多Selector轮询它的连接时,发现哪个连接上有请求,就分配一个工作线程去处理。说白了,就是不为空连接分配线程

NIO的缺点

  1. 原生API使用起来异常繁琐,而且空循环轮询查又容易出bug又消耗性能
  2. 当并发量上来的时候,如果后端由请求JDBC的这种情况,查询时间比较长,线程被占掉,资源还是会被浪费。

HTTP/1.1之后,出现了http长连接,可以长时间的保持连接保持状态。
有了这个基础,我们就可以考虑,不让线程白白等待浪费时间,在后端处理jdbc这种请求的时候,线程可以去做其他的事情,处理其他请求,本次请求保留好现场,等有了返回值,再触动线程去处理。
达到异步处理的效果,增加吞吐量

AIO的出现

如果说BIO是一个连接一个线程,NIO是一个请求一个线程,那么AIO(Asynchronous I/O)是一个有效请求一个线程。
它是一个异步非阻塞的模型,异步基于事件和回调机制实现。

标签:java,Socket,客服,端口,javaIO,import,port,服务端,客户端
来源: https://blog.csdn.net/qq_31363843/article/details/115307367

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有