@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args); String serviceKey = url.getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); // invoker.getUrl() returns consumer url. RpcServiceContext.setRpcContext(url); if (consumerModel != null) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } //掉到 MigrationInvoker return invoker.invoke(rpcInvocation).recreate();}
@Overridepublic Result invoke(Invocation invocation) throws RpcException { if (currentAvailableInvoker != null) { if (step == APPLICATION_FIRST) { // call ratio calculation based on random value if (ThreadLocalRandom.current().nextDouble(100) > promotion) { return invoker.invoke(invocation); } } //代码会走这里,走到MockClusterInvoker中 return currentAvailableInvoker.invoke(invocation); } switch (step) { case APPLICATION_FIRST: if (checkInvokerAvailable(serviceDiscoveryInvoker)) { currentAvailableInvoker = serviceDiscoveryInvoker; } else if (checkInvokerAvailable(invoker)) { currentAvailableInvoker = invoker; } else { currentAvailableInvoker = serviceDiscoveryInvoker; } break; case FORCE_APPLICATION: currentAvailableInvoker = serviceDiscoveryInvoker; break; case FORCE_INTERFACE: default: currentAvailableInvoker = invoker; } return currentAvailableInvoker.invoke(invocation);}
@Overridepublic Result invoke(Invocation invocation) throws RpcException { Result result = null; //获取url中的mock参数 String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || "false".equalsIgnoreCase(value)) { //no mock //如果没mock则直接走后面调用逻辑 result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { //如果是force开头 if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled, url : " + getUrl()); } //强制降级,调用mock的实现类逻辑 //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); //fix:#4585 if(result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if(rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } catch (RpcException e) { if (e.isBiz()) { throw e; } if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e); } result = doMockInvoke(invocation, e); } } return result;}
class FilterChainNode, FILTER extends BaseFilter> implements Invoker{ TYPE originalInvoker; Invoker nextNode; FILTER filter; public FilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { this.originalInvoker = originalInvoker; this.nextNode = nextNode; this.filter = filter; } public TYPE getOriginalInvoker() { return originalInvoker; } @Override public Class getInterface() { return originalInvoker.getInterface(); } @Override public URL getUrl() { return originalInvoker.getUrl(); } @Override public boolean isAvailable() { return originalInvoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(nextNode, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null) { listener.onError(e, originalInvoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof FILTER.Listener) { FILTER.Listener listener = (FILTER.Listener) filter; listener.onError(e, originalInvoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, originalInvoker, invocation); } else { listener.onError(t, originalInvoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof FILTER.Listener) { FILTER.Listener listener = (FILTER.Listener) filter; if (t == null) { listener.onResponse(r, originalInvoker, invocation); } else { listener.onError(t, originalInvoker, invocation); } } }); } @Override public void destroy() { 
originalInvoker.destroy(); } @Override public String toString() { return originalInvoker.toString(); }}
@Overridepublic Result invoke(final Invocation invocation) throws RpcException { //判断是否销毁 checkWhetherDestroyed(); // binding attachments into invocation. // Map contextAttachments = RpcContext.getClientAttachment().getObjectAttachments(); // if (contextAttachments != null && contextAttachments.size() != 0) { // ((RpcInvocation)invocation).addObjectAttachmentsIfAbsent(contextAttachments); // } //获取服务列表 List> invokers = list(invocation); //spi 获取负载均衡类实例 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance);}
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List> copyInvokers = invokers; //invokers校验 checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); //计算调用次数 int len = calculateInvokeTimes(methodName); // retry loop. RpcException le = null; // last exception. //记录已经调用过了的服务列表 List> invoked = new ArrayList>(copyInvokers.size()); // invoked invokers. Set providers = new HashSet(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. //如果掉完一次后,服务列表更新了,再次获取服务列表 if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } //根据负载均衡算法,选择一个服务调用 Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked); //记录已经调用过的invoker invoked.add(invoker); RpcContext.getServiceContext().setInvokers((List) invoked); try { //具体的服务调用逻辑 Result result = invokeWithContext(invoker, invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". 
Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);}
@Overridepublic Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (isDestroyed()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; //调用 invocation初始化 // prepare rpc invocation prepareInvocation(invocation); //具体的rpc调用流程 // do invoke rpc invocation and return async result AsyncRpcResult asyncResult = doInvokeAndReturn(invocation); //阻塞拿返回结果 // wait rpc result if sync waitForResultIfSync(asyncResult, invocation); return asyncResult;}
doInvokeAndReturn 方法负责发起远程 RPC 调用；如果是同步调用，waitForResultIfSync 会调用 CompletableFuture 的 get 方法来阻塞等待结果。也就是说，底层始终是异步调用，同步语义是通过 get 阻塞实现的，这是在异步调用框架之上做的同步阻塞改造。下面我们看看 waitForResultIfSync(asyncResult, invocation) 方法。
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) { //如果非同步调用,要直接返回asyncResult if (InvokeMode.SYNC != invocation.getInvokeMode()) { return; } try { /* * NOTICE! * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)}because * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop. */ //同步调用,其实是在这里阻塞拿返回结果 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable rootCause = e.getCause(); if (rootCause instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (rootCause instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else { throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); }}
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) { AsyncRpcResult asyncResult; try { //调用核心代码 asyncResult = (AsyncRpcResult) doInvoke(invocation); } catch (InvocationTargetException e) { Throwable te = e.getTargetException(); if (te != null) { // if biz exception if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } else { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } } catch (RpcException e) { // if biz exception if (e.isBiz()) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } // set server context RpcContext.getServiceContext().setFuture(new FutureAdapter<> (asyncResult.getResponseFuture())); return asyncResult;}
@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { //如果是单工通讯 //@Method(name = "doKill",isReturn = false) 这样配置就是单工通讯 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); invocation.put(TIMEOUT_KEY, timeout); //单工就不需要等待返回结果 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY,false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); //request具体的rpc远程调用 CompletableFuture appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }}
// Issue the request and narrow the response object to AppResponse; the future is typed
// accordingly instead of being left as a raw CompletableFuture.
CompletableFuture<AppResponse> appResponseFuture =
        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
@Overridepublic CompletableFuture
public Request() { mId = newId();}private static long newId() { // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID return INVOKE_ID.getAndIncrement();}
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); 该方法创建了一个 CompletableFuture 对象（即 DefaultFuture）并把它缓存起来，缓存的 key 就是刚刚生成的 Request 对象的 id。这个 id 会随请求传递给服务端，服务端响应时又会原样传回；客户端收到响应后，根据传回的 id 从缓存中取出对应的 CompletableFuture 对象，然后把阻塞的地方唤醒。
/**
 * Creates the future for a request and registers it, together with its channel, under the
 * request id.
 *
 * @param channel channel the request is sent on
 * @param request the request this future belongs to
 * @param timeout timeout to use; when not positive, the value is read from the channel URL
 */
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    // The request id doubles as the key that ties a later response back to this future.
    this.id = request.getId();
    if (timeout > 0) {
        this.timeout = timeout;
    } else {
        this.timeout = channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    }
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
@Overridepublic void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { //netty通讯,把请求发送到服务端 ChannelFuture future = channel.writeAndFlush(message); if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); }}
/**
 * Passes an inbound message to the handler, using the {@link NettyChannel} obtained for the
 * underlying channel.
 *
 * @param ctx handler context providing the underlying channel
 * @param msg the inbound message
 * @throws Exception propagated from the handler
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final NettyChannel wrappedChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.received(wrappedChannel, msg);
}
/**
 * Unpacks a {@link MultiMessage} and forwards each contained message separately; any other
 * message is forwarded unchanged.
 *
 * @param channel channel the message arrived on
 * @param message the received message
 * @throws RemotingException propagated from the wrapped handler
 */
@Override
public void received(Channel channel, Object message) throws RemotingException {
    if (!(message instanceof MultiMessage)) {
        handler.received(channel, message);
        return;
    }

    MultiMessage batch = (MultiMessage) message;
    for (Object item : batch) {
        try {
            handler.received(channel, item);
        } catch (ExecutionException e) {
            // One failing item must not stop the rest: log it and report it via caught().
            logger.error("MultiMessageHandler received fail.", e);
            handler.caught(channel, e);
        }
    }
}
@Overridepublic void received(Channel channel, Object message) throws RemotingException { //设置读数据的时间戳 setReadTimestamp(channel); //如果是心跳请求 if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { //应答消息 Response res = new Response(req.getId(), req.getVersion()); res.setEvent(HEARTBEAT_EVENT); //发送应答给对方 channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } //如果是收到的心跳应答消息 if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName()); } return; } handler.received(channel, message);}
/**
 * Hands the received message to an executor as a {@link ChannelEventRunnable}.
 *
 * <p>If the executor rejects a {@link Request}, feedback is sent to the peer instead of
 * failing; any other submission failure is rethrown as an {@code ExecutionException}.
 *
 * @param channel channel the message arrived on
 * @param message the received message
 * @throws RemotingException when the task cannot be submitted
 */
@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService pool = getPreferredExecutorService(message);
    try {
        pool.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        boolean rejectedRequest = message instanceof Request && t instanceof RejectedExecutionException;
        if (!rejectedRequest) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
        sendFeedback(channel, (Request) message, t);
    }
}
/**
 * Replies to a two-way request with a "thread pool exhausted" error response; one-way
 * requests get no reply.
 *
 * @param channel channel to reply on
 * @param request the request that could not be processed
 * @param t       the failure whose message is included in the reply
 * @throws RemotingException if sending the reply fails
 */
protected void sendFeedback(Channel channel, Request request, Throwable t) throws RemotingException {
    if (!request.isTwoWay()) {
        return;
    }
    String detail = "Server side(" + url.getIp() + "," + url.getPort()
            + ") thread pool is exhausted, detail msg:" + t.getMessage();
    Response errorReply = new Response(request.getId(), request.getVersion());
    errorReply.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
    errorReply.setErrorMessage(detail);
    channel.send(errorReply);
}
@Overridepublic void run() { //如果是接受请求 if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } }}
@Overridepublic void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } //如果是请求消息 if (message instanceof Request) { decode(((Request) message).getData()); } //如果是响应消息 if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message);}
@Overridepublic void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); //服务端接收到的请求消息 if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { //如果是双工通讯 if (request.isTwoWay()) { //核心代码 handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { //如果是客户端接收到的响应消息 handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); }}
我们可以看到，核心代码就是 handleRequest(exchangeChannel, request); 这一行，下面看看该方法。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { //这里注意,id是从request对象中获取到的id,这个id响应数据时是需要返回给客户端的 Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } //获取到编解码后的数据 // find handler by message class. Object msg = req.getData(); try { //调用DubboProtocol中的ExchangeHandlerAdapter中的reply方法进行后续的filter调用 CompletionStage
// Create the response from the request's id and version, so the reply carries the same id as the request.
Response res = new Response(req.getId(), req.getVersion());
然后调用了这行代码：CompletionStage future = handler.reply(channel, msg);
@Overridepublic CompletableFuture
// When the reply future completes, fill in the response and send it back over the channel.
future.whenComplete((appResult, t) -> {
    try {
        if (t != null) {
            // Completed with a failure: report a service error carrying the throwable as text.
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        } else {
            res.setStatus(Response.OK);
            res.setResult(appResult);
        }
        // Send the response data back to the client.
        channel.send(res);
    } catch (RemotingException e) {
        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
    }
});
@Overridepublic void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); //服务端接收到的请求消息 if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { //如果是双工通讯 if (request.isTwoWay()) { //核心代码 handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { //如果是客户端接收到的响应消息 handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); }}
客户端接收到 Response 响应后，走的就是 handleResponse(channel, (Response) message); 这个分支，我们看看这个方法。
/**
 * Passes a response to {@code DefaultFuture.received}; null and heartbeat responses are
 * ignored.
 *
 * @param channel  channel the response arrived on
 * @param response the received response, may be null
 * @throws RemotingException declared by the signature
 */
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response == null || response.isHeartbeat()) {
        return;
    }
    DefaultFuture.received(channel, response);
}
public static void received(Channel channel, Response response, boolean timeout) { try { //根据返回的id,从map中找到该请求对应的future对象 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { Timeout t = future.timeoutCheckTask; if (!timeout) { // decrease Time t.cancel(); } //收到服务端的回调后,唤醒get方法 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result."); } } finally { CHANNELS.remove(response.getId()); }}
private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { //收到返回结果后,这里是去唤醒get方法的。唤醒必须是接收到返回结果后才去唤醒 this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting // to avoid endless waiting for whatever reason, notify caller thread to return. if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception.")); } }}
留言与评论(共有 0 条评论) “” |