前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

Dubbo 服务引用的时机有两个,

第一个是在 Spring 容器调用 ReferenceBeanafterPropertiesSet 方法时引用服务,属于饿汉式的,当我们消费者代码中要使用dubbo服务提供者的时候的时候,使用注解 @DubboReference ,则会开启,因为这个时候springset代码执行完成了,此时需要注入的就是dubbo的服务了 ,我下文即使以这种方式介绍的。

第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。属于懒汉式当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法 并由该方法执行服务引用逻辑,默认情况下,Dubbo 使用懒汉式引用服务

下面我们按照 Dubbo 饿汉式进行分析,整个分析过程从 ReferenceBeanafterPropertiesSet 方法开始。

。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,一共有三种

第一种是引用本地 (JVM) 服务

第二是通过直连方式引用远程服务

第三是通过注册中心引用远程服务

不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例

此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。

1、服务导入

1.1、入口 #afterPropertiesSet

public void afterPropertiesSet() throws Exception {

    // 准备dubbo配置类的bean
    prepareDubboConfigBeans();

    // 模式是懒汉式初始化,这里舒适进来是null
    if (init == null) {
        init = false;
    }

    // 判断是否需要初始化,如果需要的话,执行懒汉式的逻辑。进行调用
    if (shouldInit()) {
        getObject();
    }
}


private void prepareDubboConfigBeans() {
    beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, ConfigCenterBean.class);
    beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class);
    beansOfTypeIncludingAncestors(applicationContext, SslConfig.class);
}



@Override
public Object getObject() {
    return get();
}


public synchronized T get() {
    // 已经调用过销毁方法,则抛出异常
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    
    // 检测 ref 是否为空,为空则通过 init 方法创建(要用于处理配置,以及调用 createProxy 生成代理类)
    if (ref == null) {
        init();
    }
    return ref;
}


1.2、#init初始化

	public synchronized void init() {
		//已经初始化则返回,避免重复操作
		if (initialized) {
			return;
		}
        
        //我到了这里其实是null,也就是说dubbo还没有启动。,主要是初始化一些东西
		//主要是通过SPI方式获取各种配置管理实例,然后调用初始化方法,初始化配置、监听器等
		if (bootstrap == null) {
			bootstrap = DubboBootstrap.getInstance();
			bootstrap.init();
		}
        
        
		// 检查每个配置模块是否正确创建,并在必要时重写其属性。
		checkAndUpdateSubConfigs();
        
		// 校验本地存根合法性
		checkStubAndLocal(interfaceClass);
        
		//校验mock配置合法性
		ConfigValidationUtils.checkMock(interfaceClass, this);
        
		// 添加配置<side,consumer>
		Map<String, String> map = new HashMap<String, String>();
		map.put(SIDE_KEY, CONSUMER_SIDE);
        
		// 添加版本、时间戳等运行时参数
		ReferenceConfigBase.appendRuntimeParameters(map);
        
		//不是泛化服务(泛化就是服务消费者并没有服务的接口)
		if (!ProtocolUtils.isGeneric(generic)) {
			// 获取版本,校验存入map
			String revision = Version.getVersion(interfaceClass, version);
			if (revision != null && revision.length() > 0) {
				map.put(REVISION_KEY, revision);
			}
			//获取包装类所有方法名称,放入map中,如果没有则放入<methods,*>
			String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
			if (methods.length == 0) {
				logger.warn("No method found in service interface " + interfaceClass.getName());
				map.put(METHODS_KEY, ANY_VALUE);
			} else {
				map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
			}
		}
		// 接口放入map
		map.put(INTERFACE_KEY, interfaceName);
		// 通过getter或者getParameters方法将MetricsConfig、ApplicationConfig和ModuleConfig属性放到map中
		AbstractConfig.appendParameters(map, getMetrics());
		AbstractConfig.appendParameters(map, getApplication());
		AbstractConfig.appendParameters(map, getModule());
		// 通过getter或者getParameters方法将ConsumerConfig和ReferenceConfig属性放到map中
		AbstractConfig.appendParameters(map, consumer);
		AbstractConfig.appendParameters(map, this);
		// 元数据报告配置不为空且合法,则设置<metadata,remote>到map中
		MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
		if (metadataReportConfig != null && metadataReportConfig.isValid()) {
			map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
		}
		//存放异步方法信息
		Map<String, AsyncMethodInfo> attributes = null;
		if (CollectionUtils.isNotEmpty(getMethods())) {
			attributes = new HashMap<>();
			//遍历所有的方法配置存入map中
			for (MethodConfig methodConfig : getMethods()) {
				AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
				String retryKey = methodConfig.getName() + ".retry";
				if (map.containsKey(retryKey)) {
					//如果存在methodname.retry则移除
					String retryValue = map.remove(retryKey);
					//如果retry对应的是false,则设置retries值为0
					if ("false".equals(retryValue)) {
						map.put(methodConfig.getName() + ".retries", "0");
					}
				}
				//提取方法信息中的异步方法信息,存入map
				AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
				if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
					attributes.put(methodConfig.getName(), asyncMethodInfo);
				}
			}
		}
		//获取注册中心IP地址
		String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
		if (StringUtils.isEmpty(hostToRegistry)) {
			//配置为空则取本机地址
			hostToRegistry = NetUtils.getLocalHost();
		} else if (isInvalidLocalHost(hostToRegistry)) {
			//是无效的本地主机,则抛出异常
			throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY
					+ ", value:" + hostToRegistry);
		}
        
        
		map.put(REGISTER_IP_KEY, hostToRegistry);
		//将上面的配置信息添加到元数据附属信息中
		serviceMetadata.getAttachments().putAll(map);
        
        
        
		//根据配置创建代理
		ref = createProxy(map);
        
        
        
		//服务元数据中添加目标对象配置
		serviceMetadata.setTarget(ref);
		serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
		// 根据服务名,从缓存中获取 ConsumerModel,并将设置 ConsumerModel 代理对象为ref
		ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
		consumerModel.setProxyObject(ref);
		//将异步方法信息存入ConsumerModel并初始化
		consumerModel.init(attributes);
		//设置初始化完成
		initialized = true;

		// 分发服务引入配置初始化事件 ReferenceConfigInitializedEvent since 2.7.4
		dispatch(new ReferenceConfigInitializedEvent(this, invoker));
	}

image-20201111165750030

1.3、引用服务创建#createProxy

初始化大概逻辑就是校验及获取配置,然后根据配置创建服务引用代理,然后分发服务引用初始化的事件,因此本节主要分析服务引用创建org.apache.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)源码。

private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();


private T createProxy(Map<String, String> map) {
    /*
	* 是否是本地服务引用 :如果未指定作用域,但目标服务是在同一个JVM中提供的,则希望进行本地调用(默认),我这里肯定是远程调用啦
	*/
    if (shouldJvmRefer(map)) {
        // 生成本地引用 URL,协议为 injvm
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 调用 refer 方法构建 InjvmInvoker 实例
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        // 远程调用

        urls.clear();
        // url 不为空,表明用户可能想进行点对点调用
        if (url != null && url.length() > 0) {
            // 当需要配置多个 url 时,可用分号(;)进行分割,这里会进行切分
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    // url路径为空
                    if (StringUtils.isEmpty(url.getPath())) {
                        // 设置接口全限定名为 url 路径
                        url = url.setPath(interfaceName);
                    }
                    // 检测 url 协议是否为 registry
                    if (UrlUtils.isRegistry(url)) {
                        // 若是,表明用户想使用指定的注册中心,将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
                        // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
                        // 最后将合并后的配置设置为 url 查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }

            //我执行走是这个
        } else {
            // 如果协议不是injvm
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                // 校验注册中心合法性
                checkRegistry();
                // 加载注册中心地址
                //registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&registry=zookeeper&release=2.7.7&timestamp=1605086409440
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        // 加载监控地址,不为空则放入map中
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
                        //registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&refer=application%3Dhlj-server-consumer%26dubbo%3D2.0.2%26group%3Dhealerjean%26init%3Dfalse%26interface%3Dcom.healerjean.proj.service.ProviderDubboService%26methods%3Dconnect%26pid%3D1504%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D40890%26register.ip%3D10.236.122.217%26release%3D2.7.7%26revision%3D0.1%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D3000%26timestamp%3D1605085675160%26version%3D0.1&registry=zookeeper&release=2.7.7&timestamp=1605086409440
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                // 未配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException(
                        "No such any registry to reference " + interfaceName + " on the consumer "
                        + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()
                        + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
        }
        // 单个注册中心或服务提供者(服务直连,下同)
        if (urls.size() == 1) {
            // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 (我的是执行这个)
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            // 多个注册中心或多个服务提供者,或者两者混合
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            // 遍历所有的注册中心或者服务提供者地址
            for (URL url : urls) {
                // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
                // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // 如果是注册中心地址,则将url赋值给注册中心
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 注册表url可用于多订阅方案,默认情况下使用“区域感知”策略(ZoneAwareCluster)
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // invoker包装层级如下:
                // ZoneAwareClusterInvoker(StaticDirectory) ->
                // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // 没有注册中心地址则是直连,之间创建StaticDirectory
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }
    
    
    // 需要进行可用性检查 并且Invoker不可用的时候,抛出异常
    if (shouldCheck() && !invoker.isAvailable()) {
        // invoker 执行销毁逻辑,并抛出异常
        invoker.destroy();
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName
                                        + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName
                                        + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer "
                                        + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
   
    String metadata = map.get(METADATA_KEY);
    WritableMetadataService metadataService = WritableMetadataService
        .getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
    if (metadataService != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        metadataService.publishServiceDefinition(consumerURL);
    }
    
    
    
    
    // 通过代理工厂创建代理
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}






protected boolean shouldJvmRefer(Map<String, String> map) {
    //temp://localhost?application=hlj-server-consumer&dubbo=2.0.2&group=healerjean&init=false&interface=com.healerjean.proj.service.ProviderDubboService&methods=connect&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&register.ip=10.236.122.217&release=2.7.7&revision=0.1&side=consumer&sticky=false&timeout=3000&timestamp=1605085675160&version=0.1
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    boolean isJvmRefer;
    if (isInjvm() == null) {
        // if a url is specified, don't do local reference
        if (url != null && url.length() > 0) {
            isJvmRefer = false;
        } else {
            // by default, reference local service if there is
            isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
        }
    } else {
        isJvmRefer = isInjvm();
    }
    return isJvmRefer;
}

1.4、创建 Invoker

通过上面createProxy的源码分析可知,创建代理的核心逻辑在调用org.apache.dubbo.rpc.Protocol.refer(Class, URL)生成Invoker及最后调用代理工厂生成代理,因此本节先分析创建Invoker的源码。InvokerDubbo 的核心模型,代表一个可执行体。

在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,首先看下org.apache.dubbo.registry.integration.RegistryProtocol.refer(Class, URL)

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 参数值,并将其设置为协议头
    //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&refer=application%3Dhlj-server-consumer%26dubbo%3D2.0.2%26group%3Dhealerjean%26init%3Dfalse%26interface%3Dcom.healerjean.proj.service.ProviderDubboService%26methods%3Dconnect%26pid%3D1504%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D40890%26register.ip%3D10.236.122.217%26release%3D2.7.7%26revision%3D0.1%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D3000%26timestamp%3D1605085675160%26version%3D0.1&release=2.7.7&timestamp=1605086409440
    url = getRegistryUrl(url);
    // 获取注册中心实例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        //如果是RegistryService类型,则直接调用代理工厂返回的Invoker
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    //将url中的refer参数值转为map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    //如果指定了分组参数
    if (group != null && group.length() > 0) {
        //且包含两个以上分组包括任意分组
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 调用 doRefer 继续执行服务引用逻辑
    return doRefer(cluster, registry, type, url);
}








private Cluster getMergeableCluster() {
    return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}


private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 创建 RegistryDirectory 实例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 设置注册中心和协议
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // 所有refer参数
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    // 生成服务消费者链接
    //consumer://10.236.122.217/com.healerjean.proj.service.ProviderDubboService?application=hlj-server-consumer&dubbo=2.0.2&group=healerjean&init=false&interface=com.healerjean.proj.service.ProviderDubboService&methods=connect&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&release=2.7.7&revision=0.1&side=consumer&sticky=false&timeout=3000&timestamp=1605085675160&version=0.1
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(),
                               parameters);
   
    
    // 是否需要注册服务消费者,在 consumers 目录下新节点
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    // 构建路由链
    directory.buildRouterChain(subscribeUrl);
    // 订阅 providers、configurators、routers 等节点数据
    directory.subscribe(toSubscribeUrl(subscribeUrl));
    
    // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个虚拟提供者
    Invoker<T> invoker = cluster.join(directory);
    
    //通过SPI获取RegistryProtocolListener实例
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        //没有监听器,直接返货Invoker
        return invoker;
    }
    //创建包装类RegistryInvokerWrapper实例
    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker,
                                                                                    subscribeUrl);
    //执行调用监听通知方法
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    //返回invoker包装类
    return registryInvokerWrapper;
}

1.5、通过代理工厂创建代理

上面分析了RegistryProtocol创建Invoker的过程,有兴趣的也可以看看DubboProtocol 创建Invoker过程。接下来分析创建代理过程的源码,首先是org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(Invoker, boolean)

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private static final Class<?>[] INTERNAL_INTERFACES = new Class<?>[]{
            EchoService.class, Destroyable.class
    };

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Set<Class<?>> interfaces = new HashSet<>();
    // 获取接口地址,参数名interfaces
    String config = invoker.getUrl().getParameter(INTERFACES);
    if (config != null && config.length() > 0) {
        // 逗号切分接口列表
        String[] types = COMMA_SPLIT_PATTERN.split(config);
        for (String type : types) {
            // 反射加载接口类
            interfaces.add(ReflectUtils.forName(type));
        }
    }

    //我的是false,所以不进入
    if (generic) {// 是泛化服务
        // 如果Invoker的接口不是GenericService的子接口,则像接口中添加一个子接口到接口集合中
        if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {
            interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
        }

        try {
            // 从url中找到真正的接口即interface参数的值,然后反射获取接口类存入接口集合中
            String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
            interfaces.add(ReflectUtils.forName(realInterface));
        } catch (Throwable e) {
            // ignore
        }
    }
    
    
    
    // 将Invoker的接口放入接口集合中
    interfaces.add(invoker.getInterface());
    // 将 EchoService.class, Destroyable.class放入接口集合中
    interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
    // 调用重载方法,又子类实现
    return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}

image-20201111175521596

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    
    //省略,是不是很熟悉
    
}
public static Proxy getProxy(Class<?>... ics) {
    // 调用重载
    return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}


public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    // 接口数量限制65535
    if (ics.length > MAX_PROXY_COUNT) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    StringBuilder sb = new StringBuilder();
    // 遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 不是接口类型,则抛出异常
        if (!ics[i].isInterface()) {
            throw new RuntimeException(itf + " is not a interface.");
        }

        Class<?> tmp = null;
        try {
            // 反射重新加载接口类
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }
        // 检测接口是否相同,这里 tmp 有可能为空,不同则抛出异常
        if (tmp != ics[i]) {
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        }
        // 拼接接口全限定名,分隔符为 ;
        sb.append(itf).append(';');
    }

    // 使用拼接后的接口名作为 key
    String key = sb.toString();

    // 根据类加载器获取缓存
    final Map<String, Object> cache;
    synchronized (PROXY_CACHE_MAP) {
        cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 从缓存中获取 Reference<Proxy> 实例,渠道且是Reference实例则强转返回
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }
            // 如果是挂起生成标记,则调用线程wait方法,保证只有一个线程操作
            if (value == PENDING_GENERATION_MARKER) {
                try {
                    // 其他线程在此处进行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置标志位到缓存中,并跳出 while 循环进行后续操作
                cache.put(key, PENDING_GENERATION_MARKER);
                break;
            }
        } while (true);
    }
    // 代理类计数器加1
    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 创建 ClassGenerator 对象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<>();
        List<Method> methods = new ArrayList<>();

        for (int i = 0; i < ics.length; i++) {
            // 检测接口访问级别是否为 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 获取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    // 非 public 级别的接口必须在同一个包下,否者抛出异常
                    if (!pkg.equals(npkg)) {
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
            }
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);
            // 遍历接口方法
            for (Method method : ics[i].getMethods()) {
                // 获取方法描述,可理解为方法签名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中或者是静态方法,则忽略。
                if (worked.contains(desc) || Modifier.isStatic(method.getModifiers())) {
                    continue;
                }
                // 如果是接口并且是静态方法,则忽略
                if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
                    continue;
                }
                // 将签名存入worked中
                worked.add(desc);
                // 方法个数
                int ix = methods.size();
                // 方法返回值类型
                Class<?> rt = method.getReturnType();
                // 方法参数类型
                Class<?>[] pts = method.getParameterTypes();
                // 生成 Object[] args = new Object[pts.length]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length)
                    .append("];");
                for (int j = 0; j < pts.length; j++) {
                    // 生成 args[0] =($W)$1;
                    // 生成 args[1] =($W)$2;
                    // ......
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                }
                // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
                // Object ret = handler.invoke(this, methods[ix], args);
                code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                if (!Void.TYPE.equals(rt)) {// 返回值类型不是void
                    // 生成返回语句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                }

                methods.add(method);
                // 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(),
                              code.toString());
            }
        }
        // 包名为空,设置为Proxy的包名
        if (pkg == null) {
            pkg = PACKAGE_NAME;
        }

        // 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        // 生成构造函数
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        // handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[] { InvocationHandler.class }, new Class<?>[0],
                           "handler=$1;");
        // 添加无参构造
        ccp.addDefaultConstructor();
        // 生成接口代理类
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 构建 Proxy 子类名称,比如 Proxy1 等
        String fcn = Proxy.class.getName() + id;
        // 创建cl类实例
        ccm = ClassGenerator.newInstance(cl);
        // 设置实例名称
        ccm.setClassName(fcn);
        // 添加无参构造
        ccm.addDefaultConstructor();
        // 设置父类
        ccm.setSuperClass(Proxy.class);
        // 生成获取实例方法
        // public Object newInstance(java.lang.reflect.InvocationHandler h) {
        // return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn
                      + "($1); }");
        // 生成 Proxy 实现类
        Class<?> pc = ccm.toClass();
        // 通过反射创建 Proxy 实现类实例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        // 释放类生成器资源 ClassGenerator
        if (ccp != null) {
            ccp.release();
        }
        if (ccm != null) {
            ccm.release();
        }
        synchronized (cache) {
            // 代理为空即生成失败,移除缓存
            if (proxy == null) {
                cache.remove(key);
            } else {
                // 生成代理成功,虚引用存入缓存
                cache.put(key, new WeakReference<Proxy>(proxy));
            }
            // 通知等待线程
            cache.notifyAll();
        }
    }
    return proxy;
}

1.7、生成的代理类proxy0

package org.apache.dubbo.common.bytecode;

import com.alibaba.dubbo.rpc.service.EchoService;
import com.healerjean.proj.dto.UserDTO;
import com.healerjean.proj.service.ProviderDubboService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
import org.apache.dubbo.rpc.service.Destroyable;

public class proxy0 implements DC, Destroyable, EchoService, ProviderDubboService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler var1) {
        this.handler = var1;
    }

    public proxy0() {
    }
	
    //服务端的方法,最终那其实是通过调用 InvocationHandler 进行调用远程
    public UserDTO connect(String var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[1], var2);
        return (UserDTO)var3;
    }

    public Object $echo(Object var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[0], var2);
        return (Object)var3;
    }

    public void $destroy() {
        Object[] var1 = new Object[0];
        this.handler.invoke(this, methods[2], var1);
    }
}

1.6、InvokerInvocationHandler


package org.apache.dubbo.rpc.proxy;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        // healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
        String serviceKey = invoker.getUrl().getServiceKey();
        if (serviceKey != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        return invoker.invoke(rpcInvocation).recreate();
    }
}

2、服务调用

会发现是我们代理类

image-20201112152717936

2.1、代理类

package org.apache.dubbo.common.bytecode;

import com.alibaba.dubbo.rpc.service.EchoService;
import com.healerjean.proj.dto.UserDTO;
import com.healerjean.proj.service.ProviderDubboService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
import org.apache.dubbo.rpc.service.Destroyable;

public class proxy0 implements DC, Destroyable, EchoService, ProviderDubboService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler var1) {
        this.handler = var1;
    }

    public proxy0() {
    }
	
    //服务端的方法,最终那其实是通过调用 InvocationHandler 进行调用远程
    public UserDTO connect(String var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[1], var2);
        return (UserDTO)var3;
    }

    public Object $echo(Object var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[0], var2);
        return (Object)var3;
    }

    public void $destroy() {
        Object[] var1 = new Object[0];
        this.handler.invoke(this, methods[2], var1);
    }
}

2.1、InvokerInvocationHandler


package org.apache.dubbo.rpc.proxy;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        // healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
        String serviceKey = invoker.getUrl().getServiceKey();
        if (serviceKey != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        //connect
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        
        //RPC调用 对象,组装接口,方法名,参数
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        
        //healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
        String serviceKey = invoker.getUrl().getServiceKey();
        //讲服务提供者key放入RPC调用对象
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        //先试用MockClusterInvoker  执行调用逻辑
        return invoker.invoke(rpcInvocation).recreate();
    }
}

image-20201112154329527

2.3、MockClusterInvoker invoke#

   @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
		
        //判断该方法是否是moke调用,value为false
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
            // 比如 AbstractClusterInvoker 我这里是,但是并不重要
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            //force:direct mock 直接执行 mock 逻辑,不发起远程调用
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else { 
                        // 调用失败,执行 mock 逻辑
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

image-20201112154635147

2.4、AbstractInvoker


    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }

        //设置调用模式 我这里是SYNC 也就是同步调用
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
             // 抽象方法,由子类实现
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

image-20201112184324538

image-20201112184403050

2.5、DubboInvoker

  @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 设置 path (com.healerjean.proj.service.ProviderDubboService) 和 version (0.1) 到 attachment 中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // isOneway 为 true,表示“单向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            //默认3000 3s超时
            int timeout = calculateTimeout(invocation, methodName);
             // 我的是false
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

image-20201112185628646

2.6、开始调用

org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    private ExchangeClient client;

    public ReferenceCountExchangeClient(ExchangeClient client) {
        this.client = client;
        // 引用计数自增
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
    }
 

@Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return client.request(request, timeout, executor);
    }
    
    
    
}

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。

ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。

image-20201112185851146

public class HeaderExchangeClient implements ExchangeClient {


    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return channel.request(request, timeout, executor);
    }




}

即调用 HeaderExchangeChannel 对象的同签名方法。

HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。

final class HeaderExchangeChannel implements ExchangeChannel {

    private final Channel channel;

    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }

        // 这里的 channel 指向的是 NettyClient
        this.channel = channel;
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...);
        }
        // 创建 Request 对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
                                        
        // 设置双向通信标志为 true
        req.setTwoWay(true);
                                        
        // 这里的 request 变量类型为 RpcInvocation
        req.setData(request);

        // 创建 DefaultFuture 对象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 调用 NettyClient 的 send 方法发送请求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // 返回 DefaultFuture 对象
        return future;
    }
}

上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。

image-20201112190433811

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    @Override
    public void send(Object message) throws RemotingException {
        // 该方法由 AbstractClient 类实现
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }

    // 省略其他方法
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }

        // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        }

        // 继续向下调用
        channel.send(message, sent);
    }

    protected abstract Channel getChannel();

    // 省略其他方法
}

Dubbo 使用 Netty 作为底层的通信框架,因此下面我们到 NettyClient 类中看一下 getChannel 方法的实现逻辑

public class NettyClient extends AbstractClient {

    // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 获取一个 NettyChannel 类型对象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;

    /** 私有构造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }

        // 尝试从集合中获取 NettyChannel 实例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,则创建一个新的 NettyChannel 实例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
}

获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannelsend 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 发送消息(包含请求和响应消息)
        ChannelFuture future = channel.write(message);

        // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
        //   1. true: 等待消息发出,消息发送失败将抛出异常
        //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
        // 默认情况下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息发出,若在规定时间没能发出,success 会被置为 false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");
    }

    // 若 success 为 false,这里抛出异常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
    }
}
proxy0#sayHello(String)
  > InvokerInvocationHandler#invoke(Object, Method, Object[])
    > MockClusterInvoker#invoke(Invocation)
      > AbstractClusterInvoker#invoke(Invocation)
        > FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          > Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            > ListenerInvokerWrapper#invoke(Invocation) 
              > AbstractInvoker#invoke(Invocation) 
                > DubboInvoker#doInvoke(Invocation)
                  > ReferenceCountExchangeClient#request(Object, int)
                    > HeaderExchangeClient#request(Object, int)
                      > HeaderExchangeChannel#request(Object, int)
                        > AbstractPeer#send(Object)
                          > AbstractClient#send(Object, boolean)
                            > NettyChannel#send(Object, boolean)
                              > NioClientSocketChannel#write(Object)

ContactAuthor