2.3.2. ask方式
当我们需要从Actor获取响应结果时,可使用ask方法,ask方法会将返回结果包装在scala.concurrent.Future中,然后通过异步回调获取返回结果 。如调用方:
// 异步发送消息给Actor,并获取响应结果Future<Object> fu = Patterns.ask(printerActor, "hello helloActor", timeout);fu.onComplete(new OnComplete<Object>() {@Overridepublic void onComplete(Throwable failure, String success) throws Throwable {if (failure != null) { System.out.println("failure is " + failure); } else { System.out.println("success is " + success); } }}, system.dispatcher());HelloActor处理消息方法的代码大致如下:
private void handleMessage(Object object) {if (object instanceof String) {String str = (String) object; log.info("[HelloActor] message is {}, sender is {}", str, getSender().path().toString());// 给发送者发送消息 getSender().tell(str, getSelf()); } }上面主要介绍了Akka中的ActorSystem、Actor,及与Actor的通信;Flink借此构建了其底层通信系统 。
3. RPC类图结构
下图展示了Flink中RPC框架中涉及的主要类 。
文章插图
3.1. RpcGateway
Flink的RPC协议通过RpcGateway来定义;由前面可知,若想与远端Actor通信,则必须提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster会先启动ActorSystem,此时TaskExecutor的Container还未分配,后面与TaskExecutor通信时,必须让其提供对应地址,从类继承图可以看到基本上所有组件都实现了RpcGateway接口,其代码如下:
public interface RpcGateway {/** * Returns the fully qualified address under which the associated rpc endpoint is reachable. * * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable */String getAddress();/** * Returns the fully qualified hostname under which the associated rpc endpoint is reachable. * * @return Fully qualified hostname under which the associated rpc endpoint is reachable */String getHostname();}3.2. RpcEndpoint
每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了RpcGateway接口,其构造函数如下:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {// 保存rpcService和endpointIdthis.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");// 通过RpcService启动RpcServerthis.rpcServer = rpcService.startServer(this);// 主线程执行器,所有调用在主线程中串行执行this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}在RpcEndpoint中还定义了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法来执行Rpc调用,值得注意的是在Flink的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint/进行Rpc调用时,其会委托RcpServer进行处理 。
3.3. RpcService
Rpc服务的接口,其主要作用如下:
根据提供的RpcEndpoint来启动RpcServer(Actor);
根据提供的地址连接到RpcServer,并返回一个RpcGateway;
延迟/立刻调度Runnable、Callable;
停止RpcServer(Actor)或自身服务;
在Flink中其实现类为AkkaRpcService 。
3.3.1. AkkaRpcService
AkkaRpcService中封装了ActorSystem,并保存了ActorRef到RpcEndpoint的映射关系,在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServer,其会根据Endpoint类型(FencedRpcEndpoint或其他)来创建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并将RpcEndpoint和Actor对应的ActorRef保存起来,然后使用动态代理创建RpcServer,具体代码如下:
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture<Void> terminationFuture = new CompletableFuture<>();final Props akkaRpcActorProps;// 根据RpcEndpoint类型创建不同类型的Propsif (rpcEndpoint instanceof FencedRpcEndpoint) { akkaRpcActorProps = Props.create( FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } else { akkaRpcActorProps = Props.create( AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } ActorRef actorRef;// 同步块,创建Actor,并获取对应的ActorRefsynchronized (lock) { checkState(!stopped, "RpcService is stopped"); actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId()); actors.put(actorRef, rpcEndpoint); } LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());// 获取Actor的路径final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);final String hostname; Option<String> host = actorRef.path().address().host();if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); }// 解析该RpcEndpoint实现的所有RpcGateway接口 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); // 额外添加RpcServer和AkkaBasedEnpoint类 implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class);final InvocationHandler akkaInvocationHandler;// 根据不同类型动态创建代理对象if (rpcEndpoint instanceof FencedRpcEndpoint) {// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture); }// Rather than using the System ClassLoader directly, we derive the ClassLoader// from this class . That works better in cases where Flink runs embedded and all Flink// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader();// 生成RpcServer对象,而后对该server的调用都会进入Handler的invoke方法处理,handler实现了多个接口的方法@SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler);return server; }
推荐阅读
- 总结Java中return语句的用法
- 梦见狐狸扑向自己还想咬是什么意思 梦见狐狸扑向自己吓醒
- 2022天猫圣诞活动大还是元旦活动大,圣诞和元旦哪个打折多
- 编发|刘欢夫妇走机场,妻子扎马尾辫戴头巾真年轻,比他还有艺术家气质
- 电动晾衣架好还是手摇晾衣架好 晾衣架电动的好还是手摇的好
- 一道简单面试题引出的Java数据类型连环问
- 梦见牙要掉还没掉下来 梦见自己的牙要掉了还连着
- 梦见自己的左腿断了 梦到自己右腿断了还能走
- 梦到发现了古墓 梦见自己发现了古墓,还有干尸
- 分析MySQL应用架构发展演变史