Java Architecture: Still Unfamiliar with the Framework and Principles Behind Flink's Underlying RPC? Then Read This Carefully (Part 2)


 
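2.3.2. The ask approach
When we need a response from an Actor, we can use the ask method. ask wraps the reply in a scala.concurrent.Future, and the result is then obtained through an asynchronous callback. On the caller side, for example: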
    // Asynchronously send a message to the Actor and obtain the response as a Future
    Future<Object> fu = Patterns.ask(printerActor, "hello helloActor", timeout);
    fu.onComplete(new OnComplete<Object>() {
        @Override
        public void onComplete(Throwable failure, Object success) throws Throwable {
            if (failure != null) {
                System.out.println("failure is " + failure);
            } else {
                System.out.println("success is " + success);
            }
        }
    }, system.dispatcher());

The message-handling method of HelloActor looks roughly like this:
    private void handleMessage(Object object) {
        if (object instanceof String) {
            String str = (String) object;
            log.info("[HelloActor] message is {}, sender is {}", str, getSender().path().toString());
            // Reply to the sender
            getSender().tell(str, getSelf());
        }
    }

The sections above introduced Akka's ActorSystem and Actor, and how to communicate with an Actor. Flink builds its underlying communication system on top of them.
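The request/response flow of ask can also be illustrated without an Akka dependency. The following is a minimal JDK-only sketch, not Akka's API: AskSketch, EchoActor, and its ask method are hypothetical names, a single-threaded executor stands in for the actor's mailbox, and CompletableFuture stands in for scala.concurrent.Future. The timeout argument of the real Patterns.ask is omitted for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogue of the ask pattern; none of these names come from Akka.
final class AskSketch {

    /** A toy "actor": messages are processed one at a time on a single mailbox thread. */
    static final class EchoActor implements AutoCloseable {
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

        /** Enqueues the message and returns a future that completes with the reply. */
        CompletableFuture<Object> ask(Object message) {
            return CompletableFuture.supplyAsync(() -> handleMessage(message), mailbox);
        }

        // Mirrors the handler shown earlier: String messages are echoed back to the sender.
        private Object handleMessage(Object message) {
            if (message instanceof String) {
                return message;
            }
            throw new IllegalArgumentException("unsupported message: " + message);
        }

        @Override
        public void close() {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        try (EchoActor actor = new EchoActor()) {
            CompletableFuture<Object> fu = actor.ask("hello helloActor");
            // Asynchronous callback, analogous to onComplete(failure, success)
            fu.whenComplete((success, failure) -> {
                if (failure != null) {
                    System.out.println("failure is " + failure);
                } else {
                    System.out.println("success is " + success);
                }
            }).join(); // wait only so that the demo does not exit before the callback runs
        }
    }
}
```

As with the real ask, the caller never blocks on the actor while sending; it only registers a callback on the returned future.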
 
3. RPC class diagram
The figure below shows the main classes involved in Flink's RPC framework.

[Figure: class diagram of the main classes in Flink's RPC framework]
 
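3.1. RpcGateway
Flink's RPC protocol is defined through RpcGateway. As described earlier, communicating with a remote Actor requires its address (IP and port). In Flink-on-YARN mode, for example, the JobMaster starts its ActorSystem first, at which point the TaskExecutor's Container has not yet been allocated; to communicate with the TaskExecutor later, the TaskExecutor must supply its address. The class hierarchy shows that essentially all components implement the RpcGateway interface, whose code is as follows: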
    public interface RpcGateway {

        /**
         * Returns the fully qualified address under which the associated rpc endpoint is reachable.
         *
         * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
         */
        String getAddress();

        /**
         * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
         *
         * @return Fully qualified hostname under which the associated rpc endpoint is reachable
         */
        String getHostname();
    }

3.2. RpcEndpoint
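Each RpcEndpoint corresponds to a path (determined jointly by the endpointId and the actorSystem), and each path corresponds to one Actor. RpcEndpoint implements the RpcGateway interface, and its constructor is as follows: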
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        // Store rpcService and endpointId
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        // Start the RpcServer through the RpcService
        this.rpcServer = rpcService.startServer(this);
        // Main-thread executor; all calls execute serially in the main thread
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

RpcEndpoint also defines methods such as runAsync(Runnable) and callAsync(Callable, Time) for executing RPC calls. Notably, in Flink's design all calls for a given endpoint run in its main thread, so there are no concurrency problems within the endpoint. When an RpcEndpoint is started or an RPC call is made, the work is delegated to the RpcServer.
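The single-main-thread guarantee can be sketched with the JDK alone. This is an illustration under assumptions, not Flink's implementation: MainThreadSketch and ToyEndpoint are hypothetical, and a single-threaded ExecutorService stands in for the actor mailbox that Flink's main-thread executor ultimately relies on. The method names runAsync and callAsync mirror those mentioned above, but their signatures are simplified (no timeout parameter).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: every call on the endpoint is funnelled through one thread.
final class MainThreadSketch {

    static final class ToyEndpoint implements AutoCloseable {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Plain field, deliberately neither volatile nor atomic:
        // it is only ever read or written on the main thread.
        private int counter;

        CompletableFuture<Void> runAsync(Runnable action) {
            return CompletableFuture.runAsync(action, mainThread);
        }

        <V> CompletableFuture<V> callAsync(Callable<V> callable) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return callable.call();
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, mainThread);
        }

        void increment() { counter++; }        // intended to run on the main thread only
        int currentCount() { return counter; } // intended to run on the main thread only

        @Override
        public void close() {
            mainThread.shutdown();
        }
    }

    /** Lets several caller threads submit increments, then reads the counter on the main thread. */
    static int runDemo(int callers, int incrementsPerCaller) {
        try (ToyEndpoint endpoint = new ToyEndpoint()) {
            List<CompletableFuture<Void>> submitters = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                // Caller threads only submit work; they never touch the counter directly.
                submitters.add(CompletableFuture.runAsync(() -> {
                    for (int j = 0; j < incrementsPerCaller; j++) {
                        endpoint.runAsync(endpoint::increment);
                    }
                }));
            }
            CompletableFuture.allOf(submitters.toArray(new CompletableFuture[0])).join();
            // Queued after all increments, so it observes every one of them.
            return endpoint.callAsync(endpoint::currentCount).join();
        }
    }

    public static void main(String[] args) {
        System.out.println("counter = " + runDemo(4, 1000));
    }
}
```

Because the state is confined to one thread, the endpoint needs no locks even though many threads call it concurrently; runDemo returns callers × incrementsPerCaller.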
 
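3.3. RpcService
RpcService is the interface of the RPC service. Its main responsibilities are:

Start an RpcServer (Actor) for a given RpcEndpoint;
Connect to an RpcServer at a given address and return an RpcGateway;
Schedule a Runnable or Callable, either immediately or after a delay;
Stop an RpcServer (Actor) or the service itself.
In Flink, its implementation class is AkkaRpcService.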
 
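3.3.1. AkkaRpcService
AkkaRpcService wraps the ActorSystem and keeps a mapping from ActorRef to RpcEndpoint. When an RpcEndpoint is constructed, the RpcServer for that endpoint is started: depending on the endpoint type (FencedRpcEndpoint or otherwise), a different Actor is created (FencedAkkaRpcActor or AkkaRpcActor), the RpcEndpoint and the Actor's ActorRef are stored, and the RpcServer is then created through a dynamic proxy. The code is as follows: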
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final Props akkaRpcActorProps;

        // Create different Props depending on the RpcEndpoint type
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaRpcActorProps = Props.create(
                FencedAkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        } else {
            akkaRpcActorProps = Props.create(
                AkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        }

        ActorRef actorRef;

        // Synchronized block: create the Actor and obtain its ActorRef
        synchronized (lock) {
            checkState(!stopped, "RpcService is stopped");
            actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            actors.put(actorRef, rpcEndpoint);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

        // Get the Actor's path
        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        // Resolve all RpcGateway interfaces implemented by this RpcEndpoint
        Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

        // Additionally add the RpcServer and AkkaBasedEndpoint classes
        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);

        final InvocationHandler akkaInvocationHandler;

        // Create the invocation handler for the proxy depending on the endpoint type
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
            akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                terminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
            akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                terminationFuture);
        }

        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class. That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        // Create the RpcServer object. Every subsequent call on this server is handled by the
        // handler's invoke method; the handler implements the methods of multiple interfaces
        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);

        return server;
    }
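The last step of startServer, building the RpcServer with Proxy.newProxyInstance, relies on the JDK's dynamic proxy mechanism. Below is a minimal sketch of that technique under stated assumptions: ProxySketch, GreeterGateway, GreeterEndpoint, MailboxInvocationHandler, and the sketch:// address are all hypothetical stand-ins. In this sketch the handler simply runs the call on a single-threaded executor, which stands in for sending a message to the actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of a gateway backed by a dynamic proxy; not Flink code.
final class ProxySketch {

    /** The "RPC protocol" is just an interface; callers only ever see this type. */
    interface GreeterGateway {
        CompletableFuture<String> greet(String name);
        String getAddress();
    }

    /** Endpoint-side logic that should run on the single mailbox thread. */
    static final class GreeterEndpoint {
        String greet(String name) {
            return "hello " + name;
        }
    }

    /** Every call on the proxy lands in invoke(), which decides how to handle it. */
    static final class MailboxInvocationHandler implements InvocationHandler {
        private final Object endpoint;
        private final String address;
        private final ExecutorService mailbox;

        MailboxInvocationHandler(Object endpoint, String address, ExecutorService mailbox) {
            this.endpoint = endpoint;
            this.address = address;
            this.mailbox = mailbox;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args); // toString/equals/hashCode answered by the handler
            }
            if (method.getName().equals("getAddress")) {
                return address; // answered locally, no round trip to the endpoint
            }
            // Look up the same-named method on the endpoint and run it on the mailbox thread.
            Method target = endpoint.getClass()
                .getDeclaredMethod(method.getName(), method.getParameterTypes());
            target.setAccessible(true);
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return target.invoke(endpoint, args);
                } catch (ReflectiveOperationException e) {
                    throw new CompletionException(e);
                }
            }, mailbox);
        }
    }

    static GreeterGateway startServer(GreeterEndpoint endpoint, String address, ExecutorService mailbox) {
        InvocationHandler handler = new MailboxInvocationHandler(endpoint, address, mailbox);
        return (GreeterGateway) Proxy.newProxyInstance(
            GreeterGateway.class.getClassLoader(),
            new Class<?>[] {GreeterGateway.class},
            handler);
    }

    public static void main(String[] args) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            GreeterGateway gateway = startServer(new GreeterEndpoint(), "sketch://localhost/greeter", mailbox);
            System.out.println(gateway.getAddress());
            System.out.println(gateway.greet("flink").join());
        } finally {
            mailbox.shutdown();
        }
    }
}
```

The caller works against a plain interface while the handler decides, per method, whether to answer locally or forward the call. This is the same division of labor that lets one invocation handler serve several gateway interfaces at once.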

