一篇文章带你了解Java NIO

上一篇 / 下一篇  2023-05-08 13:22:42

  NIO
  提到IO,这是Java提供的一套类库,用于支持应用程序与内存、文件、网络间进行数据交互,实现数据写入与输出。JDK自从1.4版本后,提供了另一套类库NIO,我们平时习惯称呼为NEW IO或NON-blocking IO。
  那么这套新的IO库与之前的有何区别?为什么需要提供这样一套IO库呢?
  IO与NIO
  Java NIO相比与传统的IO,除了提供标准IO的加强功能之外,最为核心的是对基于Socket的网络编程提供了一套非阻塞编程模式。
  ·流与缓冲
  Java IO Java的IO很好的诠释了Stream这个概念,该单词本身的含义表示‘河流’,承载数据的流,平时我们说的面向流的操作主要是在流的端点,实现对数据读与写。通过Stream相关的API可以看到, 不管是输入还是输出流,我们能做的仅仅是将数据读取或写入到流中。
  Java NIO NIO是基于缓冲区来操作数据,主要是基于通道Channel从缓冲Buffer中进行数据读取或写入。其中Buffer的灵活性决定了NIO的可操作空间,同样基于Buffer API可以看到, 其提供了对Buffer的基本读写功能外,还有提供了各种其他API来操作Buffer,相比Stream对数据的操作更加的灵活。
  · 阻塞与非阻塞
  Java IO 上面说到IO的操作都是基于流的,往流中写入数据时依赖于OutputStream#write,从流中读取数据时通过InputStream#read,这些操作都是阻塞的。
  Java NIO 支持非阻塞模式,但并非NIO就是非阻塞的,比如基于FileChannel操作文件时,仍然是阻塞的。我们说的阻塞或非阻塞都是基于操作系统层面的read/write方法导致的,NIO的非阻塞 基于操作系统层面提供的多路复用IO模型实现,所以NIO的实现是依赖于操作系统的支持。
  NIO相关概念
  在NIO中,三个核心的对象Buffer、Channel、Selector
  Buffer
  我们经常说的面向缓冲区编程主要对该对象的操作,Buffer简单的看就是一个内存块,其内部封装了一个数组,同时该对象提供了大量API可以灵活对其操作,比如缓冲数据读取与写入、缓冲复制等。
  其内部结构如下:
  其内部除了存储数据的数组外,还维护了capacity、limit、position几个属性,用于标记数组容量、存储占用空间、下标索引。Buffer存在读写两种状态,根据上图可以看到其具体含义。
  ·capacity
  表示Buffer最大可缓冲中数据的容量。capacity一旦确定,则不可修改;写入数据一旦达到容量,则不可继续写入;
  · limit
  在写模式时,limit=capacity,表示buf可写入数据上限。在读模式时,limit表示buf可读数据上限。
  · position
  表示Buffer数组下标位置。初始化时,positinotallow=0;
  读模式
  写模式
  1. 当缓冲区刚开始进入读模式时,position会被重置为0。
  2. 当从缓冲区读取时,也是从position位置开始读。读取数据后,position向前移动到下一个可读的位置。
  3. 在读模式下,limit表示可读数据的上限。position的最大值为最大可读上限limit,当position达到limit时表明缓冲区已经无数据可读。
  4. 在刚进入写模式时,position为0,表示当前的写入位置为从头开始。
  5. 当有数据写入到缓冲区后,position会向后移动写入数量个位置。
  6. 初始的position值为0,最大可写值为limit。当position值达到limit时,缓冲区就已经无空间可写了。
  · flip
  用于将Buffer由写状态切换为读状态,limit = position; position = 0;
  · compact、clear
  用于将Buffer由读状态切换为写状态,compact:positinotallow=limit,limit=capacity; clear:positinotallow=0,limit=capacity。
  · mark、reset
  操作Buffer时,用于临时存储position(mark=position),当有需要时,可以通过rest方法将临时值取出并赋值到position(positinotallow=mark) 重新从标记位置继续操作Buffer。
  Channel
  直译为通道,表示源端与目标端的连接通道,主要负责将数据读写到Buffer。
  · 通道可以同时进行读写,而流只能读或者只能写
  · 通道可以实现异步读写数据
  · 通道可以从缓冲读数据,也可以写数据到缓冲
  常用的Channel包括FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。
  · FileChannel 用于文件的数据读写
  · DatagramChannel 用于支持UDP协议的数据读写
  · ServerSocketChannel和SocketChannel 用于支持TCP协议的数据传输
  Selector
  选择器是NIO技术中的核心组件,可以将通道注册进选择器中,其主要作用就是使用一个线程来对多个通道中的已就绪通道进行选择, 然后可以对选择的通道进行数据处理,属于一对多的关系。这种机制在NIO技术中心称为“IO多路复用”。其优势是可以在一个线程中 对多个连接实现监听,从而节省系统资源与CPU开销。
  其中包括三个核心类:
  · Selector 主操作类,通过静态方法实例化,通过select()方法来监听已经注册的通道
  · SelectionKey 注册完通道之后返回的键,通过该类来描述各个通道的状态
  · SelectableChannel 连接通道,通过该类获取Socket对象,将之注册到Selector中
  我们可以将Channel注册到Selector上并定义感兴趣的事件,当Channel就绪时,可以监听这些事件:
  · Connect 某个Channel成功连接到另一个服务器时称为‘连接就绪’,对应常量:SelectionKey.OP_CONNECT
  · Accept一个Server Socket Channel准备好接收新进入的连接称为‘接收就绪’,对应常量:SelectionKey.OP_ACCEPT
  · Read 一个有数据可读的通道可以说是‘读就绪’,对应常量:SelectionKey.OP_READ
  · Write 等待写数据的通道可以说是‘写就绪’,对应常量:SelectionKey.OP_WRITE
  示例
  1. 文件复制
  传统IO复制文件时需要依赖于InputStream、OutputStream来完成,基于NIO可以通过FileChannel:
  // 文件复制
  sourceChannel.transferTo(0, sourceChannel.size(), targetChannel);
  // 其中获取FileChannel的方法有以下三种:
  FileChannel channel = new FileInputStream(file).getChannel();
  FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
  FileChannel channel = FileChannel.open(file.toPath());
  2. 基于UDP协议的数据传输
  Server:
  @Slf4j
  public class Server {
      private Selector selector;
      private DatagramChannel datagramChannel;
      public Server(int port) {
          try {
              this.selector = Selector.open();
              this.datagramChannel = DatagramChannel.open();
              this.datagramChannel.configureBlocking(false);
              this.datagramChannel.bind(new InetSocketAddress(port));
              this.datagramChannel.register(this.selector, SelectionKey.OP_READ);
              log.info("++++++ DUP Server启动成功 ++++++");
          } catch (IOException e) {
              log.error("Server创建失败:{}", e.getMessage());
          }
      }
      public void start() throws IOException {
          while (true){
              int select = selector.select();
              if(select >0 ){
                  Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                  while (iterator.hasNext()){
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if(key.isReadable()){
                          DatagramChannel channel = (DatagramChannel) key.channel();
                          ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                          channel.receive(byteBuffer);
                          byteBuffer.flip();
                          CharBuffer charBuffer = Charset.defaultCharset ().decode ( byteBuffer ) ;
                          log.info("Server接收消息:{}",  charBuffer);
                      }
                  }
              }
          }
      }
  }
  Client:
  @Slf4j
  public class Client {
      private DatagramChannel datagramChannel;
      public Client(int port) {
          try {
              this.datagramChannel = DatagramChannel.open();
              this.datagramChannel.configureBlocking(true);
              this.datagramChannel.connect(new InetSocketAddress("127.0.0.1", port));
          } catch (IOException e) {
              log.error("Client创建失败:{}", e.getMessage());
          }
      }
      public void invoke(String message) throws IOException {
          log.info("Client发送消息:{}", message);
          datagramChannel.write(Charset.defaultCharset().encode(message));
      }
  }
  Tests:
  public class UDPTest {
      int port = 8095;
      @Test
      public void server() throws IOException {
          Server server = new Server(port);
          server.start();
      }
      @Test
      public void client() throws IOException {
          Client client = new Client(port);
          client.invoke(message);
          while (true){}
      }
  }
  3. 基于NIO的Socket示例:
  Server:
  @Slf4j
  public class Server {
      private ServerSocketChannel serverSocketChannel;
      private Selector selector;
      public Server(int port){
          try {
              this.selector = Selector.open();
              this.serverSocketChannel = ServerSocketChannel.open();
              serverSocketChannel.configureBlocking(false);
              serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
              serverSocketChannel.bind(new InetSocketAddress(port));
              log.info("++++++ NIO Server启动成功 ++++++");
          } catch (IOException e) {
              log.error("创建ServerSocketChannel出错:{}", e.getMessage());
          }
      }
      public void start() throws IOException {
          while (true){
              selector.select(); // 阻塞
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
              while (keyIterator.hasNext()){
                  SelectionKey selectionKey = keyIterator.next();
                  keyIterator.remove(); //
                  if(!selectionKey.isValid()){
                      continue;
                  }
                  if(selectionKey.isAcceptable()){
                      ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                      SocketChannel socketChannel = ssc.accept(); // 可以是阻塞或非阻塞,获取的Channel一定是阻塞的
                      socketChannel.configureBlocking(false); // 这个有用?
                      socketChannel.register(selector, SelectionKey.OP_READ);
                  }else if(selectionKey.isReadable()){
                      SocketChannel channel = (SocketChannel) selectionKey.channel();
                      ByteBuffer buffer = ByteBuffer.allocate(256);
                      int writeBytes = channel.read(buffer); //
                      if(writeBytes > 0){
                          buffer.flip();
                          byte[] bytes = new byte[buffer.remaining()];
                          buffer.get(bytes);
                          log.info(">>> Server接收消息:{}", new String(bytes));
                      }
                      // 回复
                      channel.write(Charset.defaultCharset().encode("我是Server的回复内容"));
                  }
              }
          }
      }
  }
  Client:
  @Slf4j
  public class Client {
      private SocketChannel socketChannel;
      public Client(int port){
          try {
              this.socketChannel = SocketChannel.open();
              socketChannel.connect(new InetSocketAddress("127.0.0.1",port));
          } catch (IOException e) {
              log.error("创建SocketChannel出错:{}", e.getMessage());
          }
      }
      public void invoke(String message) throws IOException {
          log.info(">>> Client发送消息:{}", message);
          this.socketChannel.write(Charset.defaultCharset().encode(message));
      }
  }
  NIO整体处理流程如下:
  ·通过Selector.open()获取Selector
  · 通过ServerSocketChannel.open()获取ServerSocketChannel
  · 设置ServerSocketChannel为非阻塞模式,ServerSocketChannel.configureBlocking(false)
  · 将Channel绑定到Selector上,并定义关注的操作类型, serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
  · 将ServerSocketChannel绑定Socket,并设定监听端口,ServerSocketChannel.bind(new InetSocketAddress(port))
  · 开始轮询Selector
  · 阻塞Selector.select(),直到有准备就绪的Channel
  · 轮询Selector.selectedKeys(),获取这些Channel
  · 基于SelectionKey,按需要可以对当前Channel进行Accept、Read、Write等操作
  · 比如当接收客户端链接时,需要将该Channel注册到Selector;
  零拷贝
  首先我们要知道,程序在读取系统文件时,是没办法直接读取磁盘内容,基于操作系统安全考虑,需要通过调用操作系统提供的系统API从内核缓冲区将文件数据拷到用户缓冲区后 才能读取到文件信息。
  在操作系统层面,如果为了完成网络文件的传输,一般需要这样做:
  while( in.read(...)!=-1 ){
     out.write(...) 
  }
  拿到源文件的输入流;拿到目标文件的输出流;从输入流读取数据;将数据写入到输出流;
  整个过程经历了4次文件拷贝:
  1. 读取磁盘文件到操作系统内核缓冲区
  2. 将内核缓冲区的数据,copy到应用程序的buffer
  3. 将应用程序buffer中的数据,copy到socket网络发送缓冲区
  4. 将socket buffer的数据,copy到网卡,由网卡进行网络传输
  经历了4次CPU切换:
  1. 程序调用系统api将文件从磁盘读取到内核态缓冲区,用户态切换内核态
  2. 将数据由内核态缓冲区拷贝到用户缓冲区,内核态切换用户态
  3. 程序调用系统api将数据由用户缓冲区拷贝到内核缓冲区,用户态切换内核态
  4. 将数据由内核态缓冲区拷贝到网卡,内核态切换用户态
  在高并发网络通信环境中,通过传统的方式由于多次的CPU切换与数据拷贝会消耗系统资源,因此为了提高网络间文件传输的性能,就需要减少‘用户态与内核态的上下文切换’和‘内存拷贝’的次数。
  零拷贝的“零”是指用户态和内核态间copy数据的次数为零。
  零拷贝依附于操作系统底层,基于虚拟内存实现,将文件地址与虚拟地址件建立映射关系,
  零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效地提高数据传输效率;零拷贝技术减少了用户进程地址空间和内核地址空间之间因为上下文切换而带来的开销
  ·MappedByteBuffer
  RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
      FileChannel fileChannel = randomAccessFile.getChannel();
      MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
  · DirectByteBuffer
  DirectByteBuffer继承了MappedByteBuffer,主要是实现了byte获得函数get等。
  · 零拷贝问题
  直接内存DirectMemory的大小默认为-Xmx 的JVM堆的最大值,但是并不受其限制,而是由JVM参数 MaxDirectMemorySize单独控制。
  直接内存不是分配在JVM堆中。并且直接内存不受 GC(新生代的Minor GC)影响,只有当执行老年代的 Full GC时候才会顺便回收直接内存!而直接内存是通过存储在JVM堆中的DirectByteBuffer对象来引用的, 所以当众多的DirectByteBuffer对象从新生代被送入老年代后才触发了 full gc。
  MappedByteBuffer在处理大文件时的确性能很高,但也存在一些问题,如内存占用、文件关闭不确定,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
  结束语
  NIO的出现得益于操作系统的变革,由于网路编程对性能与资源使用上的要求更高,传统的IO模型只能通过线程来提升系统吞吐率;为了满足现代网络通信的需求,在高级编程语言中的优化 行为逐步迁移到操作系统底层,这样通过底层逻辑优化,不仅提供系统性能,最主要减少了系统资源的浪费。

TAG: 软件开发 Java java

 

评分:0

我来说两句

Open Toolbar