Dubbo基础篇 远程调用

完整流程图



一句话总结流程

总结为一句话就是:客户端在发起远程调用时,具体的代理类会被InvokerInvacationHandler拦截,在这里面根据一些条件和负载均衡策略,选择出其中一个符合条件的Invoker,进行远程调用。提供者收到请求后,会从ExpoterMap中选择对应的Invoker(Wrapper包装),最终调用到具体的实现类。处理完请求后将结果返回。返回后客户端根据之前传过去的请求ID,找到之前的请求,然后再进行自己的业务处理

Consumer远程调用

  • 调用对应的代理类
  • 被InvokerInvocationHandler拦截
  • ClusterInvoker经过路由过滤,负载均衡,选择其中一个Invoker,发起远程调用(带请求ID)
public class JavassistProxyFactory extends AbstractProxyFactory {  @Override  @SuppressWarnings("unchecked")  public  T getProxy(Invoker invoker, Class<?>[] interfaces) {      // InvokerInvocationHandler (重点关注)      // 远程调用时,调用的方法会被 InvokerInvocationHandler 拦截      return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));  }  // ...}

InvokerInvocationHandler处理

  • 构建RpcInvocation
  • 调用对应Invoker的invoke方法
@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {    if (method.getDeclaringClass() == Object.class) {        return method.invoke(invoker, args);    }    // 获取调用的远程方法名    String methodName = method.getName();    // 获取参数    Class<?>[] parameterTypes = method.getParameterTypes();    if (parameterTypes.length == 0) {        if ("toString".equals(methodName)) {            return invoker.toString();        } else if ("$destroy".equals(methodName)) {            invoker.destroy();            return null;        } else if ("hashCode".equals(methodName)) {            return invoker.hashCode();        }    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {        return invoker.equals(args[0]);    }    // 构建一个dubbo rpc invocation    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);    String serviceKey = invoker.getUrl().getServiceKey();    rpcInvocation.setTargetServiceUniqueName(serviceKey);    // invoker.getUrl() returns consumer url.    RpcContext.setRpcContext(invoker.getUrl());    if (consumerModel != null) {        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));    }    // 远程调用    return invoker.invoke(rpcInvocation).recreate();}

Invoker#invoker方法

  • 路由过滤
  • 负载均衡

最终挑选出某一个invoker

@Overridepublic Result invoke(final Invocation invocation) throws RpcException {    checkWhetherDestroyed();    // binding attachments into invocation.    Map contextAttachments = RpcContext.getContext().getObjectAttachments();    if (contextAttachments != null && contextAttachments.size() != 0) {        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);    }    // 实际上就是directroy.list 通过方法名寻找invokers 里面回去做一些过滤 获取过滤后的invoke列表    List> invokers = list(invocation);    // 根据@SPI选择负载均衡的策略    LoadBalance loadbalance = initLoadBalance(invokers, invocation);    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);    return doInvoke(invocation, invokers, loadbalance); // 调用子类的方法}
@Override@SuppressWarnings({"unchecked", "rawtypes"})public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {    List> copyInvokers = invokers;    checkInvokers(copyInvokers, invocation);    String methodName = RpcUtils.getMethodName(invocation);    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;    if (len <= 0) {        len = 1;    }    // retry loop.    RpcException le = null; // last exception.    List> invoked = new ArrayList>(copyInvokers.size()); // invoked invokers.    Set providers = new HashSet(len);    for (int i = 0; i < len; i++) {        //Reselect before retry to avoid a change of candidate `invokers`.        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.        if (i > 0) {            checkWhetherDestroyed();            copyInvokers = list(invocation);            // check again            checkInvokers(copyInvokers, invocation);        }        // 根据负载均衡规则找出一个invoker        Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);        invoked.add(invoker);        RpcContext.getContext().setInvokers((List) invoked);        try {            // 远程调用(每次请求都有一个唯一的ID)            Result result = invoker.invoke(invocation);            if (le != null && logger.isWarnEnabled()) {                logger.warn("Although retry the method " + methodName                        + " in the service " + getInterface().getName()                        + " was successful by the provider " + invoker.getUrl().getAddress()                        + ", but there have been failed providers " + providers                        + " (" + providers.size() + "/" + copyInvokers.size()                        + ") from the registry " + directory.getUrl().getAddress()                        + " on the consumer " + NetUtils.getLocalHost()                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "                        + le.getMessage(), le);            }            return result;        } catch (RpcException e) {            if (e.isBiz()) { // biz exception.                throw e;            }            le = e;        } catch (Throwable e) {            le = new RpcException(e.getMessage(), e);        } finally {            providers.add(invoker.getUrl().getAddress());        }    }    throw new RpcException(le.getCode(), "Failed to invoke the method "            + methodName + " in the service " + getInterface().getName()            + ". Tried " + len + " times of the providers " + providers            + " (" + providers.size() + "/" + copyInvokers.size()            + ") from the registry " + directory.getUrl().getAddress()            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "            + Version.getVersion() + ". Last error is: "            + le.getMessage(), le.getCause() != null ? le.getCause() : le);}

Provider处理请求

  • 服务端的NettyServer处理请求,最终会调用到DubboProtcol#reply
  • 根据客户端的请求,从ExportedMap中选择对应的Invoker (ExportedMap key:serviceKey())
  • 调用Invoker具体业务类的方法 链式调用,入口 ProtocolFilterWrapper 会处理调用信息GenericFilter,上下文ContextFilter等


调用到AbstractProxyInvoker ,当前类调用通过Javaassist封装成Wrapper类最终调用到具体的实现类

  • 返回处理结果

NettyServer的Handler处理请求

@Overridepublic void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);    try {        if (channel != null) {            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);        }        handler.connected(channel);    } finally {        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());    }    if (logger.isInfoEnabled()) {        logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established");    }}
@Overridepublic void connected(Channel channel) throws RemotingException {  ExecutorService executor = getExecutorService();  try {    // 封装成ChannelEventRunnable,丢到线程中处理    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));  } catch (Throwable t) {    throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);  }}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {    // 通过请求id,构建一个Response    Response res = new Response(req.getId(), req.getVersion());    if (req.isBroken()) {        // 获取请求信息 方法名之类的        Object data = req.getData();        String msg;        if (data == null) {            msg = null;        } else if (data instanceof Throwable) {            msg = StringUtils.toString((Throwable) data);        } else {            msg = data.toString();        }        res.setErrorMessage("Fail to decode request due to: " + msg);        res.setStatus(Response.BAD_REQUEST);        channel.send(res);        return;    }    // find handler by message class.    Object msg = req.getData();    try {        // 最终调用 DubboProtocol reply        CompletionStage future = handler.reply(channel, msg);        future.whenComplete((appResult, t) -> {            try {                if (t == null) {                    res.setStatus(Response.OK);                    res.setResult(appResult);                } else {                    res.setStatus(Response.SERVICE_ERROR);                    res.setErrorMessage(StringUtils.toString(t));                }                channel.send(res);            } catch (RemotingException e) {                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);            }        });    } catch (Throwable e) {        res.setStatus(Response.SERVICE_ERROR);        res.setErrorMessage(StringUtils.toString(e));        channel.send(res);    }}

最终调用到 DubboProtocol

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {@Overridepublic CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {    if (!(message instanceof Invocation)) {        throw new RemotingException(channel, "Unsupported request: "                + (message == null ? null : (message.getClass().getName() + ": " + message))                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());    }    Invocation inv = (Invocation) message;    // 根据inv获取Invoker   去exporterMap中找    Invoker<?> invoker = getInvoker(channel, inv);    // need to consider backward-compatibility if it's a callback    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {        String methodsStr = invoker.getUrl().getParameters().get("methods");        boolean hasMethod = false;        if (methodsStr == null || !methodsStr.contains(",")) {            hasMethod = inv.getMethodName().equals(methodsStr);        } else {            String[] methods = methodsStr.split(",");            for (String method : methods) {                if (inv.getMethodName().equals(method)) {                    hasMethod = true;                    break;                }            }        }        if (!hasMethod) {            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()                    + " not found in callback service interface ,invoke will be ignored."                    + " please update the api interface. url is:"                    + invoker.getUrl()) + " ,invocation is :" + inv);            return null;        }    }    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());    // 调用对应的invoke方法(最终wrapper.invokeMethod,参考服务暴露文档)    Result result = invoker.invoke(inv);    return result.thenApply(Function.identity());}

链式调用点 ProtocolFilterWrapper

@Overridepublic Result invoke(Invocation invocation) throws RpcException {    Result asyncResult;    try {        asyncResult = filter.invoke(next, invocation);    } catch (Exception e) {        if (filter instanceof ListenableFilter) {            ListenableFilter listenableFilter = ((ListenableFilter) filter);            try {                Filter.Listener listener = listenableFilter.listener(invocation);                if (listener != null) {                    listener.onError(e, invoker, invocation);                }            } finally {                listenableFilter.removeListener(invocation);            }        } else if (filter instanceof Filter.Listener) {            Filter.Listener listener = (Filter.Listener) filter;            listener.onError(e, invoker, invocation);        }        throw e;    } finally {    }    return asyncResult.whenCompleteWithContext((r, t) -> {        if (filter instanceof ListenableFilter) {            ListenableFilter listenableFilter = ((ListenableFilter) filter);            Filter.Listener listener = listenableFilter.listener(invocation);            try {                if (listener != null) {                    if (t == null) {                        listener.onResponse(r, invoker, invocation);                    } else {                        listener.onError(t, invoker, invocation);                    }                }            } finally {                listenableFilter.removeListener(invocation);            }        } else if (filter instanceof Filter.Listener) {            Filter.Listener listener = (Filter.Listener) filter;            if (t == null) {                listener.onResponse(r, invoker, invocation);            } else {                listener.onError(t, invoker, invocation);            }        }    });}

最终调用到Wrapper#AbstractProxyInvoker

@Overridepublic Result invoke(Invocation invocation) throws RpcException {    try {        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());CompletableFuture future = wrapWithFuture(value);        CompletableFuture appResponseFuture = future.handle((obj, t) -> {            AppResponse result = new AppResponse();            if (t != null) {                if (t instanceof CompletionException) {                    result.setException(t.getCause());                } else {                    result.setException(t);                }            } else {                result.setValue(obj);            }            return result;        });        return new AsyncRpcResult(appResponseFuture, invocation);    } catch (InvocationTargetException e) {        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);        }        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);    } catch (Throwable e) {        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);    }}protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

Javassist动态代理

@Overridepublic  Invoker getInvoker(T proxy, Class type, URL url) {    // TODO Wrapper cannot handle this scenario correctly: the classname contains '#39;    // 通过Javaassist封装成Wrapper类(Dubbo服务启动时生成,所以在运行时不会产生开销),减少反射的调用    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('#39;) < 0 ? proxy.getClass() : type);    return new AbstractProxyInvoker(proxy, type, url) {        @Override        // Wrapper最终调用最终调用服务提供者的接口实现类的方法        protected Object doInvoke(T proxy, String methodName,                                  Class<?>[] parameterTypes,                                  Object[] arguments) throws Throwable {            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);        }    };}
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章