Tars-Java网络编程源码分析(下)

上一篇 / 下一篇  2023-03-20 10:53:29

  三、 Tars NIO网络编程
  了解完 Java NIO的原理,我们来看看Tars是如何使用NIO进行网络编程的。
  Tars的网络模型是多reactor多线程模型。有一点特殊的是tars的reactor线程组里随机选一个线程处理网络事件,并且该线程同时也能处理读写。
  核心类之间的关系如下:
  3.1 一个典型的Java NIO服务端开发流程
  创建ServerSocketChannel,设置为非阻塞,并绑定端口
  创建Selector对象
  给ServerSocketChannel注册SelectionKey.OP_ACCEPT事件
  启动一个线程循环,调用Selector的select方法来检查IO就绪事件,一旦有IO就绪事件,就通知用户线程去处理IO事件
  如果有Accept事件,就创建一个SocketChannel,并注册SelectionKey.OP_READ
  如果有读事件,判断一下是否全包,如果全包,就交给后端线程处理
  写事件比较特殊。isWriteable表示的是本机的写缓冲区是否可写。这个在绝大多少情况下都是为真的。在Netty中只有写半包的时候才需要注册写事件,如果一次写就完全把数据写入了缓冲区就不需要注册写事件。
  3.2 Tars客户端发起请求到服务器的流程
  Communicator.stringToProxy()  根据servantName等配置信息创建通信器。
  ServantProxyFactory.getServantProxy() 调用工厂方法创建servant代理。
   ObjectProxyFactory.getObjectProxy()  调用工厂方法创建obj代理。
   TarsProtocolInvoker.create() 创建协议调用者。
  ServantProtocolInvoker.initClient(Url url)  根据servantProxyConfig中的配置信息找到servant的ip端口等进行初始化ServantClient。
  ClientPoolManager.getSelectorManager() 如果第一次调用selectorManager是空的就会去初始化selectorManager。
   reactorSet = new Reactor[selectorPoolSize];     SelectorManager初始化构造类中的会根据selectorPoolSize(默认是2)的配置创建Reactor线程数组。线程名称的前缀是servant-proxy-加上CommunicatorId,CommunicatorId生成规则是由locator的地址生成的UUID。
  启动reactor线程。
  3.3 Tars服务端启动步骤
  tars支持TCP和UDP两种协议,RPC场景下是使用TCP协议。
  new SelectorManager() 根据配置信息初始化selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是server-tcp-reactor,然后启动reactor线程数组中的所有线程。
  开启服务端监听的ServerSocketChannel,绑定服务端本地ip和监听的端口号,设置TCP连接请求队列的最大容量为1024;设置非阻塞模式。
   选取reactor线程数组中第0个线程作为服务端监听连接OP_ACCEPT就绪事件的线程。
  代码4:
  public void bind(AppService appService) throws IOException {
   
      // 此处略去非关键代码
   
      if (endpoint.type().equals("tcp")) {  // 1
          this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);     // 2
          this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());
          this.selectorManager.start();
          ServerSocketChannel serverChannel = ServerSocketChannel.open();
          serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);   // 3
          serverChannel.configureBlocking(false);
                selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);  // 4
      } else if (endpoint.type().equals("udp")) {
          this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);
          this.selectorManager.start();
          // UDP开启的是DatagramChannel
          DatagramChannel serverChannel = DatagramChannel.open();
          DatagramSocket socket = serverChannel.socket();
          socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));
          serverChannel.configureBlocking(false);
          // UDP协议不需要建连,监听的是OP_READ就绪事件
          this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);
      }
  }
  3.4 Reactor线程启动流程
  多路复用器开始轮询检查 是否有就绪的事件。
  处理register队列中剩余的channel注册到当前reactor线程的多路复用器selector中。
  获取已选键集中所有就绪的channel。
   更新Session中最近操作时间,Tars服务端启动时会调用 startSessionManager() , 单线程每30s扫描一次session会话列表,会检查每个session的 lastUpdateOperationTime 与当前时间的时间差,如果超过60秒会将过期session对应的channel踢除。
   分发IO事件进行处理。
   处理unregister队列中剩余的channel,从当前reactor线程的多路复用器selector中解除注册。
  代码5:
  public void run() {
          while (!Thread.interrupted()) {
              selector.select();  // 1
              processRegister();  // 2
              Iterator<SelectionKey> iter = selector.selectedKeys().iterator();   //  3
              while (iter.hasNext()) {
                  SelectionKey key = iter.next();
                  iter.remove();
                  if (!key.isValid()) continue;
                  try {
                      if (key.attachment() != null && key.attachment() instanceof Session) {
                        ((Session) key.attachment()).updateLastOperationTime(); //4
                      }
                   dispatchEvent(key);    // 5
                  } catch (Throwable ex) {
                   disConnectWithException(key, ex);
                  }
              }
              processUnRegister();  // 6
          }
  }
  3.5 IO事件分发处理
  每个reactor线程都有一个专门的Accepter类去处理各种IO事件。TCPAccepter可以处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由于不需要建立连接所以只需要处理读和写两种事件。
  1、 处理OP_ACCEPT
  获取channel,处理TCP请求。
  为这个TCP请求创建TCPSession,会话的状态是服务器已连接
  会话注册到sessionManager中,Tars服务可配置最大连接数maxconns,如果超过就会关闭当前会话。
  寻找下一个reactor线程进行多路复用器与channel的绑定。
  代码6:
  public void handleAcceptEvent(SelectionKey key) throws IOException {
      ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1
      SocketChannel channel = server.accept();
         channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());
      channel.configureBlocking(false);
      Utils.setQosFlag(channel.socket());
      TCPSession session = new TCPSession(selectorManager);    // 2
      session.setChannel(channel);
      session.setStatus(SessionStatus.SERVER_CONNECTED);
      session.setKeepAlive(selectorManager.isKeepAlive());
      session.setTcpNoDelay(selectorManager.isTcpNoDelay());
      SessionManager.getSessionManager().registerSession(session);   // 3
        selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4
  }
  2、处理OP_CONNECT
  获取客户端连接过来的channel通道
  获取Session
   与服务器建立连接,将关注的兴趣OPS设置为ready就绪事件,session中的状态修改为客户端已连接
  代码7:
  public void handleConnectEvent(SelectionKey key) throws IOException {
      SocketChannel client = (SocketChannel) key.channel();  // 1
      TCPSession session = (TCPSession) key.attachment();   //2
      if (session == null) throw new RuntimeException("The session is null when connecting to ...");
      try {  // 3
          client.finishConnect();
          key.interestOps(SelectionKey.OP_READ);
          session.setStatus(SessionStatus.CLIENT_CONNECTED);
      } finally {
          session.finishConnect();
      }
  }
  3.处理OP_WRITE、 处理OP_READ
  调用session.read()和session.doWrite() 方法处理读写事件
  代码8:
  public void handleReadEvent(SelectionKey key) throws IOException {
      TCPSession session = (TCPSession) key.attachment();
      if (session == null) throw new RuntimeException("The session is null when reading data...");
      session.read();
  }
  public void handleWriteEvent(SelectionKey key) throws IOException {
      TCPSession session = (TCPSession) key.attachment();
      if (session == null) throw new RuntimeException("The session is null when writing data...");
      session.doWrite();
  }
  3.6 seesion中网络读写的事件详细处理过程
  1. 读事件处理
  申请2k的ByteBuffer空间,读取channel中的数据到readBuffer中。根据sessionStatus判断是客户端读响应还是服务器读请求,分别进行处理。
  代码9:
  protected void read() throws IOException {
      int ret = readChannel();
      if (this.status == SessionStatus.CLIENT_CONNECTED) {
          readResponse();
      } else if (this.status == SessionStatus.SERVER_CONNECTED) {
          readRequest();
      } else {
          throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");
      }
      if (ret < 0) {
          close();
          return;
      }
  }
  private int readChannel() throws IOException {
      int readBytes = 0, ret = 0;
      ByteBuffer data = ByteBuffer.allocate(1024 * 2);  // 1
      if (readBuffer == null) {
          readBuffer = IoBuffer.allocate(bufferSize);
      }
         // 2
      while ((ret = ((SocketChannel) channel).read(data)) > 0) {
          data.flip();  // 3
          readBytes += data.remaining();
          readBuffer.put(data.array(), data.position(), data.remaining());
          data.clear();
      }
      return ret < 0 ? ret : readBytes;
  }
  ① 客户端读响应
  从当前readBuffer中的内容复制到一个新的临时buffer中,并且切换到读模式,使用TarsCodec类解析出buffer内的协议字段到response,WorkThread线程通知Ticket处理response。如果response为空,则重置tempBuffer到mark的位置,重新解析协议。 
  代码10:
  public void readResponse() {
      Response response = null;
      IoBuffer tempBuffer = null;
          tempBuffer = readBuffer.duplicate().flip();
          while (true) {
              tempBuffer.mark();
              if (tempBuffer.remaining() > 0) {
                  response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
              } else {
                  response = null;
              }
              if (response != null) {
                  if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());
                  selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
              } else {
                  tempBuffer.reset();
                  readBuffer = resetIoBuffer(tempBuffer);
                  break;
              }
          }
  }
  ② 服务器读请求
  任务放入线程池交给 WorkThread线程,最终交给Processor类出构建请求的响应体,包括分布式上下文,然后经过FilterChain的处理,最终通过jdk提供的反射方法invoke服务端本地的方法然后返回response。如果线程池抛出拒绝异常,则返回SERVEROVERLOAD = -9,服务端过载保护。如果request为空,则重置tempBuffer到mark的位置,重新解析协议。
  代码11:
  public void readRequest() {
      Request request = null;
      IoBuffer tempBuffer = readBuffer.duplicate().flip();
          while (true) {
              tempBuffer.mark();
              if (tempBuffer.remaining() > 0) {
                  request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);
              } else {
                  request = null;
              }
              if (request != null) {
                  try {
                      request.resetBornTime();
                      selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));
                  } catch (RejectedExecutionException e) {
                    selectorManager.getProcessor().overload(request, request.getIoSession());
                  } catch (Exception ex) {
                    ex.printStackTrace();
                  }
              } else {    
                  tempBuffer.reset();
                  readBuffer = resetIoBuffer(tempBuffer);
                  break;
              }
          }
  }
  2. 写事件处理
  同样也包括客户端写请求和服务端写响应两种,其实这两种都是往TCPSession中的LinkedBlockingQueue(有界队列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最终会由TCPAcceptor中的handleWriteEvent监听写就绪事件并消费。
  代码12:
  protected void write(IoBuffer buffer) throws IOException {
      if (buffer == null) return;
      if (channel == null || key == null) throw new IOException("Connection is closed");
      if (!this.queue.offer(buffer.buf())) {
          throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
      }
      if (key != null) {
          key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
          key.selector().wakeup();
      }
  }
  四、总结
  本文主要介绍了Java NIO编程的基础知识 和 Tars-Java 1.7.2版本的网络编程模块的源码实现。
  在最新的Tars-Java的master分支中我们可以发现网络编程已经由NIO改成了Netty,虽然Netty更加成熟稳定,但是作为学习者了解NIO的原理也是掌握网络编程的必经之路。

TAG: 软件开发 Java java

 

评分:0

我来说两句

Open Toolbar