Dubbo源码阅读之服务导出
前言
Github:https://github.com/HealerJean
整体逻辑
服务导出过程始于
Spring
容器发布刷新事件之后,Dubbo
在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分服务导出的入口类是
ServiceBean
,入口方法onApplicationEvent
。
1、第一部分是前置工作,主要用于检查参数,组装 URL。
2、第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。
3、第三部分是向注册中心注册服务,用于服务发现。
1、服务导出入口
DubboBootstrapApplicationListener
该类继承了OneTimeExecutionApplicationContextEventListener
抽象类,OneTimeExecutionApplicationContextEventListener
又实现了ApplicationListener
接口onApplicationEvent
方法,该方法会在收到Spring
上下文刷新事件后执行。两个类的关键源码如下:
abstract class OneTimeExecutionApplicationContextEventListener implements ApplicationListener, ApplicationContextAware {
private ApplicationContext applicationContext;
public final void onApplicationEvent(ApplicationEvent event) {
//判断事件源是持有的ApplicationContext并且是应用上下文事件
if (isOriginalEventSource(event) && event instanceof ApplicationContextEvent) {
//执行方法
onApplicationContextEvent((ApplicationContextEvent) event);
}
}
//抽象方法交给子类执行
protected abstract void onApplicationContextEvent(ApplicationContextEvent event);
private boolean isOriginalEventSource(ApplicationEvent event) {
return (applicationContext == null) || Objects.equals(applicationContext, event.getSource());
}
@Override
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public ApplicationContext getApplicationContext() {
return applicationContext;
}
}
在DubboBootstrapApplicationListener
的构造函数中会先去获取DubboBootstrap
的实例,在监听到ContextRefreshedEvent
事件时触发DubboBootstrap
的start
方法,
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
implements Ordered {
/**
* The bean name of {@link DubboBootstrapApplicationListener}
*/
public static final String BEAN_NAME = "dubboBootstrapApplicationListener";
private final DubboBootstrap dubboBootstrap;
//实例化该监听器的的时候会先实例化dubboBootstrap 对象。
public DubboBootstrapApplicationListener() {
this.dubboBootstrap = DubboBootstrap.getInstance();
}
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
//是上下文刷新事件则调用DubboBootstrap的start方法
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
//如果是上下文关闭事件则调用DubboBootstrap的stop方法
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
//dubbo开始 上下文刷新时间
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
}
接着我们看看DubboBootstrap
的实例化及开始方法的源码。
public class DubboBootstrap extends GenericEventListener {
private static DubboBootstrap instance;
private final ConfigManager configManager;
private final Environment environment;
//加锁构造单例
public static synchronized DubboBootstrap getInstance() {
if (instance == null) {
instance = new DubboBootstrap();
}
return instance;
}
//私有化构造函数,保证只能被自己实例化
private DubboBootstrap() {
//通过SPI方式获取环境配置和环境管理实例,继承关系如下图
configManager = ApplicationModel.getConfigManager();
environment = ApplicationModel.getEnvironment();
//注册shutdown事件,回调DubboBootstrap的destroy方法,销毁所有导出及引用的服务等
DubboShutdownHook.getDubboShutdownHook().register();
ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
@Override
public void callback() throws Throwable {
DubboBootstrap.this.destroy();
}
});
}
public class ApplicationModel {
private static final ExtensionLoader<FrameworkExt> LOADER = ExtensionLoader.getExtensionLoader(FrameworkExt.class);
//通过SPI方式获取环境配置和环境管理实例
public static Environment getEnvironment() {
return (Environment) LOADER.getExtension(Environment.NAME);
}
public static ConfigManager getConfigManager() {
return (ConfigManager) LOADER.getExtension(ConfigManager.NAME);
}
}
2、DubboBootstrap
#start
public DubboBootstrap start() {
//原子操作,保证只启动一次
if (started.compareAndSet(false, true)) {
ready.set(false);
//1、初始化操作,主要是一些配置
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 2、导出dubbo服务(最终调用ServiceConfig的export方法)
exportServices();
// 当不只是注册服务提供者或者已经导出元数据的情况下执行
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 导出元数据服务(最终调用ServiceConfig实例,然后调用export方法)
exportMetadataService();
// 如果有则需要注册本地服务实例,通过SPI方式获取服务发现注册中心,然后调用他们的注册方法(默认自适应拓展实现是zookeeper)
registerServiceInstance();
}
// 3、服务导入
referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
2.1、DubboBootstrapd
#start
内部
通过
Start
方法源码可以看到,里面一次执行了服务初始化以服务导出和引入的方法,接下来我们追踪一下内部方法的具体实现。
private void initialize() {
// 原子操作确保只初始化一次
if (!initialized.compareAndSet(false, true)) {
return;
}
//初始化Dubbo组件的生命周期,这里主要是对环境配置初始化
ApplicationModel.iniFrameworkExts();
//开始构建配置中心
startConfigCenter();
//如果是zookeeper作为注册中心且没有指定配置中心时,使用注册中心做配置中心
useRegistryAsConfigCenterIfNecessary();
//加载远程的配置
loadRemoteConfigs();
//全局配置校验(应用、元数据、提供者、消费者、监控等)
checkGlobalConfigs();
//初始化元数据服务
initMetadataService();
//初始化事件监听器(将当前实例添加到事件监听器中)
initEventListener();
if (logger.isInfoEnabled()) {
logger.info(NAME + " has been initialized!");
}
}
2.1.1、initFrameworkExts
初始化Dubbo组件的生命周期,这里主要是对环境配置初始化
public static void initFrameworkExts() {
//这里面通过上面其实就可以看出来就是 一些环境的类,进行初始化
Set<FrameworkExt> exts = ExtensionLoader.getExtensionLoader(FrameworkExt.class).getSupportedExtensionInstances();
for (FrameworkExt ext : exts) {
ext.initialize();
}
}
2.1.2、startConfigCenter
开始构建配置中心,一般情况下下面的不会执行关键代码,因为我这里没有配置中心哟
private void startConfigCenter() {
Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();
// check Config Center
if (CollectionUtils.isEmpty(configCenters)) {
ConfigCenterConfig configCenterConfig = new ConfigCenterConfig();
configCenterConfig.refresh();
if (configCenterConfig.isValid()) {
configManager.addConfigCenter(configCenterConfig);
configCenters = configManager.getConfigCenters();
}
} else {
for (ConfigCenterConfig configCenterConfig : configCenters) {
configCenterConfig.refresh();
ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig);
}
}
if (CollectionUtils.isNotEmpty(configCenters)) {
CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();
for (ConfigCenterConfig configCenter : configCenters) {
compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
}
environment.setDynamicConfiguration(compositeDynamicConfiguration);
}
configManager.refreshAll();
}
2.1.3、useRegistryAsConfigCenterIfNecessary
如果是zookeeper作为注册中心且没有指定配置中心时,使用注册中心做配置中心
private void useRegistryAsConfigCenterIfNecessary() {
// we use the loading status of DynamicConfiguration to decide whether ConfigCenter has been initiated.
if (environment.getDynamicConfiguration().isPresent()) {
return;
}
if (CollectionUtils.isNotEmpty(configManager.getConfigCenters())) {
return;
}
//从环境中获取注册中心,我们会发现我这里其实只有一个,如果有多个的话则要进行过滤,筛选出符合条件的
configManager.getDefaultRegistries().stream()
.filter(registryConfig -> registryConfig.getUseAsConfigCenter() == null || registryConfig.getUseAsConfigCenter())
.forEach(registryConfig -> {
String protocol = registryConfig.getProtocol();
String id = "config-center-" + protocol + "-" + registryConfig.getPort();
ConfigCenterConfig cc = new ConfigCenterConfig();
cc.setId(id);
if (cc.getParameters() == null) {
cc.setParameters(new HashMap<>());
}
if (registryConfig.getParameters() != null) {
cc.getParameters().putAll(registryConfig.getParameters());
}
cc.getParameters().put(CLIENT_KEY, registryConfig.getClient());
cc.setProtocol(registryConfig.getProtocol());
cc.setPort(registryConfig.getPort());
cc.setAddress(registryConfig.getAddress());
cc.setNamespace(registryConfig.getGroup());
cc.setUsername(registryConfig.getUsername());
cc.setPassword(registryConfig.getPassword());
if (registryConfig.getTimeout() != null) {
cc.setTimeout(registryConfig.getTimeout().longValue());
}
cc.setHighestPriority(false);
configManager.addConfigCenter(cc);
});
startConfigCenter();
}
2.1.4、loadRemoteConfigs
加载远程的配置,
因为没有什么远程的配置,所以下面的代码基本上不会执行,各种List都是空的
private void loadRemoteConfigs() {
// registry ids to registry configs
List<RegistryConfig> tmpRegistries = new ArrayList<>();
Set<String> registryIds = configManager.getRegistryIds();
registryIds.forEach(id -> {
if (tmpRegistries.stream().noneMatch(reg -> reg.getId().equals(id))) {
tmpRegistries.add(configManager.getRegistry(id).orElseGet(() -> {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setId(id);
registryConfig.refresh();
return registryConfig;
}));
}
});
configManager.addRegistries(tmpRegistries);
// protocol ids to protocol configs
List<ProtocolConfig> tmpProtocols = new ArrayList<>();
Set<String> protocolIds = configManager.getProtocolIds();
protocolIds.forEach(id -> {
if (tmpProtocols.stream().noneMatch(prot -> prot.getId().equals(id))) {
tmpProtocols.add(configManager.getProtocol(id).orElseGet(() -> {
ProtocolConfig protocolConfig = new ProtocolConfig();
protocolConfig.setId(id);
protocolConfig.refresh();
return protocolConfig;
}));
}
});
configManager.addProtocols(tmpProtocols);
}
2.2、ServiceConfig
#export
上面DubboBootStrap的start方法初始化完成之后,开始导出服务执行方法exportServices
private void exportServices() {
//这个方法是获取我们所有的service服务类, 看下面的图片就明白了
configManager.getServices().forEach(sc -> {
//设置ServiceConfig的启动器为 当前 bootstrap
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
//异步导出,将导出任务提交到线程池异步完成
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
});
asyncExportingFutures.add(future);
//我这里是同步导出
} else {
//同步导出 则直接调用ServiceConfig的导出方法
sc.export();
exportedServices.add(sc);
}
});
}
导出服务方法最终都会调用ServiceConfig的export方法进行导出,接下来将进入这个方法源码分析。
public synchronized void export() {
//判断当前服务是否需要导出
if (!shouldExport()) {
return;
}
//启动类为空则获取一个实例(按照上面的流程下来其实已经执行了)
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
//校验并更细配置(默认配置、协议配置等)
checkAndUpdateSubConfigs();
//初始化元数据(设置版本、分组、类型及名称等属性,接口的一些属性)
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
//是否需要延迟导出
if (shouldDelay()) {
//提交导出任务到延迟导出调度器(可调度线程池)
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
//执行导出操作,我们执行这里
doExport();
}
//执行已导出操作
exported();
}
//判断是否要导出,如果已经导出了那么久不要导出了,如果没有导出的时候就返回true执行导出
public boolean shouldExport() {
Boolean export = getExport();
// default value is true
return export == null ? true : export;
}
//判断是否导出,如果已经有服务提供者了,就说明已经导出了
//有时候我们只是想本地启动服务进行一些调试工作,我们并不希望把本地启动的服务暴露出去给别人调用。此时,我们可通过配置 export 禁止服务导出
protected ProviderConfig provider;
@Override
public Boolean getExport() {
return (export == null && provider != null) ? provider.getExport() : export;
}
@Deprecated
public void init() {
initialize();
}
public void exported() {
// 发布一个服务导出事件( ServiceConfigExportedEvent)
dispatch(new ServiceConfigExportedEvent(this));
}
2.2.1、doExport
protected synchronized void doExport() {
//是否执行了unexport方法(表示服务注销了),执行了则抛出异常
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
//已经导出过则直接返回
if (exported) {
return;
}
//设置已导出标志
exported = true;
//路径及服务名为空
if (StringUtils.isEmpty(path)) {
//服务名为接口名
path = interfaceName;
}
//Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。Dubbo 在 doExportUrls 方法中对多协议,多注册中心进行了支持
//执行多协议多注册导出
doExportUrls();
}
2.2.1.1、 doExportUrls
多协议多注册中心导出
根据上一步中的导出逻辑可知,最后调用了
doExportUrls
方法,该方法对dubbo
多协议,多注册中心进行了支持,源码如分析如下:
private void doExportUrls() {
//获取服务缓存库
ServiceRepository repository = ApplicationModel.getServiceRepository();
//注册当前服务到本地缓存库
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
//注册服务提供者到缓存库
repository.registerProvider(
//根据接口名,服务组及版本号生成唯一名称 (163/com.healerjean.proj.service.MailClientService:0.0.1)
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
//加载注册中心地址(支持多注册中心,因此是集合)
//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880®istry=zookeeper&release=2.7.7×tamp=1604404226937
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//支持多协议导出,遍历每个协议,执行这个协议所对应的注册中心的导出逻辑
for (ProtocolConfig protocolConfig : protocols) {
//根据协议配置生成服务地址(注册中心的key) 163/com.healerjean.proj.service.MailClientService:0.0.1
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// 如果指定了服务路径,需要再次注册到缓存中,保证此映射路径能获取到服务
repository.registerService(pathKey, interfaceClass);
// 设置原数据服务key
serviceMetadata.setServiceKey(pathKey);
//执行多注册中心单协议的导出逻辑
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
支持多协议导出,我这里是Dubb协议
2.2.1.2、loadRegistries
多协议导出首次调用
ConfigValidationUtils#loadRegistries
方法加载多注册中心,然后遍历每个协议执行该协议的导出逻辑。
1、检测是否存在注册中心配置类,不存在则抛出异常
2、构建参数映射集合,也就是 map
3、构建注册中心链接列表
5、遍历链接列表,并根据条件决定是否将其添加到 registryList 中
public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
// 获取服务应用配置
ApplicationConfig application = interfaceConfig.getApplication();
// 获取服务注册配置
List<RegistryConfig> registries = interfaceConfig.getRegistries();
if (CollectionUtils.isNotEmpty(registries)) {// 注册配置不为空
for (RegistryConfig config : registries) {// 遍历注册配置
// 获取注册地址 zookeeper://127.0.0.1:2181
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
// 如果为设置默认值0.0.0.0
address = ANYHOST_VALUE;
}
// 如果地址不是N/A
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 通过getter或者getParameters方法将ApplicationConfig和RegistryConfig属性放到map中
AbstractConfig.appendParameters(map, application);
AbstractConfig.appendParameters(map, config);
// path = org.apache.dubbo.registry.RegistryService
map.put(PATH_KEY, RegistryService.class.getName());
// 添加dubbo、版本、时间戳等参数到map中
AbstractInterfaceConfig.appendRuntimeParameters(map);
// 参数中是否包含协议protocol,如果没有设置为dubbo
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 根据参数和地址解析URL
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=17356&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7×tamp=1604655340823
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// 根据添加registry参数组装注册中心地址
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())//添加参数 registry=zookeeper
.setProtocol(extractRegistryType(url)).build(); //将头部协议修改为registry
//设置完成后 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=17368&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880®istry=zookeeper&release=2.7.7×tamp=1604656403726
// 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
// (服务提供者 && register = true 或 null) || (非服务提供者 && subscribe = true 或 null)
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
public static void appendParameters(Map<String, String> parameters, Object config) {
appendParameters(parameters, config, null);
}
2.2.1.3、doExportUrlsFor1Protocol
执行多注册中心单协议的导出逻辑
有多协议多注册中心源码分析可知,最终调用了单协议多注册中心导出方法,主要逻辑就是导出前先获取了各种配置缓存起来,然后获取创建包装实例、获取主机和端口,最后执行导出。
导出根据导出范围分为三个分支: scope = none
,不导出服务;scope != remote
,导出到本地;scope != local
,导出到远程。我下面的scope是null,所以要导出到本地和远程
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 协议名称为空,设置为dubbo,我这个配置了,进来就是dubbo
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 添加配置<side,provider>
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// 添加dubbo、版本、时间戳等运行参数到map中
ServiceConfig.appendRuntimeParameters(map);
// 通过getter或者getParameters方法将ApplicationConfig、ModuleConfig、MetricsConfig、
// ProviderConfig、ProtocolConfig及ServiceConfig属性放到map中
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
// 获取原数导出配置,如果合法则添加<metadata,remote>到map中
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
//是否存在 <dubbo:method> 标签的配置信息
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
// 通过getter或者getParameters方法将MethodConfig属性添加到map中,前缀是当前MethodConfig名称即<方法名.属性名,属性值>。
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
//是否存在methodname.retry键,及是否配置了retry属性
if (map.containsKey(retryKey)) {
//存在则移除
String retryValue = map.remove(retryKey);
//如果retry配置的false,设置retries配置为0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
//获取方法参数配置ArgumentConfig,存放到map中
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// 判断参数类型是否为空
if (argument.getType() != null && argument.getType().length() > 0) {
//获取接口(导出服务)的所有方法,遍历
Method[] methods = interfaceClass.getMethods();
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// 接口方法名和方法配置名称相同
if (methodName.equals(method.getName())) {
//获取接口方法参数类型
Class<?>[] argtypes = methods[i].getParameterTypes();
// 参数索引不是-1,-1表示未设置
if (argument.getIndex() != -1) {
// 检测 ArgumentConfig 中的 type 属性与方法参数列表中的参数名称是否一致
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
//一致的则添加配置到map<方法名.参数索引,参数配置>
AbstractConfig.appendParameters(map, argument,
method.getName() + "." + argument.getIndex());
} else {//不一致则抛出异常
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :"
+ argument.getIndex() + ", type:" + argument.getType());
}
} else {//未设置参数索引
// 遍历方法参数的所有类型
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
//查找和当前参数配置类型匹配的参数
if (argclazz.getName().equals(argument.getType())) {
//匹配则将配置放入map中<方法名.参数索引,参数配置>
AbstractConfig.appendParameters(map, argument,
method.getName() + "." + j);
//如果匹配到的参数类型设置了索引并且和当前索引不一致,抛出异常
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :"
+ argument.getIndex() + ", type:"
+ argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
//参数类型为空但是参数索引不是-1,添加配置到map中
AbstractConfig.appendParameters(map, argument,
method.getName() + "." + argument.getIndex());
} else {
//如果既没有配置参数索引又没哟配置参数类型则抛出异常
throw new IllegalArgumentException(
"Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
}
}
if (ProtocolUtils.isGeneric(generic)) {
//是泛化服务则设置<generic,generic值>、<methods,*>表示任意方法
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
//不是泛化服务,获取修订版本号,放入map
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//创建并返回包装类方法名称
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
//没有包装方法名则设置<methods,*>
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
//如果有,则方面通过逗号分隔拼接放入map中,key=methods
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
//没有token且提供者不为空,获取提供者的token
if (ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
//如果token不为空
if (!ConfigUtils.isEmpty(token)) {
//如果token是默认值(true或者default),则创建UUID作为token
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
//开始服务导出,上面一堆东西,无非就是构建mao,然后将所有的参数放到map中,然后下面将map制作为dubbo协议的url
// 127.0.0.1
//找到配置的主机,优先级如下:environment variables -> java system properties -> host property in config file
// -> /etc/hosts -> default network address -> first available network address
String host = findConfigedHosts(protocolConfig, registryURLs, map);
// 20880
// 找到配置的端口,优先级: environment variable -> java system properties ->
// port property in protocol config file -> protocol default port
Integer port = findConfigedPorts(protocolConfig, name, map);
//拼接URL :dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604405352872&version=0.0.1
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 如果存对应的url扩展工厂,则再接着进行处理url,则获取自定义扩展配置,我这里没有进入
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())
.getConfigurator(url).configure(url);
}
//获取导出范围,我这里获取到的是
String scope = url.getParameter(SCOPE_KEY);
// 如果导出单位不是字符串'none',则准备导出,我这里获取到的是null
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果导出范围不是remote则执行本地导出逻辑
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 如果配置导出范围不是local则执行远程导出
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
//注册中心地址不为空
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
// 如果协议是injvm 则不执行导出注册逻辑
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
//url配置dynamic参数
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
//获取监控地址
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
//监控地址不为空则添加到URL参数中key=monitor
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url
+ " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// 获取自定义代理配置
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
//如果存在,则为注册地址添加代理实现参数
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//为服务引用生成Invoker对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//生成提供者和配置包装Invoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker,
this);
//通过SPI自适应拓展获取Protocol的拓展实现,调用导出方法
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
//添加到导出器缓存
exporters.add(exporter);
}
} else {
//注册中心地址为空,导出服务到配置地址
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0 ServiceData Store
* 获取可写元数据服务,默认实现为本地
*/
WritableMetadataService metadataService = WritableMetadataService
.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
//发布服务定义
metadataService.publishServiceDefinition(url);
}
}
}
//添加到服务引用url缓存中
this.urls.add(url);
}
3、导出核心逻辑
3.1、本地导出
url = dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604405352872&version=0.0.1
1、先通过SPI
方式获取ProxyFactory
创建Invoker
,
2、通过SPI
的方式获取Protocol
实现导出。
Dubbo
官方文档中对 Invoker
进行了说明:Invoker
是实体域,它是 Dubbo
的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke
调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
Dubbo
默认的 ProxyFactory
实现类是 JavassistProxyFactory
。下面我们到 JavassistProxyFactory
代码中,探索 Invoker
的创建过程。
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
//使用SPI自适应拓展机制,获取ProxyFactory和Protocol的实现。
//默认实现是DubboProtocol,这里是本地协议,也就是InjvmProtocol(根据Invoker才确认是InjvmProtocol的哦)
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
//代理工厂,这里的适配扩展是 JavassistProxyFactory
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/**
* 导出服务到本地(及jvm导出)
*/
private void exportLocal(URL url) {
//组装导出地址,协议为injvm,
// injvm://127.0.0.1/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4172&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604407350304&version=0.0.1
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
//使用ProxyFactory生成导出服务代理的实现,默认实现JavassistProxy
//使用Protocol调用服务导出逻辑,默认实现是DubboProtocol,这里是本地协议,也就是InjvmProtocol
Exporter<?> exporter = PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
//添加导出服务到缓存中
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
3.1.1、 Invoker
的创建
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
AbstractProxyInvoker
包含一个 Proxy
实例,代理了具体的服务类。
Proxy 用于代理目标类,Proxy 是一个抽象类,仅可以通过 getProxy(ClassLoader, Class[]) 方法创建子类。可以通过 newInstance(InvocationHandler) 来创建代理实例。
3.1.1.1、JavassistProxyFactory #getInvoker
JavassistProxyFactory
的方法:#getInvoker
创建了一个继承自AbstractProxyInvoker
类的匿名对象,并覆写了抽象方法#doInvoke
。覆写后的#doInvoke
逻辑比较简单,仅是将调用请求转发给了Wrapper
类的invokeMethod
方法。 (invoke
方法后面讲)
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//为目标类创建包装类,无法解决类名包含$的情况,不能是代理类,所以 是type
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//创建了一个继承自 AbstractProxyInvoker 类的匿名类对象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法(此方法实际上是上面 Wrapper.getWrapper生成的),invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
3.1.1.2、Wrapper #getWrapper
Wrapper
用于“包裹”目标类,Wrapper
是一个抽象类,仅可通过getWrapper(Class)
方法创建子类。在创建
Wrapper
子类的过程中,子类代码生成逻辑会对getWrappe
r 方法传入的Class
对象进行解析,拿到类方法,类成员变量等信息。以及生成invokeMethod
方法代码和其他一些方法代码。 再通过反射获取wpper
对象
public abstract class Wrapper {
private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); //class wrapper map
//^^^^^^
public static Wrapper getWrapper(Class<?> c) {
//因为无法代理动态类,因此是动态类时会循环找到不是动态类的父类
while (ClassGenerator.isDynamicClass(c))
{ //获取父类
c = c.getSuperclass();
}
//如果代理Object类,则直接返回固定实现OBJECT_WRAPPER
if (c == Object.class) {
return OBJECT_WRAPPER;
}
//代理其他类则调用makeWrapper创建一个实例,并存入缓存中(如果不存在,则创建并返回)
return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key))
}
在创建
Wrapper
子类的过程中,子类代码生成逻辑会对makeWrapper
方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成invokeMethod
方法代码和其他一些方法代码。代码生成完毕后,再通过反射创建 Wrapper 实例。
private static Wrapper makeWrapper(Class<?> c) {
// 判断是否是基本类型的包装类,如果是则抛出异常
if (c.isPrimitive()) {
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
// 获取类的全限定名称(包名+类名)
String name = c.getName();
// 获取类加载器,优先级如下:线程上下文加载器 -> c的类加载器 -> 系统类加载器
ClassLoader cl = ClassUtils.getClassLoader(c);
// 存放setPropertyValue方法代码
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
// 存放getPropertyValue方法代码
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
// 存放invokeMethod方法代码
StringBuilder c3 = new StringBuilder(
"public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws "
+ InvocationTargetException.class.getName() + "{ ");
// 生成类型转换代码及异常抛出代码
c1.append(name).append(" w; try{ w = ((").append(name)
.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name)
.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name)
.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
// 存放属性<属性名,属性类型>
Map<String, Class<?>> pts = new HashMap<>();
// 存放方法<描述信息(可理解为方法签名), Method 实例>
Map<String, Method> ms = new LinkedHashMap<>();
// 存放所有的方法名
List<String> mns = new ArrayList<>();
// 所有在当前类中声明的方法名
List<String> dmns = new ArrayList<>();
// 获取 public访问级别的字段
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
// 如果是static 或 transient 修饰的变量,则忽略
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
continue;
}
// 生成条件判断及赋值语句,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3"))
.append("; return; }");
// 生成条件判断及返回语句,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
// 添加<属性名,属性类型>到集合中
pts.put(fn, ft);
}
//获取所有public访问级别的方法
Method[] methods = c.getMethods();
// 检测是否包含在当前类中声明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
// 忽略Object类声明的方法
if (m.getDeclaringClass() == Object.class) {
continue;
}
String mn = m.getName();
// 生成方法名判断语句,比如:
// if ( "setName".equals( $2 )
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
//参数个数
int len = m.getParameterTypes().length;
// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
// && $3.length == 1
c3.append(" && ").append(" $3.length == ").append(len);
// 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
boolean override = false;
for (Method m2 : methods) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
// 对重载方法进行处理。判断是统一个方法时除了方法名、参数列表长度相同外,还必须判断参数类型是否一致,如:
// void setName(String name) 和 void setName(Integer name)
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成参数类型进行检测代码,比如:
// && $3[0].getName().equals("java.lang.String")
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
// 添加 ) {,完成方法判断语句
c3.append(" ) { ");
// 上面的方法校验完成后的代码类似这样:
// if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
// 根据返回值类型生成目标方法调用语句
if (m.getReturnType() == Void.TYPE) {
//返回值类型为空,则 w.setName((java.lang.String)$4[0]); return null;
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");")
.append(" return null;");
} else {
//返回值类型不为空,则 // return ($w)w.getName();
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4"))
.append(");");
}
// 添加 },
c3.append(" }");
/* 上面的方法校验完成后的代码类似这样:
*if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
* w.setName((java.lang.String)$4[0]); return null;
* }
*/
///添加方法名到集合中
mns.add(mn);
//如果是当前类声明的方法,则添加到声明方法集合中
if (m.getDeclaringClass() == c) {
dmns.add(mn);
}
//添加<方法描述,方法>到集合中
ms.put(ReflectUtils.getDesc(m), m);
}
//添加异常捕获及抛出代码
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
//添加NoSuchMethodException异常代码
c3.append(" throw new " + NoSuchMethodException.class.getName()
+ "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// 处理getter和setter方法
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = entry.getValue();
//是否是get方法
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
//获取属性名
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("name") ) { return ($w).w.getName(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
.append("(); }");
//存放<属性名,属性类型>到集合中
pts.put(pn, method.getReturnType());
//判断是否是is|has|can开头方法,生成代码逻辑同get方法
} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
.append("(); }");
pts.put(pn, method.getReturnType());
//判断是set开头方法
} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
//获取参数(属性)类型
Class<?> pt = method.getParameterTypes()[0];
//获取属性名
String pn = propertyName(matcher.group(1));
//生成属性判断及set方法语句,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(")
.append(arg(pt, "$3")).append("); return; }");
//存放<属性名,属性类型>到集合中
pts.put(pn, pt);
}
}
// 添加 抛出 NoSuchPropertyException 异常代码
c1.append(" throw new " + NoSuchPropertyException.class.getName()
+ "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName()
+ "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
// 生成class
//包装类数量加1并获取,原子操作
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
//根据类加载器创建类生成器实例
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 生成类名:有公共修饰符则org.apache.dubbo.common.bytecode.Wrapper,否则当前包装类名+$sw+当前包装类数量
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
// 设置父类
cc.setSuperClass(Wrapper.class);
// 设置默认构造函数
cc.addDefaultConstructor();
// 添加属性名称数组字段
cc.addField("public static String[] pns;");
// 属性即类类型字段
cc.addField("public static " + Map.class.getName() + " pts;");
// 添加方法名称集合属性
cc.addField("public static String[] mns;");
// 添加本类声明的方法名称集合字段
cc.addField("public static String[] dmns;");
// 添加类属性
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
//添加属性名获取方法
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
// 添加是否存在某个属性方法
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
// 获取某个属性类型
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
// 获取方法名集合
cc.addMethod("public String[] getMethodNames(){ return mns; }");
//获取声明方法属性
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
// 添加setPropertyValue方法、getPropertyValue方法、invokeMethod方法代码
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
Class<?> wc = cc.toClass();
// 设置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
//创建Wrapper实例
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
//清理缓存
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
3.1.1.3、创建包装类的wapper
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.dubbo.common.bytecode;
import com.healerjean.proj.service.MailClientService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
public class Wrapper0 extends Wrapper implements DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public Wrapper0() {
}
public boolean hasProperty(String var1) {
return pts.containsKey(var1);
}
public Class getPropertyType(String var1) {
return (Class)pts.get(var1);
}
//第一个是代理对象
//第二个是是方法名
//第三个是参数类型的数组(同一个名字,不同参数类型)
//第四个是代理对象的参数
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
MailClientService var5;
try {
var5 = (MailClientService)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
}
try {
if ("sentMail".equals(var2) && var3.length == 1) {
var5.sentMail((String)var4[0]);
return null;
}
} catch (Throwable var9) {
throw new InvocationTargetException(var9);
}
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.healerjean.proj.service.MailClientService.");
}
public String[] getPropertyNames() {
return pns;
}
public Object getPropertyValue(Object var1, String var2) {
try {
MailClientService var3 = (MailClientService)var1;
} catch (Throwable var5) {
throw new IllegalArgumentException(var5);
}
throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class com.healerjean.proj.service.MailClientService.");
}
public void setPropertyValue(Object var1, String var2, Object var3) {
try {
MailClientService var4 = (MailClientService)var1;
} catch (Throwable var6) {
throw new IllegalArgumentException(var6);
}
throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class com.healerjean.proj.service.MailClientService.");
}
public String[] getMethodNames() {
return mns;
}
public String[] getDeclaredMethodNames() {
return dmns;
}
}
3.1.1.4、最终返回的AbstractProxyInvoker
有一个未实现的抽象方法
doInvoke
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);
private final T proxy;
private final Class<T> type;
private final URL url;
public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
if (proxy == null) {
throw new IllegalArgumentException("proxy == null");
}
if (type == null) {
throw new IllegalArgumentException("interface == null");
}
if (!type.isInstance(proxy)) {
throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
}
this.proxy = proxy;
this.type = type;
this.url = url;
}
@Override
public Class<T> getInterface() {
return type;
}
@Override
public URL getUrl() {
return url;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public void destroy() {
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private CompletableFuture<Object> wrapWithFuture(Object value) {
if (RpcContext.getContext().isAsyncStarted()) {
return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
} else if (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
}
return CompletableFuture.completedFuture(value);
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
@Override
public String toString() {
return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
}
}
3.1..2、本地协议InjvmProtocol
导出
在获取Invoker之后,就会调用具体的
Protocol
实现类导出方法进行导出,首先是导出到本地,使用的是InjvmProtocol
协议,导出逻辑很简单,源码如下:
//代码有省略
public class InjvmProtocol extends AbstractProtocol implements Protocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//创建一个InjvmExporter实例,InjvmExporter只是将道歉Invoker等缓存到本地
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
}
class InjvmExporter<T> extends AbstractExporter<T> {
private final String key;
private final Map<String, Exporter<?>> exporterMap;
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}
//至此本地导出就基本完了
3.2、远程协议导出
与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,比较复杂。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。首先分析
RegistryProtocol
的export
方法。
// 导出到远程的部分源码
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
//如果存在,则为注册地址添加代理实现参数
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//为服务引用生成Invoker对象(和上面的本地导出代码差不多,这里变成了注册协议(将dubbo协议的地址作为export参数放入)
//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880®istry=zookeeper&release=2.7.7×tamp=1604408770574
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//生成提供者和配置包装Invoker(又多包装了一层)
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker,this);
//通过SPI自适应拓展获取Protocol的拓展实现,调用导出方法、这里使用的注册协议RegistryProtocol,还记得上面使用的是InJvmProtocol协议吧
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
//添加到导出器缓存
exporters.add(exporter);
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7×tamp=1604408770574
URL registryUrl = getRegistryUrl(originInvoker);
// 服务提供中地址(本地导出服务地址)
// dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1
URL providerUrl = getProviderUrl(originInvoker);
// 获取订阅覆盖地址
//provider://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
//创建覆盖监听器,存入缓存
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//使用配置覆盖提供中者地址
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605001242523&version=0.0.1
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//导出服务,
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 获取 register 参数,默认为true
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
//为true则表示需要注册服务,向注册中心注册
register(registryUrl, registeredProviderUrl);
}
// 不建议使用!订阅2.6.x或之前版本中的重写规则。
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 设置注册和订阅地址
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//通知RegistryProtocolListener,服务导出
notifyExport(exporter);
//创建DestroyableExporter实例,确保每次导出时都返回一个新的导出器实例
return new DestroyableExporter<>(exporter);
}
protected URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
private URL getProviderUrl(final Invoker<?> originInvoker) {
String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
}
return URL.valueOf(export);
}
private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
.addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
}
3.2.1、#doLocalExport
接着上面的源码分析导出服务方法
doLocalExport
的源码。
//providerurl <--> exporter
private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
//根据Invoker参数获取缓存key
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605001242523&version=0.0.1
String key = getCacheKey(originInvoker);
//调用computeIfAbsent方法,将key和ExporterChangeableWrapper实例放入缓存
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 创建InvokerDelegate委托类实例
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 创建ExporterChangeableWrapper实例,调用 DubboProtocol 的 export 方法导出服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
3.2.2、DubboProtocol #export
使用默认协议
dubbo
作为示例,分析导出逻辑。此处的protocol
变量会在运行时加载DubboProtocol
,并调用DubboProtocol
的 export 方法。
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 获取URL dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4494&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604416434630&version=0.0.1
URL url = invoker.getUrl();
// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
// 163/com.healerjean.proj.service.MailClientService:0.0.1:20880
String key = serviceKey(url);
// 创建DubboExporter实例并放入缓存(其实到了这里基本上就算完事了,但是我们最重要的是启动dubbo服务)
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 导出本地存根服务以发送事件
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY)
+ "], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
3.2.3、#openServer
DubboExporter
的export
方法主要调用openServer
方法,而openServer
逻辑也很简单,获取参数isserver
,选择执行createServer
还是server的重置方法。
createServer
包含三个核心的逻辑。
第一是检测是否存在url
中 server
参数所代表的 Transporter
拓展(默认是netty
),不存在则抛出异常。
第二是创建服务器实例。
第三是检测是否支持url
中client
参数所表示的 Transporter
拓展,不存在也是抛出异常。
第三是看看Exchangers
是如何创建出ExchangeServer
服务的。首先调用org.apache.dubbo.remoting.exchange.Exchangers.bind(URL, ExchangeHandler)
方法
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19748&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605004230369&version=0.0.1
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
//10.236.122.217:20880
String key = url.getAddress();
// 在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。
// 客户端可以导出只供服务器调用的服务,默认为true
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 双重检测创建服务(保证只创建一次)
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
private ProtocolServer createServer(URL url) {
//第一步:检测是否存在`url`中 `server` 参数所代表的 `Transporter `拓展(默认是`netty`),不存在则抛出异常。
url = URLBuilder.from(url)
// 服务器关闭时发送只读事件,默认情况下启用
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 添加默认心跳检测参数60 * 1000
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 添加编码解码器参数
.addParameter(CODEC_KEY, DubboCodec.NAME).build();
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=20112&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605004964820&version=0.0.1
// 获取 server 参数,默认为 netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0
&& !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//第二步:创建服务器实例
ExchangeServer server;
try {// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//第三步:检测是否支持 `client` 参数所表示的` Transporter` 拓展,不存在也是抛出异常。
// 获取 client 参数,可指定 netty,mina。我这里默认进来是null。直接返回DubboProtocolServe服务
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 通过SPI获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中, 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
// 创建DubboProtocolServer实例并返回
return new DubboProtocolServer(server);
}
@SPI("netty")
public interface Transporter {
/**
* Bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* Connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
3.2.3.1、Exchangers.bind
接下来就主要看看Exchangers是如何创建出
ExchangeServer
服务的
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605006351222&version=0.0.1
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//添加URL参数codec=exchange
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
然后是org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(URL, ExchangeHandler)
方法
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
3.2.3.2、Transporters.bind
该方法中的主要逻辑也就是调用
org.apache.dubbo.remoting.Transporters.bind(URL, ChannelHandler…)
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter
// 实例(ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();),并调用bind实例方法
return getTransporter().bind(url, handler);
}
//获取默认的NettyTransporter
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
默认的
Transporter
是NettyTransporter
,不过是注意是netty4.x,dubbo也提供了3.x的实现。
/**
* Default extension of {@link Transporter} using netty4.x.
*/
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
上面的bind方法就一句创建 NettyServer实例代码
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
//调用父类构造函数
// 添加线程名称参数,可以在CommonConstants中按thread_name_KEY和THREADPOOL_KEY自定义客户机线程池的名称和类型,默认名称DubboServerHandler
// 对handler进行包装MultiMessageHandler->HeartbeatHandler->handler
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
紧接跟踪父类构造函数:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
、、dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&threadname=DubboServerHandler-10.236.122.217:20880×tamp=1605006351222&version=0.0.1
// 调用父类构造方法(简单判断及解码器、超时时间等属性的获取赋值)
super(url, handler);
// 获取套接字地址
localAddress = getUrl().toInetSocketAddress();
//获取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// anyhost=true或者是无效的本地主机则设置ip为0.0.0.0
bindIp = ANYHOST_VALUE;
}
// 创建绑定套接字地址
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数,默认0
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
// 获取超时时间 默认600000毫秒
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 打开服务
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export "
+ getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 创建执行器
executor = executorRepository.createExecutorIfAbsent(url);
}
有上面代码分析可知,需要在回到子类查看,doOpen方法的具体试下:
protected void doOpen() throws Throwable {
// 创建服务启动器
bootstrap = new ServerBootstrap();
// 创建 boss 和 worker 事件组
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker");
// 创建NettyServer处理类实例
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
// 设置boss和worker事件组
bootstrap.group(bossGroup, workerGroup)
// 设置SocketChannel
.channel(NettyEventLoopFactory.serverSocketChannelClass())
// SO_REUSEADDR是让端口释放后立即就可以被再次使用。
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
// TCP_NODELAY选项是用来控制是否开启Nagle算法,该算法是为了提高较慢的广域网传输效率
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
// 创建一个 池化或非池化的缓存区分配器
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 添加孩子渠道初始化处理实例
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 初始化通道时进行管道pipeline编码解码器等设置
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline().addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// 绑定到指定的 ip 和端口上
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
// 不间断同步
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
3.3、远程协议注册
在远程服务的源码分析开始,我看见导出到远程的服务除了执行导出逻辑,需要执行服务的注册即org.apache.dubbo.registry.integration.RegistryProtocol.register(URL, URL)
方法
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7×tamp=1604408770574
URL registryUrl = getRegistryUrl(originInvoker);
// 服务提供中地址(本地导出服务地址)
// dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1
URL providerUrl = getProviderUrl(originInvoker);
// 获取订阅覆盖地址
//provider://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1604408770580&version=0.0.1
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
//创建覆盖监听器,存入缓存
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//使用配置覆盖提供中者地址
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider×tamp=1605001242523&version=0.0.1
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//导出服务,
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根据 URL 加载 Registry 实现类
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL
//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&release=2.7.7&revision=0.0.1&side=provider×tamp=1605006351222&version=0.0.1
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 获取 register 参数,默认为true
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
//为true则表示需要注册服务,向注册中心注册
register(registryUrl, registeredProviderUrl);
}
// 不建议使用!订阅2.6.x或之前版本中的重写规则。
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 设置注册和订阅地址
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//通知RegistryProtocolListener,服务导出
notifyExport(exporter);
//创建DestroyableExporter实例,确保每次导出时都返回一个新的导出器实例
return new DestroyableExporter<>(exporter);
}
3.3.1、RegistryProtocol.#register
public void register(URL registryUrl, URL registeredProviderUrl) {
//获取注册Registry实例
Registry registry = registryFactory.getRegistry(registryUrl);
//执行注册
registry.register(registeredProviderUrl);
}
getRegistry
方法由org.apache.dubbo.registry.support.AbstractRegistryFactory
实现,具体逻辑如下:
public Registry getRegistry(URL url) {
//AtomicBoolean destroyed,已经执行过销毁则抛出异常
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
}
// 组装URL(路径、接口等参数)
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
//创建注册缓存key = zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
String key = createRegistryCacheKey(url);
// 锁定注册表访问进程以确保注册表的单个实例
LOCK.lock();
try {
//从缓存中获取,获取到则直接返回
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//缓存中不存在则通过SPI反方式创建,创建失败抛出异常
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
//放入缓存并返回
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
LOCK.unlock();
}
}
根据上面源码分析可知,
getRegistry
方法先访问缓存,缓存未命中则调用createRegistry
创建Registry
,然后写入缓存。这里的createRegistry
由具体的子类实现,这里以ZookeeperRegistryFactory
为例解析。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
/**
* zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 创建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
ZookeeperRegistryFactory
的实现很简单就一句代码,直接创建ZookeeperRegistry
实例。因此下面将分析ZookeeperRegistry
的构造方法源码。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
//调用父类构造(获取一些例如重试的时间)
super(url);
//如果anyhost=true或者主机地址是0.0.0.0,则抛出异常
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
//不是/开头,则加上
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
//设置路径为group
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {//重新连接状态
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
//当zookeeper连接从连接丢失中恢复时,它需要获取最新的提供程序列表。重新注册观察者只是一个副作用,不是强制性的。
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {//新会话创建
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
//尝试重新注册URL并将此实例的侦听器重新订阅到注册表
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
上面的源码分析可知,
zkClient
是通过zookeeperTransporter的connect
方法创建的,这里的zookeeperTransporter
类型为自适应拓展类,默认为CuratorZookeeperTransporter
。因此下面我们分析下CuratorZookeeperTransporter
的源码。
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
@Override
public ZookeeperClient createZookeeperClient(URL url) {
//创建CuratorZookeeperClient实例
return new CuratorZookeeperClient(url);
}
}
可以看到CuratorZookeeperTransporter
逻辑很简单,直接创建了CuratorZookeeperClient
实例。
public CuratorZookeeperClient(URL url) {
// 调用父类构造,对url属性赋值
super(url);
try {
// 获取超时时间,默认5秒
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
// 获取会话过期时间,默认1分钟
int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout).sessionTimeoutMs(sessionExpireMs);
// 获取权限信息
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加CuratorConnectionStateListener监听器
client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
// 启动客户端
client.start();
// 阻塞连接,直到连接成功或者超时
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
// 连接失败抛出异常
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
3.3.2、FailbackRegistry #register
上面已经分析了获取·
Registry
·实例的源码。接下来任然以·zookeeper
·为例分析注册方法register
的源码,这个方法定义在FailbackRegistry
抽象类中。
public class ZookeeperRegistry extends FailbackRegistry {
}
public void register(URL url) {
//不接受此协议类型的服务,直接返回
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
//调用父类方法,添加URL到registered缓存中
super.register(url);
//删除url失败注册任务缓存
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 执行注册,子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 获取 check 参数,若 check = true 将会直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 将失败的注册请求记录到失败列表,定期重试
addFailedRegistered(url);
}
}
上面源码分析可知,执行服务注册方法需要子类实现,这里分析下
zookeeper
的实现org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister(URL)
@Override
public void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 是否是临时节点由参数dynamic决定,默认为true
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
toUrlPath(url) = /dubbo/com.healerjean.proj.service.MailClientService/providers/dubbo%3A%2F%2F10.236.122.217%3A20880%2Fcom.healerjean.proj.service.MailClientService%3Fanyhost%3Dtrue%26application%3Dhlj-server-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3D163%26interface%3Dcom.healerjean.proj.service.MailClientService%26methods%3DsentMail%26pid%3D14248%26release%3D2.7.7%26revision%3D0.0.1%26side%3Dprovider%26timestamp%3D1605009104328%26version%3D0.0.1
ZookeeperRegistry
在 doRegister
中调用了 Zookeeper
客户端创建服务节点。节点路径由 toUrlPath
方法生成,org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient.create(String, boolean)
方法源码如下
private final Set<String> persistentExistNodePath = new ConcurrentHashSet<>();
@Override
public void create(String path, boolean ephemeral) {
// 不是临时节点
if (!ephemeral) {
// 检测持久节点缓存是否存在该节点,若存在则返回
if (persistentExistNodePath.contains(path)) {
return;
}
// zookeeper是否存在该节点,若存在则放入缓存并返回
if (checkExists(path)) {
persistentExistNodePath.add(path);
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建
create(path.substring(0, i), false);
}
if (ephemeral) {
// 创建临时节点
createEphemeral(path);
} else {
// 创建永久节点并放入缓存
createPersistent(path);
persistentExistNodePath.add(path);
}
}
上面方法会根据节点类型调用创建临时节点和永久节点方法,由子类实现,主要看
org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient
的实现。
@Override
public void createPersistent(String path) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
logger.warn("ZNode " + path + " already exists.", e);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void createEphemeral(String path) {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
logger.warn("ZNode " + path
+ " already exists, since we will only try to recreate a node on a session expiration"
+ ", this duplication might be caused by a delete delay from the zk server, which means the old expired session"
+ " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, "
+ "we can just try to delete and create again.", e);
deletePath(path);
createEphemeral(path);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}