- tars支持TCP和UDP两种协议,RPC场景下是使用TCP协议 。
- new SelectorManager 根据配置信息初始化selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是server-tcp-reactor,然后启动reactor线程数组中的所有线程 。
- 开启服务端监听的ServerSocketChannel,绑定服务端本地ip和监听的端口号,设置TCP连接请求队列的最大容量为1024;设置非阻塞模式 。
- 选取reactor线程数组中第0个线程作为服务端监听连接OP_ACCEPT就绪事件的线程 。
publicvoidbind( AppService appService) throws IOException { // 此处略去非关键代码 if(endpoint.type. equals( "tcp")) { // 1 this.selectorManager = newSelectorManager(Utils.getSelectorPoolSize, newServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2 this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay); this.selectorManager.start; ServerSocketChannel serverChannel = ServerSocketChannel.open; serverChannel.socket.bind( newInetSocketAddress(endpoint.host, endpoint.port), 1024); // 3 serverChannel.configureBlocking( false); selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4 } elseif(endpoint.type. equals( "udp")) { this.selectorManager = newSelectorManager( 1, newServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true); this.selectorManager.start; // UDP开启的是DatagramChannel DatagramChannel serverChannel = DatagramChannel.open; DatagramSocket socket = serverChannel.socket; socket.bind( newInetSocketAddress(endpoint.host, endpoint.port)); serverChannel.configureBlocking( false); // UDP协议不需要建连,监听的是OP_READ就绪事件 this.selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_READ); } }3.4 Reactor线程启动流程
- 多路复用器开始轮询检查 是否有就绪的事件 。
- 处理register队列中剩余的channel注册到当前reactor线程的多路复用器selector中 。
- 获取已选键集中所有就绪的channel 。
- 更新Session中最近操作时间,Tars服务端启动时会调用 startSessionManager , 单线程每30s扫描一次session会话列表,会检查每个session的 lastUpdateOperationTime 与当前时间的时间差,如果超过60秒会将过期session对应的channel踢除 。
- 分发IO事件进行处理 。
- 处理unregister队列中剩余的channel,从当前reactor线程的多路复用器selector中解除注册 。
publicvoidrun( ) { while(!Thread.interrupted) { selector. select; // 1 processRegister; // 2 Iterator<SelectionKey> iter = selector.selectedKeys.iterator; // 3 while(iter.hasNext) { SelectionKey key = iter.next; iter. remove; if(!key.isValid) continue; try{ if(key.attachment != null&& key.attachment instanceof Session) { ((Session) key.attachment).updateLastOperationTime; //4 } dispatchEvent(key); // 5 } catch(Throwable ex) { disConnectWithException(key, ex); } } processUnRegister; // 6 } }3.5 IO事件分发处理
每个reactor线程都有一个专门的Accepter类去处理各种IO事件 。TCPAccepter可以处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由于不需要建立连接所以只需要处理读和写两种事件 。
![Tars-Java网络编程源码分析](http://img.jiangsulong.com/230630/15504C0E-5.png)
文章插图
1. 处理OP_ACCEPT
- 获取channel,处理TCP请求 。
- 为这个TCP请求创建TCPSession,会话的状态是服务器已连接
- 会话注册到sessionManager中,Tars服务可配置最大连接数maxconns,如果超过就会关闭当前会话 。
- 寻找下一个reactor线程进行多路复用器与channel的绑定 。
publicvoidhandleAcceptEvent(SelectionKey key)throwsIOException { ServerSocketChannel server = (ServerSocketChannel) key.channel; // 1 SocketChannel channel = server.accept; channel.socket.setTcpNoDelay(selectorManager.isTcpNoDelay); channel.configureBlocking( false); Utils.setQosFlag(channel.socket); TCPSession session = newTCPSession(selectorManager); // 2 session.setChannel(channel); session.setStatus(SessionStatus.SERVER_CONNECTED); session.setKeepAlive(selectorManager.isKeepAlive); session.setTcpNoDelay(selectorManager.isTcpNoDelay); SessionManager.getSessionManager.registerSession(session); // 3 selectorManager.nextReactor.registerChannel(channel, SelectionKey.OP_READ, session); // 4 }2. 处理OP_CONNECT
推荐阅读
- 长线牛股指标源码 长线牛股
- MySQL 驱动中虚引用 GC 耗时优化与源码分析
- 带你读 MySQL 源码:Where 条件怎么过滤记录?
- 谷歌开源 Rust Crate 审查结果:便于 Rust 开发者验证源码安全
- Spring/SpringBoot中的声明式事务和编程式事务源码、区别、优缺点、适用场景、实战
- 从Java源码来看Native命令执行方法
- 带你读 MySQL 源码:Select *
- 深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
- 一文看懂Java中的ThreadLocal源码和注意事项
- 一文看懂Redisson分布式锁的Watchdog机制源码实现