总结为一句话就是:客户端在发起远程调用时,具体的代理类会被InvokerInvacationHandler拦截,在这里面根据一些条件和负载均衡策略,选择出其中一个符合条件的Invoker,进行远程调用。提供者收到请求后,会从ExpoterMap中选择对应的Invoker(Wrapper包装),最终调用到具体的实现类。处理完请求后将结果返回。返回后客户端根据之前传过去的请求ID,找到之前的请求,然后再进行自己的业务处理
public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public T getProxy(Invoker invoker, Class<?>[] interfaces) { // InvokerInvocationHandler (重点关注) // 远程调用时,调用的方法会被 InvokerInvocationHandler 拦截 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } // ...}
@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 获取调用的远程方法名 String methodName = method.getName(); // 获取参数 Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } // 构建一个dubbo rpc invocation RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); // invoker.getUrl() returns consumer url. RpcContext.setRpcContext(invoker.getUrl()); if (consumerModel != null) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } // 远程调用 return invoker.invoke(rpcInvocation).recreate();}
最终挑选出某一个invoker
@Overridepublic Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // binding attachments into invocation. Map contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } // 实际上就是directroy.list 通过方法名寻找invokers 里面回去做一些过滤 获取过滤后的invoke列表 List> invokers = list(invocation); // 根据@SPI选择负载均衡的策略 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); // 调用子类的方法}
@Override@SuppressWarnings({"unchecked", "rawtypes"})public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List> invoked = new ArrayList>(copyInvokers.size()); // invoked invokers. Set providers = new HashSet(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } // 根据负载均衡规则找出一个invoker Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 远程调用(每次请求都有一个唯一的ID) Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);}
调用到AbstractProxyInvoker ,当前类调用通过Javaassist封装成Wrapper类最终调用到具体的实现类
@Overridepublic void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); } handler.connected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } if (logger.isInfoEnabled()) { logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established"); }}
@Overridepublic void connected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { // 封装成ChannelEventRunnable,丢到线程中处理 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); }}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // 通过请求id,构建一个Response Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { // 获取请求信息 方法名之类的 Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // find handler by message class. Object msg = req.getData(); try { // 最终调用 DubboProtocol reply CompletionStage
最终调用到 DubboProtocol
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {@Overridepublic CompletableFuture
@Overridepublic Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null) { listener.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } });}
最终调用到Wrapper#AbstractProxyInvoker
@Overridepublic Result invoke(Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());CompletableFuture
@Overridepublic Invoker getInvoker(T proxy, Class type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '#39; // 通过Javaassist封装成Wrapper类(Dubbo服务启动时生成,所以在运行时不会产生开销),减少反射的调用 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('#39;) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker(proxy, type, url) { @Override // Wrapper最终调用最终调用服务提供者的接口实现类的方法 protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } };}
留言与评论(共有 0 条评论) “” |