当启动RpcServer后,即创建了相应的Actor(注意此时Actor的处于停止状态)和动态代理对象,需要调用RpcEndpoint#start启动启动Actor,此时启动RpcEndpoint流程如下(以非FencedRpcEndpoint为例):
- 调用RpcEndpoint#start;
- 委托给RpcServer#start;
- 调用动态代理的AkkaInvocationHandler#invoke;发现调用的是StartStoppable#start方法,则直接进行本地方法调用;invoke方法的代码如下:
- 调用AkkaInvocationHandler#start;
- 通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());;
- 调用AkkaRpcActor#handleControlMessage处理控制类型消息;
- 在主线程中将自身状态变更为Started状态;
3.3.1.1. 执行代码
与Actor通信,通过调用runSync/callSync等方法其直接执行代码 。
下面以scheduleRunAsync方法为例分析请求Actor执行代码流程,方法代码如下:public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); checkArgument(delayMillis >= 0, "delay must be zero or greater");// 判断是否为本地Actorif (isLocal) {long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);// 向Actor发送消息runnable tell(new RunAsync(runnable, atTimeNanos)); } else {// 抛出异常,不支持远程发送Runnable消息throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } }AkkaInvocationHandler#invoke -> AkkaInvocation#scheduleRunAsync;
AkkaRpcActor#handleMessage->AkkaRpcActor#handleRpcMessage,其中handleRpcMessage方法如下:
protected void handleRpcMessage(Object message) {// 根据消息类型不同进行不同的处理if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); } else if (message instanceof CallAsync) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { log.warn("Received message of unknown type {} with value {}. Dropping this message!", message.getClass().getName(), message); sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +" of type " + message.getClass().getSimpleName() + '.')); } }AkkaRpcActor#handleRunAsync,其代码如下:
private void handleRunAsync(RunAsync runAsync) {// 获取延迟调度时间final long timeToRun = runAsync.getTimeNanos();final long delayNanos;// 若为0或已经到了调度时间,则立刻进行调度if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {// run immediatelytry { runAsync.getRunnable().run(); } catch (Throwable t) { log.error("Caught exception while executing runnable in main thread.", t); ExceptionUtils.rethrowIfFatalErrorOrOOM(t); } }else {// schedule for later. send a new message after the delay, which will then be immediately executed// 计算出延迟时间 FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);// 重新封装消息 RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);final Object envelopedSelfMessage = envelopeSelfMessage(message);// 等待指定延迟时间后给自己再发送一个消息 getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage, getContext().dispatcher(), ActorRef.noSender()); } }注意:当还未到调度时间时,该Actor会延迟一段时间后再次给自己发送消息;
推荐阅读
- 总结Java中return语句的用法
- 梦见狐狸扑向自己还想咬是什么意思 梦见狐狸扑向自己吓醒
- 2022天猫圣诞活动大还是元旦活动大,圣诞和元旦哪个打折多
- 编发|刘欢夫妇走机场,妻子扎马尾辫戴头巾真年轻,比他还有艺术家气质
- 电动晾衣架好还是手摇晾衣架好 晾衣架电动的好还是手摇的好
- 一道简单面试题引出的Java数据类型连环问
- 梦见牙要掉还没掉下来 梦见自己的牙要掉了还连着
- 梦见自己的左腿断了 梦到自己右腿断了还能走
- 梦到发现了古墓 梦见自己发现了古墓,还有干尸
- 分析MySQL应用架构发展演变史