- 获取客户端连接过来的channel通道
- 获取Session
- 与服务器建立连接,将关注的兴趣OPS设置为ready就绪事件,session中的状态修改为客户端已连接
publicvoidhandleConnectEvent(SelectionKey key)throwsIOException { SocketChannel client = (SocketChannel) key.channel; // 1 TCPSession session = (TCPSession) key.attachment; //2 if(session == null) thrownewRuntimeException( "The session is null when connecting to ..."); try{ // 3 client.finishConnect; key.interestOps(SelectionKey.OP_READ); session.setStatus(SessionStatus.CLIENT_CONNECTED); } finally{ session.finishConnect; } }3.处理OP_WRITE、 处理OP_READ
调用session.read和session.doWrite 方法处理读写事件
代码8:
publicvoidhandleReadEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when reading data..."); session.read; } publicvoidhandleWriteEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when writing data..."); session.doWrite; }3.6 seesion中网络读写的事件详细处理过程
1. 读事件处理
申请2k的ByteBuffer空间,读取channel中的数据到readBuffer中 。根据sessionStatus判断是客户端读响应还是服务器读请求,分别进行处理 。
代码9:
protectedvoid read throws IOException { int ret = readChannel; if( this.status == SessionStatus.CLIENT_CONNECTED) { readResponse; } elseif( this.status == SessionStatus.SERVER_CONNECTED) { readRequest; } else{ thrownew IllegalStateException( "The current session status is invalid. [status:"+ this.status + "]"); } if(ret < 0) { close; return; } } privateint readChannel throws IOException { int readBytes = 0, ret = 0; ByteBuffer data= https://www.isolves.com/it/cxkf/bk/2023-06-29/ByteBuffer.allocate( 1024* 2); // 1 if(readBuffer == null) { readBuffer = IoBuffer.allocate(bufferSize); } // 2 while((ret = ((SocketChannel) channel).read( data)) > 0) { data.flip; // 3 readBytes += data.remaining; readBuffer.put( data.array, data.position, data.remaining); data.clear; } returnret < 0? ret : readBytes; }① 客户端读响应
从当前readBuffer中的内容复制到一个新的临时buffer中,并且切换到读模式,使用TarsCodec类解析出buffer内的协议字段到response,WorkThread线程通知Ticket处理response 。如果response为空,则重置tempBuffer到mark的位置,重新解析协议 。
代码10:
publicvoidreadResponse( ) { Response response = null; IoBuffer tempBuffer = null; tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { response = selectorManager.getProtocolFactory.getDecoder.decodeResponse(tempBuffer, this); } else{ response = null; } if(response != null) { if(response.getTicketNumber == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession.hashCode); selectorManager.getThreadPool.execute( newWorkThread(response, selectorManager)); } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }② 服务器读请求
任务放入线程池交给 WorkThread线程,最终交给Processor类出构建请求的响应体,包括分布式上下文,然后经过FilterChain的处理,最终通过jdk提供的反射方法invoke服务端本地的方法然后返回response 。如果线程池抛出拒绝异常,则返回SERVEROVERLOAD = -9,服务端过载保护 。如果request为空,则重置tempBuffer到mark的位置,重新解析协议 。
代码11:
publicvoidreadRequest( ) { Request request = null; IoBuffer tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { request = selectorManager.getProtocolFactory.getDecoder.decodeRequest(tempBuffer, this); } else{ request = null; } if(request != null) { try{ request.resetBornTime; selectorManager.getThreadPool.execute( newWorkThread(request, selectorManager)); } catch(RejectedExecutionException e) { selectorManager.getProcessor.overload(request, request.getIOSession); } catch(Exception ex) { ex.printStackTrace; } } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }2. 写事件处理
推荐阅读
- 长线牛股指标源码 长线牛股
- MySQL 驱动中虚引用 GC 耗时优化与源码分析
- 带你读 MySQL 源码:Where 条件怎么过滤记录?
- 谷歌开源 Rust Crate 审查结果:便于 Rust 开发者验证源码安全
- Spring/SpringBoot中的声明式事务和编程式事务源码、区别、优缺点、适用场景、实战
- 从Java源码来看Native命令执行方法
- 带你读 MySQL 源码:Select *
- 深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
- 一文看懂Java中的ThreadLocal源码和注意事项
- 一文看懂Redisson分布式锁的Watchdog机制源码实现
