ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

IO

2022-03-05 03:00:23  阅读:177  来源: 互联网

标签:String int buffer IO new public


目录

网络IO

1. 网络IO

1.1 什么是IO流以及IO流的作用

I/O实际上是Input和Output,也就是输入和输出。而流其实是一种抽象的概念,它表示的是数据的无结构化传递。会被当成无结构的字节序列或字符序列。流可以当作是磁盘与内存之间的一个管道。

1.2 IO流的分类

在Java中I/O流操作很多,但是核心体系实际上就只有File(文件流)、InputStream(字节输入流)、OutputStream(字节输出流)、Reader(字符输入流)、Writer(字符输出流)。

image-20220301161506677

  • 字节流:操作的数据单元是8位的字节。InputStream、OutputStream作为抽象基类。可以处理所有的数据文件。
  • 字符流:操作的数据单元是字符。以Writer、Reader作为抽象基类。只限于处理文本的数据文件。
  • 访问管道处理流,是用来去完成管道的读写操作,用于线程间的通讯
  • 访问数组处理流,是针对内存的操作
  • 缓冲流是提供一个缓冲区,对于缓冲区的一个处理流,避免每次与磁盘的交互,提高输入输出的一个效率
  • 对象流,主要用在序列化这个机制上,将一个对象序列化后转换成一个可存储可传输的对象,传输时用到的流。
  • 转换流:将字符流转换成字节流
  • 打印流

image-20220301162730551

2. IO流的数据来源及操作的API

  • 硬盘
  • 内存
  • 键盘
  • 网络

2.1 File类简介

File类是Java中为文件进行创建、删除、重命名、移动等操作而设计的一个类

  • File(File parent, String child):根据parent抽象路径名和child路径名字符串创建一个新的File实例。
  • File(String pathname):将指定路径名转化为抽象路径名创建一个新的File实例。
  • File(String parent, String child):根据parent路径名和child路径名创建一个File实例。
  • File(URI uri):指定URI转化为抽象路径名。

2.2 基于文件的输入输出流

public static void main(String[] args) {
    File file = new File("D:\\appdata\\IODemo\\Capture001.png");
    try (
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\appdata\\IODemo\\Capture002.png");
        FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,将流写入try()中,代码执行完毕后,会自动关闭流
        int len = 0;
        byte[] buffer = new byte[1024];
        long start = System.currentTimeMillis();
        while ((len = fileInputStream.read(buffer)) != -1) {
            fileOutputStream.write(buffer, 0, len);
        }
        long end = System.currentTimeMillis();
        System.out.println((end - start) / 1000);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

流一定要关闭,否则当前线程没执行完会一直使其被进程占用。

try (FileReader reader = new FileReader("/appdata/IODemo/IODemo");
     FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) {
    int i = 0;
    char[] chars = new char[1];
    while ((i = reader.read(chars)) != -1) {
        writer.write(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.3 缓冲流

缓冲流是带缓冲区的处理流,他会提供一个缓冲区,缓冲区的作用主要目的是:避免每次和硬盘打交道,能够提高输入/输出的执行效率。

BufferedInputStream

private static int DEFAULT_BUFFER_SIZE = 8192; // 默认8Kb的缓冲区
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大缓冲区大小
// 每次读取的8Kb size的字节会存储在buf[]数组中
//每次调用read()方法时,会首先去尝试从这个数组中读取,如果读取失败,会从数据源(磁盘上)去读取
protected volatile byte buf[];

// 两种构造方法最终调用该方法,带int参数的会覆盖默认的8Kb size
public BufferedInputStream(InputStream in, int size) {
    super(in);
    if (size <= 0) {
        throw new IllegalArgumentException("Buffer size <= 0");
    }
    buf = new byte[size];
}

其实缓冲流原理上是帮我们封装了8Kb大小的数据,先从磁盘读8Kb到我们内存,后由我们自己去操作这8Kb的数据,当处理完8Kb缓冲区没有了,再加载数据到缓冲区,再读到内存去处理。当我们用普通流去处理文件,将buffer[]设置的稍微大一点,一样可以达到提高效率的结果。

public static void main(String[] args) {

    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo"));
         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) {
        int len = 0;
        byte[] bytes = new byte[1024];
        while ((len = bufferedInputStream.read(bytes)) != -1) {
            // System.out.println(new String(bytes, 0, len));
            bufferedOutputStream.write(bytes, 0, len);
            bufferedOutputStream.flush();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

}

将创建InputStream写入到try()中,可以帮我们实现close()关闭流的操作,这个close中包含了buffred的flush操作,如果没有关闭流,又没有手动flush(),将会丢失数据。

public void close() throws IOException {
    try (OutputStream ostream = out) {
        flush();
    }
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) {
    String str;
    while ((str = reader.readLine()) != null) {
        System.out.println(str);
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.4 转换流

try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo");
     InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    char[] chars = new char[1024];
    int i;
    while ((i = reader.read(chars)) != -1) {
        System.out.println(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

在这个转换流中,时可以指定字符集编码的。

2.5 对象流

关于序列化和反序列化这个问题,我在18年参加工作的时候,遇到过一个项目,之后就再没有用过了。当时架构还是分布式dubbo+zookeeper,但是传输报文竟然用到这个我是没想到的。

什么是序列化和反序列化?

  • 序列化是把对象的状态信息转化为可存储或传输的形式的过程,也就是把对象转化为字节序列的过程成为对象的序列化
  • 反序列化是序列化的逆向过程,把字节数组反序列化为对象。
public class UserSerializable implements Serializable {

    private static final long serialVersionUID = 8160464260217334369L;

    private String name;

    private int age;

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserSerializable{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public static void main(String[] args) {
        UserSerializable user = new UserSerializable();
        user.setAge(26);
        user.setName("Elian");
        String fileName = "/appdata/IODemo/User";
        try (FileInputStream fileInputStream = new FileInputStream(fileName);
             FileOutputStream fileOutputStream = new FileOutputStream(fileName);
             ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream);
             ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)
        ) {
            outputStream.writeObject(user);
            outputStream.flush();
            UserSerializable newUser = (UserSerializable) objectInputStream.readObject();
            System.out.println(newUser);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 本地IO和网络IO

3.0 本地I/O操作实例

public class NIOFirstDemo {
    public static void main(String[] args) {
        bio();
        bufferBio();
        nio();
        mmap();
        zeroCopy();
    }

    private static void bio() {
        try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM");
             FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) {
            // bio实现copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void bufferBio() {
        try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"));
             BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) {
            // bio实现copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void nio() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            // nio 实现copy
            long nioStart = System.currentTimeMillis();
            int len = 0;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while ((len = inChannel.read(buffer)) != -1) {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
            }
            System.out.println(System.currentTimeMillis() - nioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 依然将用户空间的
    private static void mmap() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long mmapStart = System.currentTimeMillis();
            MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
            MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
            byte[] bytes = new byte[inMappedBuffer.limit()];
            inMappedBuffer.get(bytes);
            outMappedBuffer.put(bytes);
            System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void zeroCopy() {
        try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
            FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long zeroCopyStart = System.currentTimeMillis();
            inChannel.transferTo(0, inChannel.size(), outChannel);
            System.out.println(System.currentTimeMillis() - zeroCopyStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

实验顺序(速度由快到慢排序)

zeroCopy(零拷贝) > mmap(内存映射) > bufferedInputStream > bio(基于channle) ~= nio

zerCopy无需将文件映射到内存,mmap会将buffer读进内存,关于Buffer继续往下看4.2。

3.1 Socket和ServerSocket

// 服务端
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
    Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
    System.out.println("Client port:" + socket.getPort() + " has been connected!");
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    String str = bufferedReader.readLine();
    System.out.println("Client Content:" + str);
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    writer.write(str + "\n"); // 如果不换行,客户端会一直等待读取完
    writer.flush();
    bufferedReader.close();
    writer.close();
} catch (Exception e) {
    e.printStackTrace();
}

try (Socket socket = new Socket("localhost", 9090)) {
    OutputStream outputStream = socket.getOutputStream();
    outputStream.write("Hello Elian\n".getBytes(StandardCharsets.UTF_8)); // 不换行服务端会一直等待读取完,进入阻塞
    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    System.out.println(reader.readLine());
} catch (Exception e) {
    e.printStackTrace();
}

3.2 网络通讯协议分析

image-20220302125049178

image-20220302125308839

客户端是怎样找到目标服务的呢?

客户端发起请求的时候,在不同的层去增加不同的协议头,在数据链路层组装目标机器的Mac地址,这个地址是通过ARP协议,我们已知目标的IP,需要获得目标的Mac地址,会发送一个广播消息,会在网段内去询问这个IP是谁,目标地址会发送自己Mac地址给到当前这个发送端,就可以去组装目标的Mac地址。那么在数据发送过程中,进入IP广播后,某个网卡就会发现,对应Mac的网卡就会把数据包收进来。

3.3 网络通信原理

本地磁盘IO通信:

image-20220302142120134

网络磁盘通信:

image-20220302142142094

两者不同在于:本地磁盘要通过DMA(直接存储访问器)将磁盘上的内容读取到内核空间缓冲区,再从内核空间缓冲区读到用户空间缓冲区进行操作。而网络IO是通过网卡中的缓冲区读取到系统内核缓冲区,如果应用进程一直没有调用socket的read()方法读取数据将数据copy到用户缓冲区,数据会一直被缓存在内核缓冲区里面。

理解阻塞过程

image-20220302143625641

accept()每次只能接收一个并处理一个socket,这样只能等上一个socket处理完才能继续处理下一个请求。BIO每次阻塞两个位置,第一个阻塞位置是accept过程,另一个阻塞过程是I/O流读写的过程。

解决办法:通过线程池进行处理。

public static void main(String[] args) {
    final int DEFAULT_PORT = 9090;
    try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        while (true) {
            Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
            executorService.submit(new ServerSocketThread(socket));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public class ServerSocketThread implements Runnable {

    private Socket socket;

    public ServerSocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        System.out.println("Client port:" + socket.getPort() + " has been connected");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){
            String clinetStr = reader.readLine();
            System.out.println("Client resived message: " + clinetStr);
            Thread.sleep(15000);
            writer.write("OK.\n");
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

现在还有一个缺点:

线程数取决于计算机本身的线程数,但是线程数设置太大,又会造成线程之间切换造成的资源消耗。

3.4 手写RPCDemo

RPC(Remote Procedure Call) 远程过程调用,是一种通过网络从计算机程序上请求服务,而不需要了解底层网络技术的协议。一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源。

image-20220302164640587

// 1. 公共类
// 接口
public interface IHelloWorld {
    String sayHello(String content);
}

// Request
public class RpcRequest implements Serializable {
    
    private static final long serialVersionUID = -7922155162004878476L;
    
    private String className;
    private String methodName;

    private Object[] parameters;
    private Class[]  types;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    public Class[] getTypes() {
        return types;
    }

    public void setTypes(Class[] types) {
        this.types = types;
    }
}

// 2. provider
// impl
public class HelloWorldImpl implements IHelloWorld {
    @Override
    public String sayHello(String content) {
        return "Hello " + content;
    }
}

// Server
public class RpcProxyServer {
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher (int port) {
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                final Socket socket = server.accept();
                executorService.execute(new ProcessorHandler(socket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ProcessorHandler implements Runnable {
    private final Socket socket;
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            RpcRequest request = (RpcRequest)objectInputStream.readObject();
            Object object = invoke(request);
            objectOutputStream.writeObject(object);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object invoke(RpcRequest request) throws Exception {
        Class<?> clazz = Class.forName(request.getClassName());
        Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
        if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld"))
            return method.invoke(new HelloWorldImpl(), request.getParameters());
        else
            return null;
    }
}

// 3.consumer
// client
public class App 
{
    public static void main( String[] args )
    {
        RpcProxyClient client = new RpcProxyClient();
        IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090);

        System.out.println(iHelloWorld.sayHello("Elian"));
    }
}

// Client
public class RpcProxyClient {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}
// 动态代理类
public class RemoteInvocationHandler implements InvocationHandler {

    private String host;
    private int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setTypes(method.getParameterTypes());

        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object object = rpcNetTransport.send(request);
        return object;
    }
}
// reader读取返回报文
public class RpcNetTransport {
    private String host;
    private int port;

    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object send( RpcRequest request ) {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

3.4 NIO

五种IO模型

阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO,无论哪种IO,都是为了能够提高服务端能够并行处理的连接数量。

  1. 阻塞IO

    image-20220302231856640

应用进程调用accept()时,触发系统把数据从网卡缓冲区复制到内核空间,再从内核空间复制到用户空间,如果这个过程中,数据没有准备好,到数据返回或发生错误返回之前,用户进程一直处于阻塞状态,这个就是阻塞IO。

  1. 非阻塞IO

image-20220302232932498

非阻塞是指用户进程调用accept()后,如果数据没有准备好,会返回一个EWOULDBLOCK状态,并创建一个线程出来不断轮询返回结果。由此可见会增加CPU的消耗。

  1. IO复用

image-20220302233336221

  • select/poll

单个进程可以同时处理多个客户端的网络IO链接,我们可以把所有链接过来的客户端注册到select/poll复用器上,用一个线程或者进程来调用这个select/poll,调用这个select的时候会阻塞,阻塞的时候,内核会去监视所有select/poll所负责的socket,当其中一个socket准备好的时候,那么这个select/poll就会返回,如果再次调用这个select的时候,就会把数据从内核拷贝到用户空间。

select/poll模型最大的缺点是,他只能线性的轮询1024个链接,当然这1024个链接只有少数处于活跃状态,会导致网络的延迟。jdk1.5之前的NIO是使用这种模型。

这种模型处理的情况是:多个不同的监听,而且只是提高了并发连接数,并不是提高单个线程处理性能。连接数少的情况下,不一定比BIO效率更高。

  • epoll

对select/poll进行的优化:

  • 对单个进程所打开的连接数没有限制;
  • 利用每个文件描述符fd上的callback函数来实现异步回调,不需要轮询了;
  • mmap,可以通过内核和用户空间映射同一块内存来减少内存复制。

image-20220302235611165

image-20220304180148962

  1. 信号驱动

image-20220303000101291

  1. 异步IO

image-20220303001110759

总结:

  1. BIO是指accept()过程和读写过程会被阻塞,每个线程只能同时处理一个链接,这个时候线程是不能做别的事情的,我们通过将获取的Socket丢进线程池,来解决能够处理下个监听到的Socket的能力。如果连接数量足够多,这时候性能就会下降,会有其他连接在等待被accept()到,并把它获取的socket丢进线程池。
  2. NIO是一种非阻塞IO,当线程在某个复用器通道读取数据没有读取到,可以进行其他事情的处理,不需要等待连接。

4. 深入分析NIO

4.1 NIO的新特性

image-20220303003151530

相比较老的IO来说,所有操作都是基于Channel和Buffer来说的,可以将Channel看成是InputStream/OutputStream,应用程序与磁盘/网络缓冲区之间的一个通道,而所有数据操作都是通过缓冲区来实现的。

4.2 核心组件

通道(Channel):Java NIO数据来源,可以是网络,也可以是本地磁盘

缓冲区(Buffer):数据读写的中转区

选择器(selectors):异步IO的核心类,可以实现异步非阻塞IO,一个selectors可以管理多个通道Channel

  • Channle

FileChannle:从文件中读取数据

DatagramChannel:通过UDP协议读写网络中的数据

SocketChannel:通过TCP协议读写网络中的数据

ServerSocketChannel:监听一个TCP连接,对于每一个新的客户端连接都会创建一个SocketChannel

  • Buffer

缓冲区本质上是一块可以写入的数据,以及从中读取数据的内存,实际上也是一个byte[]数据,只是在NIO中被封装成了NIO Buffer对象,并提供了一组方法来访问这个内存块,要理解buffer的工作原理,需要知道几个属性:

private int position = 0; // 下一个位置
private int limit;	// 
private int capacity; // 容量,buffer数组初始化的最大容量
private int mark; // 标记
  • 读:position=0; limit = capacity = [size];当要添加的数据byte[].lenth > limit - position时都可以成功。

  • flip():limit=position; position=0,防止多余数据的写出

  • 写:position遍历到limit的过程

  • get():有4个重载方法,get()获取一个单字节,get(int) 获取特定位置的字节,get(byte[]) ,get(byte[], int, int)获取一段字节

  • put():有5个重载,put(byte),put(int, byte),put(ByteBuffer),put(byte[], int, int),put(byte[])

  • 堆内内存:由JVM控制的内存,堆外内存不数据JVM运行时内存,而是用的系统内存,但GC会触发回收。ByteBuffer有两个子类:HeapByteBuffer和DirectByteBuffer

    • HeapByteBuffer:JVM堆内存

    • DirectByteBuffer:堆外本地内存

    • MappedByteBuffer:mmap的内存映射,读写性能极高

      • MappedByteBuffer将文件直接映射到内存。可以映射整个文件,如果文件比较大的话可以考虑分段进行映射,只要指定文件的感兴趣部分就可以。

      • 由于MappedByteBuffer申请的是直接内存,因此不受Minor GC控制,只能在发生Full GC时才能被回收,因此Java提供了DirectByteBuffer类来改善这一情况。它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC来回收内存,也可以调用clean()方法来进行回收

      • FileChannel提供了map方法来把文件映射为内存对象:

        MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
        

使用堆外内存的原因

  1. 对垃圾回收停顿的改善。因为full gc时,垃圾收集器会对所有分配的堆内内存进行扫描,垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。如果使用堆外内存的话,堆外内存是直接受操作系统管理。这样做的结果就是能保持一个较小的JVM堆内存,以减少垃圾收集对应用的影响。(full gc时会触发堆外空闲内存的回收。)

  2. 减少了数据从JVM拷贝到native堆的次数,在某些场景下可以提升程序I/O的性能。

  3. 可以突破JVM内存限制,操作更多的物理内存。

使用堆外内存的问题

  1. 堆外内存难以控制,如果内存泄漏,那么很难排查(VisualVM可以通过安装插件来监控堆外内存)。

  2. 堆外内存只能通过序列化和反序列化来存储,保存对象速度比堆内存慢,不适合存储很复杂的对象。一般简单的对象或者扁平化的比较适合。

  3. 直接内存的访问速度(读写方面)会快于堆内存。在申请内存空间时,堆内存速度高于直接内存。

  4. 当直接内存不足时会触发full gc,排查full gc的时候,一定要考虑。

ByteBuffer模型

初始

image-20220304225349332

read(), put()

position = n;
limit = capacity = 8;
mark = -1;

flip()

limit = position; // 用来设置限制
position = 0;
mark = -1;

mark()

mark = postion; // 标记

reset()

position = mark;

clear()实际上数据还在

position = 0;
limit = capacity;
mark = -1;

4.3 零拷贝

  • 正常情况下,将一个文件发送给另一台服务器,需要四次拷贝,首先用户进程调用cpu,从用户空间切换到内核空间,内核空间调用DMA,从硬盘/网卡copy数据到内核空间,然后cpu系统调用,从内核空间copy到用户空间,然后再从用户空间通过cpu系统调用将数据copy到内核空间,再从内核空间通过DMAcopy到网卡缓冲区/硬盘。

image-20220303151649071

  • 零拷贝:不要用户空间了,省略了内核缓冲区拷贝到用户缓冲区。(目前理解是只针对客户端)

image-20220303151849935

Linux支持的零拷贝方式:

  • mmap内存映射:MappedByteBuffer channle.map();的方式
  • sendfile:transferTo/transferFrom
  • sendfile with DMA Scatter/Gather Copy
  • splice

server

public class ZeroCopyServer {
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw");
             FileChannel fileChannel = writeFile.getChannel();
        ) {
            long start = System.currentTimeMillis();
            serverSocketChannel.bind(new InetSocketAddress(9090));
            SocketChannel socketChannel = serverSocketChannel.accept();
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            int i = 0;
            int j = 0;
            /*while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer);
                buffer.clear();
                j += i;
            }*/ // 2527ms mmap()方式
            while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.write(buffer);
                buffer.clear();
                j += i;
            } // 4462ms 普通写
            System.out.println("传输大小:" + j + ";时间:" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client

public class ZeroCopyClient {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open();
             FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) {
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            int position = 0;
            long size=fileChannel.size();
            while (size > 0) {
                long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷贝,只从File Copy到缓冲区
                position += transfer;
                size -= transfer;
            }
            System.out.println("上传文件大小:" + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.4 Selector

Selector(选择器,多路复用器)是Java NIO中能够检测一到多个NIO通道,是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

image-20220303163415475

服务端处理过程:

  1. Selector.open()开启一个多路复用器,将ServerSocketChannel注册到selector上,这个ServerSocketServer必然不能是阻塞的,一个Channel会以4种状态注册到selector上:

    • SelectionKey.OP_ACCEPT:可接收
    • SelectionKey.OP_CONNECT:可连接
    • SelectionKey.OP_READ:可读
    • SelectionKey.OP_WRITE:可写
  2. 通过Selector的select()方法可以阻塞selection的操作,当通道中有已准备好进行I/O操作的SelectionKey,会返回这些准备好的SelectionKey的个数,下面是select()的重载方法:

    • int select():阻塞到至少有一个通道在你注册的事件上就绪了。

    • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。

    • int selectNow():非阻塞,只要有通道就绪就立刻返回。

    select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。

  3. 当selector.select()返回了准备好的连接数量后,可以通过selector.selectedKeys()获取所有已就绪的Channel的描述符selectedKey,这个selectKyes中包含了它所对应的selector和channel,并且能获取到当前这个selectedKey对应的channel的状态(key.isAcceptable()等)

  4. 如果当前selectedKey描述的是一个isAcceptable(),可以从当前selectedKey中将其对应的ServerSocketChannel也就是我们最初注册进来的channel获取出来,并建立accept()监听,进入阻塞(其实已经不用阻塞了,肯定是个准备好的channel,拿到SocketChannel后,将其设置为非阻塞,通过SelectionKey.OP_READ状态注册到selector中去,最后将其在selectKeys中移除。

    注册事件状态时,可用 | 连接,比如SelectionKey.OP_READ | SelectionKey.OP_ACCEPT

    image-20220304172521562

    // selector.open();
    private native int epollCreate();
    // serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT)
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    // selector.select()
    private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
    
  5. 再次进行selector.select(),这时会返回刚刚readable的SelectionKey,通过selector.selectionKeys()拿到后,判断其状态为isReadable(),就可以对其进行读写操作了,最后也要将其描述符移除掉

客户端处理过程:

  1. 创建SocketChannel设置为非阻塞,通过SelectionKey.OP_CONNECT状态注册到一个selector上。
  2. 通过selector的select()方法阻塞selection操作。
  3. 然后通过selector.seletedKeys()获取所有就绪的SelectionKey描述符。
  4. 如果当前描述符为isConnectable(),获取当前描述符对应的channel,判断当前channel是否已启动连接操作,但是并没有通过finishConnect()完成连接,如果为true,执行channel.finishConnect(),并将channel设置为非阻塞,写数据后以SelectionKey.OP_READ状态重新注册到selector中,最后将其在selectKeys中移除。
  5. 再次进行selector.select()操作,如果在轮询过程中获得了isReadable()的描述符selectionKey,对其进行读取,完成处理过程,最后也要将其描述符移除掉

server

public class NIOSelectorServer {
    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.configureBlocking(false); // 在多路复用器中,这个必须设置为非阻塞
            serverSocketChannel.bind(new InetSocketAddress(9090));
            // 监听连接事件
            // 将serverSocketChannel注册到selector上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // 参数可以带时间:0:阻塞;有时间:设置一个超时时间
                selector.select(); // 阻塞所有注册到多路复用器上的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 对于连接的SocketChannel的selectKey的集合
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove(); // 避免重复处理
                    // socket两种状态:listen    通信R/W
                    if (selectionKey.isAcceptable()) {  // 是一个连接事件
                        acceptHandler(selectionKey);
                    } else if (selectionKey.isReadable()) { // 是一个读事件
                        readHandler(selectionKey);
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); // 目的是调用accept接收客户端,例如fd7
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            client.register(key.selector(), SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void readHandler(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        try {
            channel.read(buffer);
            buffer.flip();
            System.out.println("Client Info: "+new String(buffer.array()));
            buffer.clear();
            buffer.put("Hello Client, i'm Server".getBytes());
            buffer.flip();
            channel.write(buffer);
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

client

public class NIOSelectorClient {
    public static void main(String[] args) {
        try (Selector selector = Selector.open()) {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey selectionKey = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (selectionKey.isConnectable()) {
                        connectHandler(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        readHandler(selectionKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (channel.isConnectionPending()) {
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
    }

    private static void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        channel.read(byteBuffer);
        byteBuffer.flip();
        System.out.println("client receive message: " + new String(byteBuffer.array()));
        channel.close();
    }
}

5. Reactor模式

  • 5.1 传统阻塞式
accpet() + new Thread(() -> {

	// 业务处理

}).start();
  • 5.2 单reactor单线程处理
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT);

while(true) {

	selector.select();

	seletor.selectedKeys().iterator();

	while (iterator.hasnext()) {

		// 业务处理

	}

}
  • 5.3 单reactor多线程处理

在上面业务处理部分加入多线程。

  • 5.4 多reactor多线程处理

Netty,两个Grop,一个处理accept,一个处理业务

回顾

IO

  • inputStream/outputStream, reader/writer
  • BufferedIutStream/BufferedReader(readLine()) 需要flush(), FileInputStream/FileReader
  • RandomAccessFile(File/路径, "rw"),File,FileChannle.open(Paths, StrandardOpenOption)
  • Socket -> BIO + 多线程
  • NewIO(No Block IO),基于Buffer + Channel
  • ServerSocketChannle.open().bind().configurBlocking(false)
    • Selector.open()
    • serverSocketChannle.registor(selector, SelectionKey.ACCEPT)
    • selector.select()
    • selector.selectionKeys()
  • 阻塞(I/O阻塞,连接阻塞)
  • epool(多路[多个Channle注册到selector上] 复用[一个或少量的线程])
  • Netty框架,后续维护
  • 零拷贝(Kafka, rocketMQ),内存映射(mmap)

标签:String,int,buffer,IO,new,public
来源: https://www.cnblogs.com/coderElian/p/15966875.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有