Dubbo源码阅读之服务导入
前言
Github:https://github.com/HealerJean
Dubbo 服务引用的时机有两个,
第一个是在 Spring
容器调用 ReferenceBean
的 afterPropertiesSet
方法时引用服务,属于饿汉式的,当我们消费者代码中要使用dubbo
服务提供者的时候的时候,使用注解 @DubboReference
,则会开启,因为这个时候spring
的set
代码执行完成了,此时需要注入的就是dubbo
的服务了 ,我下文即使以这种方式介绍的。
第二个是在 ReferenceBean
对应的服务被注入到其他类中时引用。属于懒汉式当我们的服务被注入到其他类中时,Spring
会第一时间调用 getObject
方法 并由该方法执行服务引用逻辑,默认情况下,Dubbo
使用懒汉式引用服务
下面我们按照 Dubbo
饿汉式进行分析,整个分析过程从 ReferenceBean
的 afterPropertiesSet
方法开始。
。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,一共有三种
第一种是引用本地 (JVM) 服务
第二是通过直连方式引用远程服务
第三是通过注册中心引用远程服务
不管是哪种引用方式,最后都会得到一个 Invoke
r 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker
实例
此时需要通过集群管理类 Cluster
将多个 Invoker
合并成一个实例。合并后的 Invoker
实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory
) 为服务接口生成代理类,并让代理类去调用 Invoker
逻辑。避免了 Dubbo
框架代码对业务代码的侵入,同时也让框架更容易使用。
1、服务导入
1.1、入口 #afterPropertiesSet
public void afterPropertiesSet() throws Exception {
// 准备dubbo配置类的bean
prepareDubboConfigBeans();
// 模式是懒汉式初始化,这里舒适进来是null
if (init == null) {
init = false;
}
// 判断是否需要初始化,如果需要的话,执行懒汉式的逻辑。进行调用
if (shouldInit()) {
getObject();
}
}
private void prepareDubboConfigBeans() {
beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class);
beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class);
beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class);
beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class);
beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class);
beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class);
beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class);
beansOfTypeIncludingAncestors(applicationContext, ConfigCenterBean.class);
beansOfTypeIncludingAncestors(applicationContext, MetadataReportConfig.class);
beansOfTypeIncludingAncestors(applicationContext, MetricsConfig.class);
beansOfTypeIncludingAncestors(applicationContext, SslConfig.class);
}
@Override
public Object getObject() {
return get();
}
public synchronized T get() {
// 已经调用过销毁方法,则抛出异常
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 ref 是否为空,为空则通过 init 方法创建(要用于处理配置,以及调用 createProxy 生成代理类)
if (ref == null) {
init();
}
return ref;
}
1.2、#init
初始化
public synchronized void init() {
//已经初始化则返回,避免重复操作
if (initialized) {
return;
}
//我到了这里其实是null,也就是说dubbo还没有启动。,主要是初始化一些东西
//主要是通过SPI方式获取各种配置管理实例,然后调用初始化方法,初始化配置、监听器等
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// 检查每个配置模块是否正确创建,并在必要时重写其属性。
checkAndUpdateSubConfigs();
// 校验本地存根合法性
checkStubAndLocal(interfaceClass);
//校验mock配置合法性
ConfigValidationUtils.checkMock(interfaceClass, this);
// 添加配置<side,consumer>
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
// 添加版本、时间戳等运行时参数
ReferenceConfigBase.appendRuntimeParameters(map);
//不是泛化服务(泛化就是服务消费者并没有服务的接口)
if (!ProtocolUtils.isGeneric(generic)) {
// 获取版本,校验存入map
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//获取包装类所有方法名称,放入map中,如果没有则放入<methods,*>
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
// 接口放入map
map.put(INTERFACE_KEY, interfaceName);
// 通过getter或者getParameters方法将MetricsConfig、ApplicationConfig和ModuleConfig属性放到map中
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// 通过getter或者getParameters方法将ConsumerConfig和ReferenceConfig属性放到map中
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
// 元数据报告配置不为空且合法,则设置<metadata,remote>到map中
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
//存放异步方法信息
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
//遍历所有的方法配置存入map中
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
//如果存在methodname.retry则移除
String retryValue = map.remove(retryKey);
//如果retry对应的是false,则设置retries值为0
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
//提取方法信息中的异步方法信息,存入map
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
//获取注册中心IP地址
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
//配置为空则取本机地址
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
//是无效的本地主机,则抛出异常
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY
+ ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
//将上面的配置信息添加到元数据附属信息中
serviceMetadata.getAttachments().putAll(map);
//根据配置创建代理
ref = createProxy(map);
//服务元数据中添加目标对象配置
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
// 根据服务名,从缓存中获取 ConsumerModel,并将设置 ConsumerModel 代理对象为ref
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
//将异步方法信息存入ConsumerModel并初始化
consumerModel.init(attributes);
//设置初始化完成
initialized = true;
// 分发服务引入配置初始化事件 ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
1.3、引用服务创建#createProxy
初始化大概逻辑就是校验及获取配置,然后根据配置创建服务引用代理,然后分发服务引用初始化的事件,因此本节主要分析服务引用创建
org.apache.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)
源码。
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private T createProxy(Map<String, String> map) {
/*
* 是否是本地服务引用 :如果未指定作用域,但目标服务是在同一个JVM中提供的,则希望进行本地调用(默认),我这里肯定是远程调用啦
*/
if (shouldJvmRefer(map)) {
// 生成本地引用 URL,协议为 injvm
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
// 调用 refer 方法构建 InjvmInvoker 实例
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 远程调用
urls.clear();
// url 不为空,表明用户可能想进行点对点调用
if (url != null && url.length() > 0) {
// 当需要配置多个 url 时,可用分号(;)进行分割,这里会进行切分
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
// url路径为空
if (StringUtils.isEmpty(url.getPath())) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry
if (UrlUtils.isRegistry(url)) {
// 若是,表明用户想使用指定的注册中心,将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
//我执行走是这个
} else {
// 如果协议不是injvm
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
// 校验注册中心合法性
checkRegistry();
// 加载注册中心地址
//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890®istry=zookeeper&release=2.7.7×tamp=1605086409440
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
// 加载监控地址,不为空则放入map中
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&refer=application%3Dhlj-server-consumer%26dubbo%3D2.0.2%26group%3Dhealerjean%26init%3Dfalse%26interface%3Dcom.healerjean.proj.service.ProviderDubboService%26methods%3Dconnect%26pid%3D1504%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D40890%26register.ip%3D10.236.122.217%26release%3D2.7.7%26revision%3D0.1%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D3000%26timestamp%3D1605085675160%26version%3D0.1®istry=zookeeper&release=2.7.7×tamp=1605086409440
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注册中心,抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException(
"No such any registry to reference " + interfaceName + " on the consumer "
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()
+ ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例 (我的是执行这个)
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 多个注册中心或多个服务提供者,或者两者混合
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 遍历所有的注册中心或者服务提供者地址
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// 如果是注册中心地址,则将url赋值给注册中心
registryURL = url;
}
}
if (registryURL != null) {
// 注册表url可用于多订阅方案,默认情况下使用“区域感知”策略(ZoneAwareCluster)
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// invoker包装层级如下:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // 没有注册中心地址则是直连,之间创建StaticDirectory
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// 需要进行可用性检查 并且Invoker不可用的时候,抛出异常
if (shouldCheck() && !invoker.isAvailable()) {
// invoker 执行销毁逻辑,并抛出异常
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service " + interfaceName
+ ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName
+ (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer "
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService
.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// 通过代理工厂创建代理
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
protected boolean shouldJvmRefer(Map<String, String> map) {
//temp://localhost?application=hlj-server-consumer&dubbo=2.0.2&group=healerjean&init=false&interface=com.healerjean.proj.service.ProviderDubboService&methods=connect&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890®ister.ip=10.236.122.217&release=2.7.7&revision=0.1&side=consumer&sticky=false&timeout=3000×tamp=1605085675160&version=0.1
URL tmpUrl = new URL("temp", "localhost", 0, map);
boolean isJvmRefer;
if (isInjvm() == null) {
// if a url is specified, don't do local reference
if (url != null && url.length() > 0) {
isJvmRefer = false;
} else {
// by default, reference local service if there is
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm();
}
return isJvmRefer;
}
1.4、创建 Invoker
通过上面
createProxy
的源码分析可知,创建代理的核心逻辑在调用org.apache.dubbo.rpc.Protocol.refer(Class, URL)
生成Invoker
及最后调用代理工厂生成代理,因此本节先分析创建Invoker
的源码。Invoker
是Dubbo
的核心模型,代表一个可执行体。在服务提供方,
Invoke
r 用于调用服务提供类。在服务消费方,Invoker
用于执行远程调用。Invoker
是由Protocol
实现类构建而来。Protocol
实现类有很多,首先看下org.apache.dubbo.registry.integration.RegistryProtocol.refer(Class, URL)
。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 参数值,并将其设置为协议头
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-consumer&dubbo=2.0.2&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&refer=application%3Dhlj-server-consumer%26dubbo%3D2.0.2%26group%3Dhealerjean%26init%3Dfalse%26interface%3Dcom.healerjean.proj.service.ProviderDubboService%26methods%3Dconnect%26pid%3D1504%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D40890%26register.ip%3D10.236.122.217%26release%3D2.7.7%26revision%3D0.1%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D3000%26timestamp%3D1605085675160%26version%3D0.1&release=2.7.7×tamp=1605086409440
url = getRegistryUrl(url);
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
//如果是RegistryService类型,则直接调用代理工厂返回的Invoker
return proxyFactory.getInvoker((T) registry, type, url);
}
//将url中的refer参数值转为map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
//如果指定了分组参数
if (group != null && group.length() > 0) {
//且包含两个以上分组包括任意分组
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
private Cluster getMergeableCluster() {
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 所有refer参数
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服务消费者链接
//consumer://10.236.122.217/com.healerjean.proj.service.ProviderDubboService?application=hlj-server-consumer&dubbo=2.0.2&group=healerjean&init=false&interface=com.healerjean.proj.service.ProviderDubboService&methods=connect&pid=1504&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40890&release=2.7.7&revision=0.1&side=consumer&sticky=false&timeout=3000×tamp=1605085675160&version=0.1
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(),
parameters);
// 是否需要注册服务消费者,在 consumers 目录下新节点
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
// 构建路由链
directory.buildRouterChain(subscribeUrl);
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个虚拟提供者
Invoker<T> invoker = cluster.join(directory);
//通过SPI获取RegistryProtocolListener实例
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
//没有监听器,直接返货Invoker
return invoker;
}
//创建包装类RegistryInvokerWrapper实例
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker,
subscribeUrl);
//执行调用监听通知方法
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
//返回invoker包装类
return registryInvokerWrapper;
}
1.5、通过代理工厂创建代理
上面分析了
RegistryProtocol
创建Invoker
的过程,有兴趣的也可以看看DubboProtocol
创建Invoke
r过程。接下来分析创建代理过程的源码,首先是org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(Invoker, boolean)
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private static final Class<?>[] INTERNAL_INTERFACES = new Class<?>[]{
EchoService.class, Destroyable.class
};
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Set<Class<?>> interfaces = new HashSet<>();
// 获取接口地址,参数名interfaces
String config = invoker.getUrl().getParameter(INTERFACES);
if (config != null && config.length() > 0) {
// 逗号切分接口列表
String[] types = COMMA_SPLIT_PATTERN.split(config);
for (String type : types) {
// 反射加载接口类
interfaces.add(ReflectUtils.forName(type));
}
}
//我的是false,所以不进入
if (generic) {// 是泛化服务
// 如果Invoker的接口不是GenericService的子接口,则像接口中添加一个子接口到接口集合中
if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {
interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
}
try {
// 从url中找到真正的接口即interface参数的值,然后反射获取接口类存入接口集合中
String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
interfaces.add(ReflectUtils.forName(realInterface));
} catch (Throwable e) {
// ignore
}
}
// 将Invoker的接口放入接口集合中
interfaces.add(invoker.getInterface());
// 将 EchoService.class, Destroyable.class放入接口集合中
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
// 调用重载方法,又子类实现
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
//省略,是不是很熟悉
}
public static Proxy getProxy(Class<?>... ics) {
// 调用重载
return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
// 接口数量限制65535
if (ics.length > MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}
StringBuilder sb = new StringBuilder();
// 遍历接口列表
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
// 不是接口类型,则抛出异常
if (!ics[i].isInterface()) {
throw new RuntimeException(itf + " is not a interface.");
}
Class<?> tmp = null;
try {
// 反射重新加载接口类
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
// 检测接口是否相同,这里 tmp 有可能为空,不同则抛出异常
if (tmp != ics[i]) {
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
}
// 拼接接口全限定名,分隔符为 ;
sb.append(itf).append(';');
}
// 使用拼接后的接口名作为 key
String key = sb.toString();
// 根据类加载器获取缓存
final Map<String, Object> cache;
synchronized (PROXY_CACHE_MAP) {
cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
}
Proxy proxy = null;
synchronized (cache) {
do {
// 从缓存中获取 Reference<Proxy> 实例,渠道且是Reference实例则强转返回
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null) {
return proxy;
}
}
// 如果是挂起生成标记,则调用线程wait方法,保证只有一个线程操作
if (value == PENDING_GENERATION_MARKER) {
try {
// 其他线程在此处进行等待
cache.wait();
} catch (InterruptedException e) {
}
} else {
// 放置标志位到缓存中,并跳出 while 循环进行后续操作
cache.put(key, PENDING_GENERATION_MARKER);
break;
}
} while (true);
}
// 代理类计数器加1
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
// 创建 ClassGenerator 对象
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<>();
List<Method> methods = new ArrayList<>();
for (int i = 0; i < ics.length; i++) {
// 检测接口访问级别是否为 protected 或 privete
if (!Modifier.isPublic(ics[i].getModifiers())) {
// 获取接口包名
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
// 非 public 级别的接口必须在同一个包下,否者抛出异常
if (!pkg.equals(npkg)) {
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
}
// 添加接口到 ClassGenerator 中
ccp.addInterface(ics[i]);
// 遍历接口方法
for (Method method : ics[i].getMethods()) {
// 获取方法描述,可理解为方法签名
String desc = ReflectUtils.getDesc(method);
// 如果方法描述字符串已在 worked 中或者是静态方法,则忽略。
if (worked.contains(desc) || Modifier.isStatic(method.getModifiers())) {
continue;
}
// 如果是接口并且是静态方法,则忽略
if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
continue;
}
// 将签名存入worked中
worked.add(desc);
// 方法个数
int ix = methods.size();
// 方法返回值类型
Class<?> rt = method.getReturnType();
// 方法参数类型
Class<?>[] pts = method.getParameterTypes();
// 生成 Object[] args = new Object[pts.length]
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length)
.append("];");
for (int j = 0; j < pts.length; j++) {
// 生成 args[0] =($W)$1;
// 生成 args[1] =($W)$2;
// ......
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
}
// 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
// Object ret = handler.invoke(this, methods[ix], args);
code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
if (!Void.TYPE.equals(rt)) {// 返回值类型不是void
// 生成返回语句,形如 return (java.lang.String) ret;
code.append(" return ").append(asArgument(rt, "ret")).append(";");
}
methods.add(method);
// 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(),
code.toString());
}
}
// 包名为空,设置为Proxy的包名
if (pkg == null) {
pkg = PACKAGE_NAME;
}
// 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
// 生成 private java.lang.reflect.InvocationHandler handler;
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// 生成构造函数
// porxy0(java.lang.reflect.InvocationHandler arg0) {
// handler=$1;
// }
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[] { InvocationHandler.class }, new Class<?>[0],
"handler=$1;");
// 添加无参构造
ccp.addDefaultConstructor();
// 生成接口代理类
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// 构建 Proxy 子类名称,比如 Proxy1 等
String fcn = Proxy.class.getName() + id;
// 创建cl类实例
ccm = ClassGenerator.newInstance(cl);
// 设置实例名称
ccm.setClassName(fcn);
// 添加无参构造
ccm.addDefaultConstructor();
// 设置父类
ccm.setSuperClass(Proxy.class);
// 生成获取实例方法
// public Object newInstance(java.lang.reflect.InvocationHandler h) {
// return new org.apache.dubbo.proxy0($1);
// }
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn
+ "($1); }");
// 生成 Proxy 实现类
Class<?> pc = ccm.toClass();
// 通过反射创建 Proxy 实现类实例
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// 释放类生成器资源 ClassGenerator
if (ccp != null) {
ccp.release();
}
if (ccm != null) {
ccm.release();
}
synchronized (cache) {
// 代理为空即生成失败,移除缓存
if (proxy == null) {
cache.remove(key);
} else {
// 生成代理成功,虚引用存入缓存
cache.put(key, new WeakReference<Proxy>(proxy));
}
// 通知等待线程
cache.notifyAll();
}
}
return proxy;
}
1.7、生成的代理类proxy0
package org.apache.dubbo.common.bytecode;
import com.alibaba.dubbo.rpc.service.EchoService;
import com.healerjean.proj.dto.UserDTO;
import com.healerjean.proj.service.ProviderDubboService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
import org.apache.dubbo.rpc.service.Destroyable;
public class proxy0 implements DC, Destroyable, EchoService, ProviderDubboService {
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler var1) {
this.handler = var1;
}
public proxy0() {
}
//服务端的方法,最终那其实是通过调用 InvocationHandler 进行调用远程
public UserDTO connect(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (UserDTO)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (Object)var3;
}
public void $destroy() {
Object[] var1 = new Object[0];
this.handler.invoke(this, methods[2], var1);
}
}
1.6、InvokerInvocationHandler
package org.apache.dubbo.rpc.proxy;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
private ConsumerModel consumerModel;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
// healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
String serviceKey = invoker.getUrl().getServiceKey();
if (serviceKey != null) {
this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
}
}
2、服务调用
会发现是我们代理类
2.1、代理类
package org.apache.dubbo.common.bytecode;
import com.alibaba.dubbo.rpc.service.EchoService;
import com.healerjean.proj.dto.UserDTO;
import com.healerjean.proj.service.ProviderDubboService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
import org.apache.dubbo.rpc.service.Destroyable;
public class proxy0 implements DC, Destroyable, EchoService, ProviderDubboService {
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler var1) {
this.handler = var1;
}
public proxy0() {
}
//服务端的方法,最终那其实是通过调用 InvocationHandler 进行调用远程
public UserDTO connect(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (UserDTO)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (Object)var3;
}
public void $destroy() {
Object[] var1 = new Object[0];
this.handler.invoke(this, methods[2], var1);
}
}
2.1、InvokerInvocationHandler
package org.apache.dubbo.rpc.proxy;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
private ConsumerModel consumerModel;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
// healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
String serviceKey = invoker.getUrl().getServiceKey();
if (serviceKey != null) {
this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
//connect
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
//RPC调用 对象,组装接口,方法名,参数
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
//healerjean/com.healerjean.proj.service.ProviderDubboService:0.1
String serviceKey = invoker.getUrl().getServiceKey();
//讲服务提供者key放入RPC调用对象
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
//先试用MockClusterInvoker 执行调用逻辑
return invoker.invoke(rpcInvocation).recreate();
}
}
2.3、MockClusterInvoker invoke#
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//判断该方法是否是moke调用,value为false
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
// 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
// 比如 AbstractClusterInvoker 我这里是,但是并不重要
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock 直接执行 mock 逻辑,不发起远程调用
result = doMockInvoke(invocation, null);
} else {
//fail-mock 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
// 调用失败,执行 mock 逻辑
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
2.4、AbstractInvoker
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
invocation.addObjectAttachments(contextAttachments);
}
//设置调用模式 我这里是SYNC 也就是同步调用
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
AsyncRpcResult asyncResult;
try {
// 抽象方法,由子类实现
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
return asyncResult;
}
2.5、DubboInvoker
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path (com.healerjean.proj.service.ProviderDubboService) 和 version (0.1) 到 attachment 中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// isOneway 为 true,表示“单向”通信
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//默认3000 3s超时
int timeout = calculateTimeout(invocation, methodName);
// 我的是false
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.6、开始调用
org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client) {
this.client = client;
// 引用计数自增
referenceCount.incrementAndGet();
this.url = client.getUrl();
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return client.request(request, timeout, executor);
}
}
ReferenceCountExchangeClient
内部定义了一个引用计数变量 referenceCount
,每当该对象被引用一次 referenceCoun
t 都会进行自增。每当 close
方法被调用时,referenceCoun
t 进行自减。
ReferenceCountExchangeClient
内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。
public class HeaderExchangeClient implements ExchangeClient {
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return channel.request(request, timeout, executor);
}
}
即调用 HeaderExchangeChannel
对象的同签名方法。
那 HeaderExchangeClient
有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。
final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
// 这里的 channel 指向的是 NettyClient
this.channel = channel;
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(..., "Failed to send request ...);
}
// 创建 Request 对象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 设置双向通信标志为 true
req.setTwoWay(true);
// 这里的 request 变量类型为 RpcInvocation
req.setData(request);
// 创建 DefaultFuture 对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 调用 NettyClient 的 send 方法发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 对象
return future;
}
}
上面的方法首先定义了一个 Request
对象,然后再将该对象传给 NettyClient
的 send
方法,进行后续的调用。需要说明的是,NettyClient
中并未实现 send
方法,该方法继承自父类 AbstractPeer
,下面直接分析 AbstractPeer
的代码。
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
// 该方法由 AbstractClient 类实现
send(message, url.getParameter(Constants.SENT_KEY, false));
}
// 省略其他方法
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send ...");
}
// 继续向下调用
channel.send(message, sent);
}
protected abstract Channel getChannel();
// 省略其他方法
}
Dubbo
使用 Netty
作为底层的通信框架,因此下面我们到 NettyClient
类中看一下 getChannel
方法的实现逻辑
public class NettyClient extends AbstractClient {
// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
private volatile Channel channel;
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// 获取一个 NettyChannel 类型对象
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
/** 私有构造方法 */
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
// 尝试从集合中获取 NettyChannel 实例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,则创建一个新的 NettyChannel 实例
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
// 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
}
获取到 NettyChannel
实例后,即可进行后续的调用。下面看一下 NettyChannel
的 send
方法。
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息(包含请求和响应消息)
ChannelFuture future = channel.write(message);
// sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
// 1. true: 等待消息发出,消息发送失败将抛出异常
// 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
// 默认情况下 sent = false;
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待消息发出,若在规定时间没能发出,success 会被置为 false
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message ...");
}
// 若 success 为 false,这里抛出异常
if (!success) {
throw new RemotingException(this, "Failed to send message ...");
}
}
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)