Java架构-还不了解Flink底层RPC使用的框架和原理?那就认真看完( 三 )

当启动RpcServer后,即创建了相应的Actor(注意此时Actor的处于停止状态)和动态代理对象,需要调用RpcEndpoint#start启动启动Actor,此时启动RpcEndpoint流程如下(以非FencedRpcEndpoint为例):

  • 调用RpcEndpoint#start;
  • 委托给RpcServer#start;
  • 调用动态代理的AkkaInvocationHandler#invoke;发现调用的是StartStoppable#start方法,则直接进行本地方法调用;invoke方法的代码如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result;// 先匹配指定类型(handler已实现接口的方法),若匹配成功则直接进行本地方法调用;若匹配为FencedRpcGateway类型,则抛出异常(应该在FencedAkkaInvocationHandler中处理);其他则进行Rpc调用if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) {throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +"retrieve a properly FencedRpcGateway."); } else { result = invokeRpc(method, args); }return result; }
  • 调用AkkaInvocationHandler#start;
  • 通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());;
  • 调用AkkaRpcActor#handleControlMessage处理控制类型消息;
  • 在主线程中将自身状态变更为Started状态;
经过上述步骤就完成了Actor的启动过程,Actor启动后便可与Acto通信让其执行代码(如runSync/callSync等)和处理Rpc请求了 。下面分别介绍处理执行代码和处理Rpc请求;
3.3.1.1. 执行代码
与Actor通信,通过调用runSync/callSync等方法其直接执行代码 。
下面以scheduleRunAsync方法为例分析请求Actor执行代码流程,方法代码如下:public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); checkArgument(delayMillis >= 0, "delay must be zero or greater");// 判断是否为本地Actorif (isLocal) {long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);// 向Actor发送消息runnable tell(new RunAsync(runnable, atTimeNanos)); } else {// 抛出异常,不支持远程发送Runnable消息throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } }AkkaInvocationHandler#invoke -> AkkaInvocation#scheduleRunAsync;
 
AkkaRpcActor#handleMessage->AkkaRpcActor#handleRpcMessage,其中handleRpcMessage方法如下:
protected void handleRpcMessage(Object message) {// 根据消息类型不同进行不同的处理if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); } else if (message instanceof CallAsync) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { log.warn("Received message of unknown type {} with value {}. Dropping this message!", message.getClass().getName(), message); sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +" of type " + message.getClass().getSimpleName() + '.')); } }AkkaRpcActor#handleRunAsync,其代码如下:
private void handleRunAsync(RunAsync runAsync) {// 获取延迟调度时间final long timeToRun = runAsync.getTimeNanos();final long delayNanos;// 若为0或已经到了调度时间,则立刻进行调度if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {// run immediatelytry { runAsync.getRunnable().run(); } catch (Throwable t) { log.error("Caught exception while executing runnable in main thread.", t); ExceptionUtils.rethrowIfFatalErrorOrOOM(t); } }else {// schedule for later. send a new message after the delay, which will then be immediately executed// 计算出延迟时间 FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);// 重新封装消息 RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);final Object envelopedSelfMessage = envelopeSelfMessage(message);// 等待指定延迟时间后给自己再发送一个消息 getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage, getContext().dispatcher(), ActorRef.noSender()); } }注意:当还未到调度时间时,该Actor会延迟一段时间后再次给自己发送消息;


推荐阅读