


Dubbo 服务引用的时机有两个,

第一个是在 Spring 容器调用 ReferenceBeanafterPropertiesSet 方法时引用服务,属于饿汉式的,当我们消费者代码中要使用dubbo服务提供者的时候的时候,使用注解 @DubboReference ,则会开启,因为这个时候springset代码执行完成了,此时需要注入的就是dubbo的服务了 ,我下文即使以这种方式介绍的。

第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。属于懒汉式当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法 并由该方法执行服务引用逻辑,默认情况下,Dubbo 使用懒汉式引用服务

下面我们按照 Dubbo 饿汉式进行分析,整个分析过程从 ReferenceBeanafterPropertiesSet 方法开始。


第一种是引用本地 (JVM) 服务



不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例

此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。


1.1、入口 #afterPropertiesSet

public void afterPropertiesSet() throws Exception {

    // 准备dubbo配置类的bean

    // 模式是懒汉式初始化,这里舒适进来是null
    if (init == null) {
        init = false;

    // 判断是否需要初始化,如果需要的话,执行懒汉式的逻辑。进行调用
    if (shouldInit()) {

private void prepareDubboConfigBeans() {
    beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ConfigCenterBean.class);
    beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, SslConfig.class);

public Object getObject() {
    return get();

public synchronized T get() {
    // 已经调用过销毁方法,则抛出异常
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    // 检测 ref 是否为空,为空则通过 init 方法创建(要用于处理配置,以及调用 createProxy 生成代理类)
    if (ref == null) {
    return ref;


	public synchronized void init() {
		if (initialized) {
		if (bootstrap == null) {
			bootstrap = DubboBootstrap.getInstance();
		// 检查每个配置模块是否正确创建,并在必要时重写其属性。
		// 校验本地存根合法性
		ConfigValidationUtils.checkMock(interfaceClass, this);
		// 添加配置<side,consumer>
		Map<String, String> map = new HashMap<String, String>();
		// 添加版本、时间戳等运行时参数
		if (!ProtocolUtils.isGeneric(generic)) {
			// 获取版本,校验存入map
			String revision = Version.getVersion(interfaceClass, version);
			if (revision != null && revision.length() > 0) {
				map.put(REVISION_KEY, revision);
			String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
			if (methods.length == 0) {
				logger.warn("No method found in service interface " + interfaceClass.getName());
				map.put(METHODS_KEY, ANY_VALUE);
			} else {
				map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
		// 接口放入map
		map.put(INTERFACE_KEY, interfaceName);
		// 通过getter或者getParameters方法将MetricsConfig、ApplicationConfig和ModuleConfig属性放到map中
		AbstractConfig.appendParameters(map, getMetrics());
		AbstractConfig.appendParameters(map, getApplication());
		AbstractConfig.appendParameters(map, getModule());
		// 通过getter或者getParameters方法将ConsumerConfig和ReferenceConfig属性放到map中
		AbstractConfig.appendParameters(map, consumer);
		AbstractConfig.appendParameters(map, this);
		// 元数据报告配置不为空且合法,则设置<metadata,remote>到map中
		MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
		if (metadataReportConfig != null && metadataReportConfig.isValid()) {
		Map<String, AsyncMethodInfo> attributes = null;
		if (CollectionUtils.isNotEmpty(getMethods())) {
			attributes = new HashMap<>();
			for (MethodConfig methodConfig : getMethods()) {
				AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
				String retryKey = methodConfig.getName() + ".retry";
				if (map.containsKey(retryKey)) {
					String retryValue = map.remove(retryKey);
					if ("false".equals(retryValue)) {
						map.put(methodConfig.getName() + ".retries", "0");
				AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
				if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
					attributes.put(methodConfig.getName(), asyncMethodInfo);
		String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
		if (StringUtils.isEmpty(hostToRegistry)) {
			hostToRegistry = NetUtils.getLocalHost();
		} else if (isInvalidLocalHost(hostToRegistry)) {
			throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY
					+ ", value:" + hostToRegistry);
		map.put(REGISTER_IP_KEY, hostToRegistry);
		ref = createProxy(map);
		serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
		// 根据服务名,从缓存中获取 ConsumerModel,并将设置 ConsumerModel 代理对象为ref
		ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
		initialized = true;

		// 分发服务引入配置初始化事件 ReferenceConfigInitializedEvent since 2.7.4
		dispatch(new ReferenceConfigInitializedEvent(this, invoker));



初始化大概逻辑就是校验及获取配置,然后根据配置创建服务引用代理,然后分发服务引用初始化的事件,因此本节主要分析服务引用创建org.apache.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)源码。

private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

private T createProxy(Map<String, String> map) {
	* 是否是本地服务引用 :如果未指定作用域,但目标服务是在同一个JVM中提供的,则希望进行本地调用(默认),我这里肯定是远程调用啦
    if (shouldJvmRefer(map)) {
        // 生成本地引用 URL,协议为 injvm
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 调用 refer 方法构建 InjvmInvoker 实例
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
    } else {
        // 远程调用

        // url 不为空,表明用户可能想进行点对点调用
        if (url != null && url.length() > 0) {
            // 当需要配置多个 url 时,可用分号(;)进行分割,这里会进行切分
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    // url路径为空
                    if (StringUtils.isEmpty(url.getPath())) {
                        // 设置接口全限定名为 url 路径
                        url = url.setPath(interfaceName);
                    // 检测 url 协议是否为 registry
                    if (UrlUtils.isRegistry(url)) {
                        // 若是,表明用户想使用指定的注册中心,将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                        // 最后将合并后的配置设置为 url 查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));

        } else {
            // 如果协议不是injvm
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                // 校验注册中心合法性
                // 加载注册中心地址
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        // 加载监控地址,不为空则放入map中
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                // 未配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException(
                        "No such any registry to reference " + interfaceName + " on the consumer "
                        + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()
                        + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
        // 单个注册中心或服务提供者(服务直连,下同)
        if (urls.size() == 1) {
            // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 (我的是执行这个)
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            // 多个注册中心或多个服务提供者,或者两者混合
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            // 遍历所有的注册中心或者服务提供者地址
            for (URL url : urls) {
                // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
                // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // 如果是注册中心地址,则将url赋值给注册中心
                    registryURL = url;
            if (registryURL != null) {
                // 注册表url可用于多订阅方案,默认情况下使用“区域感知”策略(ZoneAwareCluster)
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // invoker包装层级如下:
                // ZoneAwareClusterInvoker(StaticDirectory) ->
                // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // 没有注册中心地址则是直连,之间创建StaticDirectory
                invoker = CLUSTER.join(new StaticDirectory(invokers));
    // 需要进行可用性检查 并且Invoker不可用的时候,抛出异常
    if (shouldCheck() && !invoker.isAvailable()) {
        // invoker 执行销毁逻辑,并抛出异常
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName
                                        + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName
                                        + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer "
                                        + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    String metadata = map.get(METADATA_KEY);
    WritableMetadataService metadataService = WritableMetadataService
        .getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
    if (metadataService != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
    // 通过代理工厂创建代理
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

protected boolean shouldJvmRefer(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    boolean isJvmRefer;
    if (isInjvm() == null) {
        // if a url is specified, don't do local reference
        if (url != null && url.length() > 0) {
            isJvmRefer = false;
        } else {
            // by default, reference local service if there is
            isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
    } else {
        isJvmRefer = isInjvm();
    return isJvmRefer;

1.4、创建 Invoker

通过上面createProxy的源码分析可知,创建代理的核心逻辑在调用org.apache.dubbo.rpc.Protocol.refer(Class, URL)生成Invoker及最后调用代理工厂生成代理,因此本节先分析创建Invoker的源码。InvokerDubbo 的核心模型,代表一个可执行体。

在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,首先看下org.apache.dubbo.registry.integration.RegistryProtocol.refer(Class, URL)

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 参数值,并将其设置为协议头
    url = getRegistryUrl(url);
    // 获取注册中心实例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);

    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
            return doRefer(getMergeableCluster(), registry, type, url);
    // 调用 doRefer 继续执行服务引用逻辑
    return doRefer(cluster, registry, type, url);

private Cluster getMergeableCluster() {
    return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 创建 RegistryDirectory 实例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 设置注册中心和协议
    // 所有refer参数
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    // 生成服务消费者链接
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(),
    // 是否需要注册服务消费者,在 consumers 目录下新节点
    if (directory.isShouldRegister()) {
    // 构建路由链
    // 订阅 providers、configurators、routers 等节点数据
    // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个虚拟提供者
    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker,
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    return registryInvokerWrapper;


上面分析了RegistryProtocol创建Invoker的过程,有兴趣的也可以看看DubboProtocol 创建Invoker过程。接下来分析创建代理过程的源码,首先是org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(Invoker, boolean)

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private static final Class<?>[] INTERNAL_INTERFACES = new Class<?>[]{
            EchoService.class, Destroyable.class

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Set<Class<?>> interfaces = new HashSet<>();
    // 获取接口地址,参数名interfaces
    String config = invoker.getUrl().getParameter(INTERFACES);
    if (config != null && config.length() > 0) {
        // 逗号切分接口列表
        String[] types = COMMA_SPLIT_PATTERN.split(config);
        for (String type : types) {
            // 反射加载接口类

    if (generic) {// 是泛化服务
        // 如果Invoker的接口不是GenericService的子接口,则像接口中添加一个子接口到接口集合中
        if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {

        try {
            // 从url中找到真正的接口即interface参数的值,然后反射获取接口类存入接口集合中
            String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
        } catch (Throwable e) {
            // ignore
    // 将Invoker的接口放入接口集合中
    // 将 EchoService.class, Destroyable.class放入接口集合中
    // 调用重载方法,又子类实现
    return getProxy(invoker, interfaces.toArray(new Class<?>[0]));


public class JavassistProxyFactory extends AbstractProxyFactory {

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
public static Proxy getProxy(Class<?>... ics) {
    // 调用重载
    return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    // 接口数量限制65535
    if (ics.length > MAX_PROXY_COUNT) {
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    // 遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 不是接口类型,则抛出异常
        if (!ics[i].isInterface()) {
            throw new RuntimeException(itf + " is not a interface.");

        Class<?> tmp = null;
        try {
            // 反射重新加载接口类
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        // 检测接口是否相同,这里 tmp 有可能为空,不同则抛出异常
        if (tmp != ics[i]) {
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        // 拼接接口全限定名,分隔符为 ;

    // 使用拼接后的接口名作为 key
    String key = sb.toString();

    // 根据类加载器获取缓存
    final Map<String, Object> cache;
    synchronized (PROXY_CACHE_MAP) {
        cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 从缓存中获取 Reference<Proxy> 实例,渠道且是Reference实例则强转返回
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
            // 如果是挂起生成标记,则调用线程wait方法,保证只有一个线程操作
            if (value == PENDING_GENERATION_MARKER) {
                try {
                    // 其他线程在此处进行等待
                } catch (InterruptedException e) {
            } else {
                // 放置标志位到缓存中,并跳出 while 循环进行后续操作
                cache.put(key, PENDING_GENERATION_MARKER);
        } while (true);
    // 代理类计数器加1
    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 创建 ClassGenerator 对象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<>();
        List<Method> methods = new ArrayList<>();

        for (int i = 0; i < ics.length; i++) {
            // 检测接口访问级别是否为 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 获取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    // 非 public 级别的接口必须在同一个包下,否者抛出异常
                    if (!pkg.equals(npkg)) {
                        throw new IllegalArgumentException("non-public interfaces from different packages");
            // 添加接口到 ClassGenerator 中
            // 遍历接口方法
            for (Method method : ics[i].getMethods()) {
                // 获取方法描述,可理解为方法签名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中或者是静态方法,则忽略。
                if (worked.contains(desc) || Modifier.isStatic(method.getModifiers())) {
                // 如果是接口并且是静态方法,则忽略
                if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
                // 将签名存入worked中
                // 方法个数
                int ix = methods.size();
                // 方法返回值类型
                Class<?> rt = method.getReturnType();
                // 方法参数类型
                Class<?>[] pts = method.getParameterTypes();
                // 生成 Object[] args = new Object[pts.length]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length)
                for (int j = 0; j < pts.length; j++) {
                    // 生成 args[0] =($W)$1;
                    // 生成 args[1] =($W)$2;
                    // ......
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
                // Object ret = handler.invoke(this, methods[ix], args);
                code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                if (!Void.TYPE.equals(rt)) {// 返回值类型不是void
                    // 生成返回语句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");

                // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(),
        // 包名为空,设置为Proxy的包名
        if (pkg == null) {
            pkg = PACKAGE_NAME;

        // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        // 生成构造函数
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        // handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[] { InvocationHandler.class }, new Class<?>[0],
        // 添加无参构造
        // 生成接口代理类
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 构建 Proxy 子类名称,比如 Proxy1 等
        String fcn = Proxy.class.getName() + id;
        // 创建cl类实例
        ccm = ClassGenerator.newInstance(cl);
        // 设置实例名称
        // 添加无参构造
        // 设置父类
        // 生成获取实例方法
        // public Object newInstance(java.lang.reflect.InvocationHandler h) {
        // return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn
                      + "($1); }");
        // 生成 Proxy 实现类
        Class<?> pc = ccm.toClass();
        // 通过反射创建 Proxy 实现类实例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        // 释放类生成器资源 ClassGenerator
        if (ccp != null) {
        if (ccm != null) {
        synchronized (cache) {
            // 代理为空即生成失败,移除缓存
            if (proxy == null) {
            } else {
                // 生成代理成功,虚引用存入缓存
                cache.put(key, new WeakReference<Proxy>(proxy));
            // 通知等待线程
    return proxy;


package org.apache.dubbo.common.bytecode;

import com.alibaba.dubbo.rpc.service.EchoService;
import com.healerjean.proj.dto.UserDTO;
import com.healerjean.proj.service.ProviderDubboService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
import org.apache.dubbo.rpc.service.Destroyable;

public class proxy0 implements DC, Destroyable, EchoService, ProviderDubboService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler var1) {
        this.handler = var1;

    public proxy0() {
    //服务端的方法,最终那其实是通过调用 InvocationHandler 进行调用远程
    public UserDTO connect(String var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[1], var2);
        return (UserDTO)var3;

    public Object $echo(Object var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[0], var2);
        return (Object)var3;

    public void $destroy() {
        Object[] var1 = new Object[0];
        this.handler.invoke(this, methods[2], var1);


package org.apache.dubbo.rpc.proxy;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

 * InvokerHandler
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        // healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
        String serviceKey = invoker.getUrl().getServiceKey();
        if (serviceKey != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        //RPC调用 对象,组装接口,方法名,参数
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));

        //先试用MockClusterInvoker  执行调用逻辑
        return invoker.invoke(rpcInvocation).recreate();


2.3、MockClusterInvoker invoke#

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
            // 比如 AbstractClusterInvoker 我这里是,但是并不重要
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            //force:direct mock 直接执行 mock 逻辑,不发起远程调用
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
            try {
                result = this.invoker.invoke(invocation);

                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                        throw  rpcException;
                    }else { 
                        // 调用失败,执行 mock 逻辑
                        result = doMockInvoke(invocation, rpcException);

            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                result = doMockInvoke(invocation, e);
        return result;



    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        RpcInvocation invocation = (RpcInvocation) inv;
        if (CollectionUtils.isNotEmptyMap(attachment)) {

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {

        //设置调用模式 我这里是SYNC 也就是同步调用
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
             // 抽象方法,由子类实现
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;




    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 设置 path (com.healerjean.proj.service.ProviderDubboService) 和 version (0.1) 到 attachment 中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        try {
            // isOneway 为 true,表示“单向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            //默认3000 3s超时
            int timeout = calculateTimeout(invocation, methodName);
             // 我的是false
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                return result;
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);




final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    private ExchangeClient client;

    public ReferenceCountExchangeClient(ExchangeClient client) {
        this.client = client;
        // 引用计数自增
        this.url = client.getUrl();

    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return client.request(request, timeout, executor);

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。

ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。


public class HeaderExchangeClient implements ExchangeClient {

    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return channel.request(request, timeout, executor);


即调用 HeaderExchangeChannel 对象的同签名方法。

HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。

final class HeaderExchangeChannel implements ExchangeChannel {

    private final Channel channel;

    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");

        // 这里的 channel 指向的是 NettyClient
        this.channel = channel;

    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...);
        // 创建 Request 对象
        Request req = new Request();
        // 设置双向通信标志为 true
        // 这里的 request 变量类型为 RpcInvocation

        // 创建 DefaultFuture 对象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 调用 NettyClient 的 send 方法发送请求
        } catch (RemotingException e) {
            throw e;
        // 返回 DefaultFuture 对象
        return future;

上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。


public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    public void send(Object message) throws RemotingException {
        // 该方法由 AbstractClient 类实现
        send(message, url.getParameter(Constants.SENT_KEY, false));

    // 省略其他方法

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {

        // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");

        // 继续向下调用
        channel.send(message, sent);

    protected abstract Channel getChannel();

    // 省略其他方法

Dubbo 使用 Netty 作为底层的通信框架,因此下面我们到 NettyClient 类中看一下 getChannel 方法的实现逻辑

public class NettyClient extends AbstractClient {

    // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 获取一个 NettyChannel 类型对象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;

    /** 私有构造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        this.channel = channel;

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;

        // 尝试从集合中获取 NettyChannel 实例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,则创建一个新的 NettyChannel 实例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            if (ret == null) {
                ret = nc;
        return ret;

获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannelsend 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 发送消息(包含请求和响应消息)
        ChannelFuture future = channel.write(message);

        // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
        //   1. true: 等待消息发出,消息发送失败将抛出异常
        //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
        // 默认情况下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息发出,若在规定时间没能发出,success 会被置为 false
            success = future.await(timeout);
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");

    // 若 success 为 false,这里抛出异常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
  > InvokerInvocationHandler#invoke(Object, Method, Object[])
    > MockClusterInvoker#invoke(Invocation)
      > AbstractClusterInvoker#invoke(Invocation)
        > FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          > Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            > ListenerInvokerWrapper#invoke(Invocation) 
              > AbstractInvoker#invoke(Invocation) 
                > DubboInvoker#doInvoke(Invocation)
                  > ReferenceCountExchangeClient#request(Object, int)
                    > HeaderExchangeClient#request(Object, int)
                      > HeaderExchangeChannel#request(Object, int)
                        > AbstractPeer#send(Object)
                          > AbstractClient#send(Object, boolean)
                            > NettyChannel#send(Object, boolean)
                              > NioClientSocketChannel#write(Object)
