前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

整体逻辑

服务导出过程始于 Spring 容器发布刷新事件之后,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分

服务导出的入口类是 ServiceBean,入口方法 onApplicationEvent

1、第一部分是前置工作,主要用于检查参数,组装 URL。

2、第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。

3、第三部分是向注册中心注册服务,用于服务发现。

1、服务导出入口

DubboBootstrapApplicationListener该类继承了OneTimeExecutionApplicationContextEventListener抽象类,OneTimeExecutionApplicationContextEventListener又实现了ApplicationListener接口onApplicationEvent方法,该方法会在收到 Spring 上下文刷新事件后执行。两个类的关键源码如下:


abstract class OneTimeExecutionApplicationContextEventListener implements ApplicationListener, ApplicationContextAware {

    private ApplicationContext applicationContext;

    public final void onApplicationEvent(ApplicationEvent event) {
         //判断事件源是持有的ApplicationContext并且是应用上下文事件
        if (isOriginalEventSource(event) && event instanceof ApplicationContextEvent) {
          
          //执行方法
            onApplicationContextEvent((ApplicationContextEvent) event);
        }
    }

   //抽象方法交给子类执行
    protected abstract void onApplicationContextEvent(ApplicationContextEvent event);

   
    private boolean isOriginalEventSource(ApplicationEvent event) {
        return (applicationContext == null)  || Objects.equals(applicationContext, event.getSource());
    }

    @Override
    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}

DubboBootstrapApplicationListener 的构造函数中会先去获取DubboBootstrap 的实例,在监听到ContextRefreshedEvent事件时触发DubboBootstrapstart方法,

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    /**
     * The bean name of {@link DubboBootstrapApplicationListener}
     */
    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

    private final DubboBootstrap dubboBootstrap;

  
  //实例化该监听器的的时候会先实例化dubboBootstrap 对象。
    public DubboBootstrapApplicationListener() {
        this.dubboBootstrap = DubboBootstrap.getInstance();
    }
    
    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
      //是上下文刷新事件则调用DubboBootstrap的start方法
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
          
          //如果是上下文关闭事件则调用DubboBootstrap的stop方法
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

   //dubbo开始 上下文刷新时间
    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        dubboBootstrap.stop();
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}

接着我们看看DubboBootstrap 的实例化及开始方法的源码。

public class DubboBootstrap extends GenericEventListener {


    private static DubboBootstrap instance;

    private final ConfigManager configManager;

    private final Environment environment;

  
   //加锁构造单例
    public static synchronized DubboBootstrap getInstance() {
        if (instance == null) {
            instance = new DubboBootstrap();
        }
        return instance;
    }
  
  
    //私有化构造函数,保证只能被自己实例化
    private DubboBootstrap() {
      
    	//通过SPI方式获取环境配置和环境管理实例,继承关系如下图
        configManager = ApplicationModel.getConfigManager();
        environment = ApplicationModel.getEnvironment();
      
        //注册shutdown事件,回调DubboBootstrap的destroy方法,销毁所有导出及引用的服务等
        DubboShutdownHook.getDubboShutdownHook().register();
        ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
            @Override
            public void callback() throws Throwable {
                DubboBootstrap.this.destroy();
            }
        });
    }
  
  
public class ApplicationModel {


  private static final ExtensionLoader<FrameworkExt> LOADER = ExtensionLoader.getExtensionLoader(FrameworkExt.class);


  //通过SPI方式获取环境配置和环境管理实例
  public static Environment getEnvironment() {
    return (Environment) LOADER.getExtension(Environment.NAME);
  }

  public static ConfigManager getConfigManager() {
    return (ConfigManager) LOADER.getExtension(ConfigManager.NAME);
  }


}

image-20201103190544304

2、DubboBootstrap #start

    public DubboBootstrap start() {
      
         //原子操作,保证只启动一次
        if (started.compareAndSet(false, true)) {
            ready.set(false);
          
          //1、初始化操作,主要是一些配置
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
          
          // 2、导出dubbo服务(最终调用ServiceConfig的export方法)
            exportServices();

            // 当不只是注册服务提供者或者已经导出元数据的情况下执行
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
              
                // 导出元数据服务(最终调用ServiceConfig实例,然后调用export方法)
                exportMetadataService();
              
                // 如果有则需要注册本地服务实例,通过SPI方式获取服务发现注册中心,然后调用他们的注册方法(默认自适应拓展实现是zookeeper)
                registerServiceInstance();
            }

           // 3、服务导入
            referServices();
            if (asyncExportingFutures.size() > 0) {
                new Thread(() -> {
                    try {
                        this.awaitFinish();
                    } catch (Exception e) {
                        logger.warn(NAME + " exportAsync occurred an exception.");
                    }
                    ready.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                }).start();
            } else {
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }

2.1、DubboBootstrapd #start内部

通过Start方法源码可以看到,里面一次执行了服务初始化以服务导出和引入的方法,接下来我们追踪一下内部方法的具体实现。

 private void initialize() {
    	// 原子操作确保只初始化一次
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        //初始化Dubbo组件的生命周期,这里主要是对环境配置初始化
        ApplicationModel.iniFrameworkExts();
   
        //开始构建配置中心
        startConfigCenter();
   
        //如果是zookeeper作为注册中心且没有指定配置中心时,使用注册中心做配置中心
        useRegistryAsConfigCenterIfNecessary();
   
        //加载远程的配置
        loadRemoteConfigs();
   
        //全局配置校验(应用、元数据、提供者、消费者、监控等)
        checkGlobalConfigs();
   
        //初始化元数据服务
        initMetadataService();
   
        //初始化事件监听器(将当前实例添加到事件监听器中)
        initEventListener();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has been initialized!");
        }
    }

2.1.1、initFrameworkExts

初始化Dubbo组件的生命周期,这里主要是对环境配置初始化

    public static void initFrameworkExts() {
      //这里面通过上面其实就可以看出来就是 一些环境的类,进行初始化
        Set<FrameworkExt> exts = ExtensionLoader.getExtensionLoader(FrameworkExt.class).getSupportedExtensionInstances();
        for (FrameworkExt ext : exts) {
            ext.initialize();
        }
    }

image-20201103191736747

2.1.2、startConfigCenter

开始构建配置中心,一般情况下下面的不会执行关键代码,因为我这里没有配置中心哟

    private void startConfigCenter() {
        Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();

        // check Config Center
        if (CollectionUtils.isEmpty(configCenters)) {
            ConfigCenterConfig configCenterConfig = new ConfigCenterConfig();
            configCenterConfig.refresh();
            if (configCenterConfig.isValid()) {
                configManager.addConfigCenter(configCenterConfig);
                configCenters = configManager.getConfigCenters();
            }
        } else {
            for (ConfigCenterConfig configCenterConfig : configCenters) {
                configCenterConfig.refresh();
                ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig);
            }
        }

        if (CollectionUtils.isNotEmpty(configCenters)) {
            CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();
            for (ConfigCenterConfig configCenter : configCenters) {
                compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
            }
            environment.setDynamicConfiguration(compositeDynamicConfiguration);
        }
        configManager.refreshAll();
    }

2.1.3、useRegistryAsConfigCenterIfNecessary

如果是zookeeper作为注册中心且没有指定配置中心时,使用注册中心做配置中心

private void useRegistryAsConfigCenterIfNecessary() {
        // we use the loading status of DynamicConfiguration to decide whether ConfigCenter has been initiated.
        if (environment.getDynamicConfiguration().isPresent()) {
            return;
        }

        if (CollectionUtils.isNotEmpty(configManager.getConfigCenters())) {
            return;
        }

  			//从环境中获取注册中心,我们会发现我这里其实只有一个,如果有多个的话则要进行过滤,筛选出符合条件的
        configManager.getDefaultRegistries().stream()
                .filter(registryConfig -> registryConfig.getUseAsConfigCenter() == null || registryConfig.getUseAsConfigCenter())
                .forEach(registryConfig -> {
                    String protocol = registryConfig.getProtocol();
                    String id = "config-center-" + protocol + "-" + registryConfig.getPort();
                    ConfigCenterConfig cc = new ConfigCenterConfig();
                    cc.setId(id);
                    if (cc.getParameters() == null) {
                        cc.setParameters(new HashMap<>());
                    }
                    if (registryConfig.getParameters() != null) {
                        cc.getParameters().putAll(registryConfig.getParameters());
                    }
                    cc.getParameters().put(CLIENT_KEY, registryConfig.getClient());
                    cc.setProtocol(registryConfig.getProtocol());
                    cc.setPort(registryConfig.getPort());
                    cc.setAddress(registryConfig.getAddress());
                    cc.setNamespace(registryConfig.getGroup());
                    cc.setUsername(registryConfig.getUsername());
                    cc.setPassword(registryConfig.getPassword());
                    if (registryConfig.getTimeout() != null) {
                        cc.setTimeout(registryConfig.getTimeout().longValue());
                    }
                    cc.setHighestPriority(false);
                    configManager.addConfigCenter(cc);
                });
        startConfigCenter();
    }

image-20201103192418363

2.1.4、loadRemoteConfigs

加载远程的配置,

因为没有什么远程的配置,所以下面的代码基本上不会执行,各种List都是空的

    private void loadRemoteConfigs() {
        // registry ids to registry configs
        List<RegistryConfig> tmpRegistries = new ArrayList<>();
        Set<String> registryIds = configManager.getRegistryIds();
        registryIds.forEach(id -> {
            if (tmpRegistries.stream().noneMatch(reg -> reg.getId().equals(id))) {
                tmpRegistries.add(configManager.getRegistry(id).orElseGet(() -> {
                    RegistryConfig registryConfig = new RegistryConfig();
                    registryConfig.setId(id);
                    registryConfig.refresh();
                    return registryConfig;
                }));
            }
        });

        configManager.addRegistries(tmpRegistries);

        // protocol ids to protocol configs
        List<ProtocolConfig> tmpProtocols = new ArrayList<>();
        Set<String> protocolIds = configManager.getProtocolIds();
        protocolIds.forEach(id -> {
            if (tmpProtocols.stream().noneMatch(prot -> prot.getId().equals(id))) {
                tmpProtocols.add(configManager.getProtocol(id).orElseGet(() -> {
                    ProtocolConfig protocolConfig = new ProtocolConfig();
                    protocolConfig.setId(id);
                    protocolConfig.refresh();
                    return protocolConfig;
                }));
            }
        });

        configManager.addProtocols(tmpProtocols);
    }

2.2、ServiceConfig #export

上面DubboBootStrap的start方法初始化完成之后,开始导出服务执行方法exportServices

 private void exportServices() {
 			
 			  //这个方法是获取我们所有的service服务类, 看下面的图片就明白了
        configManager.getServices().forEach(sc -> {
          
            //设置ServiceConfig的启动器为 当前 bootstrap
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);
            
            if (exportAsync) {
              
            	//异步导出,将导出任务提交到线程池异步完成
                ExecutorService executor = executorRepository.getServiceExporterExecutor();
                Future<?> future = executor.submit(() -> {
                    sc.export();
                });
                asyncExportingFutures.add(future);
              
              //我这里是同步导出
            } else {
            	//同步导出 则直接调用ServiceConfig的导出方法
                sc.export();
                exportedServices.add(sc);
            }
        });
    }

image-20201103193239308

导出服务方法最终都会调用ServiceConfig的export方法进行导出,接下来将进入这个方法源码分析。

image-20201103193524982


 public synchronized void export() {
    	//判断当前服务是否需要导出
        if (!shouldExport()) {
            return;
        }
   
        //启动类为空则获取一个实例(按照上面的流程下来其实已经执行了)
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
     
     
        //校验并更细配置(默认配置、协议配置等)
        checkAndUpdateSubConfigs();

        //初始化元数据(设置版本、分组、类型及名称等属性,接口的一些属性)
        serviceMetadata.setVersion(version);
        serviceMetadata.setGroup(group);
        serviceMetadata.setDefaultGroup(group);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());
   
   
   
        //是否需要延迟导出
        if (shouldDelay()) {
        	//提交导出任务到延迟导出调度器(可调度线程池)
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
        	//执行导出操作,我们执行这里
            doExport();
        }
    
        //执行已导出操作
        exported();
    }

	//判断是否要导出,如果已经导出了那么久不要导出了,如果没有导出的时候就返回true执行导出
    public boolean shouldExport() {
        Boolean export = getExport();
        // default value is true
        return export == null ? true : export;
    }

	//判断是否导出,如果已经有服务提供者了,就说明已经导出了
    //有时候我们只是想本地启动服务进行一些调试工作,我们并不希望把本地启动的服务暴露出去给别人调用。此时,我们可通过配置 export 禁止服务导出
    protected ProviderConfig provider;
    @Override
    public Boolean getExport() {
        return (export == null && provider != null) ? provider.getExport() : export;
    }
	
   @Deprecated
    public void init() {
        initialize();
    }


    public void exported() {
        // 发布一个服务导出事件( ServiceConfigExportedEvent)
        dispatch(new ServiceConfigExportedEvent(this));
    }

image-20201103194358114

2.2.1、doExport

protected synchronized void doExport() {
    	//是否执行了unexport方法(表示服务注销了),执行了则抛出异常
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
  
        //已经导出过则直接返回
        if (exported) {
            return;
        }
        //设置已导出标志
        exported = true;
        //路径及服务名为空
        if (StringUtils.isEmpty(path)) {
        	//服务名为接口名
            path = interfaceName;
        }
  
  
        //Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。Dubbo 在 doExportUrls 方法中对多协议,多注册中心进行了支持
       //执行多协议多注册导出
        doExportUrls();
    }

image-20201103194533056

2.2.1.1、 doExportUrls

多协议多注册中心导出

根据上一步中的导出逻辑可知,最后调用了doExportUrls方法,该方法对dubbo多协议,多注册中心进行了支持,源码如分析如下:

 private void doExportUrls() {
    	//获取服务缓存库
        ServiceRepository repository = ApplicationModel.getServiceRepository();
   
        //注册当前服务到本地缓存库
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
   
        //注册服务提供者到缓存库
        repository.registerProvider(
        		//根据接口名,服务组及版本号生成唯一名称 (163/com.healerjean.proj.service.MailClientService:0.0.1)
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );
   
   
   		
        //加载注册中心地址(支持多注册中心,因此是集合) 
   			//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&registry=zookeeper&release=2.7.7&timestamp=1604404226937 
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
        
        //支持多协议导出,遍历每个协议,执行这个协议所对应的注册中心的导出逻辑
        for (ProtocolConfig protocolConfig : protocols) {
        	//根据协议配置生成服务地址(注册中心的key) 163/com.healerjean.proj.service.MailClientService:0.0.1
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
          
            // 如果指定了服务路径,需要再次注册到缓存中,保证此映射路径能获取到服务
            repository.registerService(pathKey, interfaceClass);
          
            // 设置原数据服务key
            serviceMetadata.setServiceKey(pathKey);
          
            //执行多注册中心单协议的导出逻辑
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

image-20201103194915051

image-20201103195052497

支持多协议导出,我这里是Dubb协议

image-20201103195649286

image-20201103195827653

2.2.1.2、loadRegistries

多协议导出首次调用ConfigValidationUtils#loadRegistries方法加载多注册中心,然后遍历每个协议执行该协议的导出逻辑。

1、检测是否存在注册中心配置类,不存在则抛出异常

2、构建参数映射集合,也就是 map

3、构建注册中心链接列表

5、遍历链接列表,并根据条件决定是否将其添加到 registryList 中

public static List<URL> loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<URL>();
    // 获取服务应用配置
    ApplicationConfig application = interfaceConfig.getApplication();

    // 获取服务注册配置
    List<RegistryConfig> registries = interfaceConfig.getRegistries();

    if (CollectionUtils.isNotEmpty(registries)) {// 注册配置不为空
        for (RegistryConfig config : registries) {// 遍历注册配置

            // 获取注册地址 zookeeper://127.0.0.1:2181
            String address = config.getAddress();
            if (StringUtils.isEmpty(address)) {
                // 如果为设置默认值0.0.0.0
                address = ANYHOST_VALUE;
            }

            // 如果地址不是N/A
            if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                // 通过getter或者getParameters方法将ApplicationConfig和RegistryConfig属性放到map中
                AbstractConfig.appendParameters(map, application);
                AbstractConfig.appendParameters(map, config);

                //  path = org.apache.dubbo.registry.RegistryService 
                map.put(PATH_KEY, RegistryService.class.getName());

                // 添加dubbo、版本、时间戳等参数到map中
                AbstractInterfaceConfig.appendRuntimeParameters(map);

                // 参数中是否包含协议protocol,如果没有设置为dubbo
                if (!map.containsKey(PROTOCOL_KEY)) {
                    map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                }
                // 根据参数和地址解析URL
                //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=17356&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&timestamp=1604655340823
                List<URL> urls = UrlUtils.parseURLs(address, map);

                for (URL url : urls) {
                    // 根据添加registry参数组装注册中心地址
                    url = URLBuilder.from(url)
                        .addParameter(REGISTRY_KEY, url.getProtocol())//添加参数 registry=zookeeper
                        .setProtocol(extractRegistryType(url)).build(); //将头部协议修改为registry
                    //设置完成后 registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=17368&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&registry=zookeeper&release=2.7.7&timestamp=1604656403726
                    
                    
                    
                    // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
                    // (服务提供者 && register = true 或 null) || (非服务提供者 && subscribe = true 或 null)
                    if ((provider && url.getParameter(REGISTER_KEY, true))
                        || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}



public static void appendParameters(Map<String, String> parameters, Object config) {
    appendParameters(parameters, config, null);
}

image-20201106173852766

image-20201106173922214

image-20201106173522361

image-20201109160748697

2.2.1.3、doExportUrlsFor1Protocol

执行多注册中心单协议的导出逻辑

有多协议多注册中心源码分析可知,最终调用了单协议多注册中心导出方法,主要逻辑就是导出前先获取了各种配置缓存起来,然后获取创建包装实例、获取主机和端口,最后执行导出。

导出根据导出范围分为三个分支: scope = none,不导出服务;scope != remote,导出到本地;scope != local,导出到远程。我下面的scope是null,所以要导出到本地和远程

	private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
		String name = protocolConfig.getName();
		// 协议名称为空,设置为dubbo,我这个配置了,进来就是dubbo
		if (StringUtils.isEmpty(name)) {
			name = DUBBO;
		}
    
    
		// 添加配置<side,provider>
		Map<String, String> map = new HashMap<String, String>();
		map.put(SIDE_KEY, PROVIDER_SIDE);
    
		// 添加dubbo、版本、时间戳等运行参数到map中
		ServiceConfig.appendRuntimeParameters(map);
    
		// 通过getter或者getParameters方法将ApplicationConfig、ModuleConfig、MetricsConfig、
		// ProviderConfig、ProtocolConfig及ServiceConfig属性放到map中
		AbstractConfig.appendParameters(map, getMetrics());
		AbstractConfig.appendParameters(map, getApplication());
		AbstractConfig.appendParameters(map, provider);
		AbstractConfig.appendParameters(map, protocolConfig);
		AbstractConfig.appendParameters(map, this);
    
		// 获取原数导出配置,如果合法则添加<metadata,remote>到map中
		MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
		if (metadataReportConfig != null && metadataReportConfig.isValid()) {
			map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
		}
		//是否存在 <dubbo:method> 标签的配置信息
		if (CollectionUtils.isNotEmpty(getMethods())) {
			for (MethodConfig method : getMethods()) {
				// 通过getter或者getParameters方法将MethodConfig属性添加到map中,前缀是当前MethodConfig名称即<方法名.属性名,属性值>。
				AbstractConfig.appendParameters(map, method, method.getName());
				String retryKey = method.getName() + ".retry";
				//是否存在methodname.retry键,及是否配置了retry属性
				if (map.containsKey(retryKey)) {
					//存在则移除
					String retryValue = map.remove(retryKey);
					//如果retry配置的false,设置retries配置为0
					if ("false".equals(retryValue)) {
						map.put(method.getName() + ".retries", "0");
					}
				}
				//获取方法参数配置ArgumentConfig,存放到map中
				List<ArgumentConfig> arguments = method.getArguments();
				if (CollectionUtils.isNotEmpty(arguments)) {
					for (ArgumentConfig argument : arguments) {
						// 判断参数类型是否为空
						if (argument.getType() != null && argument.getType().length() > 0) {
							//获取接口(导出服务)的所有方法,遍历
							Method[] methods = interfaceClass.getMethods();
							if (methods.length > 0) {
								for (int i = 0; i < methods.length; i++) {
									String methodName = methods[i].getName();
									// 接口方法名和方法配置名称相同
									if (methodName.equals(method.getName())) {
										//获取接口方法参数类型
										Class<?>[] argtypes = methods[i].getParameterTypes();
										// 参数索引不是-1,-1表示未设置
										if (argument.getIndex() != -1) {
											 // 检测 ArgumentConfig 中的 type 属性与方法参数列表中的参数名称是否一致
											if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
												//一致的则添加配置到map<方法名.参数索引,参数配置>
												AbstractConfig.appendParameters(map, argument,
														method.getName() + "." + argument.getIndex());
											} else {//不一致则抛出异常
												throw new IllegalArgumentException(
														"Argument config error : the index attribute and type attribute not match :index :"
																+ argument.getIndex() + ", type:" + argument.getType());
											}
										} else {//未设置参数索引
											// 遍历方法参数的所有类型
											for (int j = 0; j < argtypes.length; j++) {
												Class<?> argclazz = argtypes[j];
												//查找和当前参数配置类型匹配的参数
												if (argclazz.getName().equals(argument.getType())) {
													//匹配则将配置放入map中<方法名.参数索引,参数配置>
													AbstractConfig.appendParameters(map, argument,
															method.getName() + "." + j);
													//如果匹配到的参数类型设置了索引并且和当前索引不一致,抛出异常
													if (argument.getIndex() != -1 && argument.getIndex() != j) {
														throw new IllegalArgumentException(
																"Argument config error : the index attribute and type attribute not match :index :"
																		+ argument.getIndex() + ", type:"
																		+ argument.getType());
													}
												}
											}
										}
									}
								}
							}
						} else if (argument.getIndex() != -1) {
							//参数类型为空但是参数索引不是-1,添加配置到map中
							AbstractConfig.appendParameters(map, argument,
									method.getName() + "." + argument.getIndex());
						} else {
							//如果既没有配置参数索引又没哟配置参数类型则抛出异常
							throw new IllegalArgumentException(
									"Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
						}

					}
				}
			} 
		}
		
		if (ProtocolUtils.isGeneric(generic)) {
			//是泛化服务则设置<generic,generic值>、<methods,*>表示任意方法
			map.put(GENERIC_KEY, generic);
			map.put(METHODS_KEY, ANY_VALUE);
		} else {
			//不是泛化服务,获取修订版本号,放入map
			String revision = Version.getVersion(interfaceClass, version);
			if (revision != null && revision.length() > 0) {
				map.put(REVISION_KEY, revision);
			}
			//创建并返回包装类方法名称
			String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
			if (methods.length == 0) {
				//没有包装方法名则设置<methods,*>
				logger.warn("No method found in service interface " + interfaceClass.getName());
				map.put(METHODS_KEY, ANY_VALUE);
			} else {
				//如果有,则方面通过逗号分隔拼接放入map中,key=methods
				map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
			}
		}

		//没有token且提供者不为空,获取提供者的token
		if (ConfigUtils.isEmpty(token) && provider != null) {
			token = provider.getToken();
		}
		//如果token不为空
		if (!ConfigUtils.isEmpty(token)) {
			//如果token是默认值(true或者default),则创建UUID作为token
			if (ConfigUtils.isDefault(token)) {
				map.put(TOKEN_KEY, UUID.randomUUID().toString());
			} else {
				map.put(TOKEN_KEY, token);
			}
		}
		// init serviceMetadata attachments
		serviceMetadata.getAttachments().putAll(map);

    
    
    
    
    
    
    //开始服务导出,上面一堆东西,无非就是构建mao,然后将所有的参数放到map中,然后下面将map制作为dubbo协议的url
    
		// 127.0.0.1
        //找到配置的主机,优先级如下:environment variables -> java system properties -> host property in config file 
		// -> /etc/hosts -> default network address -> first available network address
		String host = findConfigedHosts(protocolConfig, registryURLs, map);
		// 20880
        // 找到配置的端口,优先级: environment variable -> java system properties ->
		// port property in protocol config file -> protocol default port
		Integer port = findConfigedPorts(protocolConfig, name, map);
		//拼接URL :dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604405352872&version=0.0.1
		URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    
		// 如果存对应的url扩展工厂,则再接着进行处理url,则获取自定义扩展配置,我这里没有进入
		if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
			url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())
					.getConfigurator(url).configure(url);
		}
        
        
		//获取导出范围,我这里获取到的是
		String scope = url.getParameter(SCOPE_KEY);
		// 如果导出单位不是字符串'none',则准备导出,我这里获取到的是null
		if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

			// 如果导出范围不是remote则执行本地导出逻辑
			if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
				exportLocal(url);
			}
      
      
      
      
			// 如果配置导出范围不是local则执行远程导出
			if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
				//注册中心地址不为空
				if (CollectionUtils.isNotEmpty(registryURLs)) {
					for (URL registryURL : registryURLs) {
						// 如果协议是injvm 则不执行导出注册逻辑
						if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
							continue;
						}
						//url配置dynamic参数
						url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
						//获取监控地址
						URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
						if (monitorUrl != null) {
							//监控地址不为空则添加到URL参数中key=monitor
							url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
						}
						if (logger.isInfoEnabled()) {
							if (url.getParameter(REGISTER_KEY, true)) {
								logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url
										+ " to registry " + registryURL);
							} else {
								logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
							}
						}

						// 获取自定义代理配置
						String proxy = url.getParameter(PROXY_KEY);
						if (StringUtils.isNotEmpty(proxy)) {
							//如果存在,则为注册地址添加代理实现参数
							registryURL = registryURL.addParameter(PROXY_KEY, proxy);
						}
						//为服务引用生成Invoker对象
						Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
								registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
						//生成提供者和配置包装Invoker
						DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker,
								this);
						//通过SPI自适应拓展获取Protocol的拓展实现,调用导出方法
						Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
						//添加到导出器缓存
						exporters.add(exporter);
					}
				} else {
					//注册中心地址为空,导出服务到配置地址
					if (logger.isInfoEnabled()) {
						logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
					}
					Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
					DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

					Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
					exporters.add(exporter);
				}
				/**
				 * @since 2.7.0 ServiceData Store
				 * 获取可写元数据服务,默认实现为本地
				 */
				WritableMetadataService metadataService = WritableMetadataService
						.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
				if (metadataService != null) {
					//发布服务定义
					metadataService.publishServiceDefinition(url);
				}
			}
		}
		//添加到服务引用url缓存中
		this.urls.add(url);
	}

image-20201109161047848

image-20201109161250293

3、导出核心逻辑

3.1、本地导出

url = dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=3931&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604405352872&version=0.0.1

1、先通过SPI方式获取ProxyFactory 创建Invoker

2、通过SPI的方式获取Protocol实现导出。

Dubbo 官方文档中对 Invoker 进行了说明:Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。

private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();


//使用SPI自适应拓展机制,获取ProxyFactory和Protocol的实现。
//默认实现是DubboProtocol,这里是本地协议,也就是InjvmProtocol(根据Invoker才确认是InjvmProtocol的哦)
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
//代理工厂,这里的适配扩展是 JavassistProxyFactory
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();


/**
* 导出服务到本地(及jvm导出)
*/
private void exportLocal(URL url) {
    //组装导出地址,协议为injvm,
    // injvm://127.0.0.1/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4172&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604407350304&version=0.0.1
    URL local = URLBuilder.from(url)
        .setProtocol(LOCAL_PROTOCOL)
        .setHost(LOCALHOST_VALUE)
        .setPort(0)
        .build();

    //使用ProxyFactory生成导出服务代理的实现,默认实现JavassistProxy
    //使用Protocol调用服务导出逻辑,默认实现是DubboProtocol,这里是本地协议,也就是InjvmProtocol
    Exporter<?> exporter = PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));

    //添加导出服务到缓存中
    exporters.add(exporter);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

image-20201103203135494

image-20201109175208925

image-20201109175555258

3.1.1、 Invoker 的创建

public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}

AbstractProxyInvoker 包含一个 Proxy 实例,代理了具体的服务类。

Proxy 用于代理目标类,Proxy 是一个抽象类,仅可以通过 getProxy(ClassLoader, Class[]) 方法创建子类。可以通过 newInstance(InvocationHandler) 来创建代理实例。

3.1.1.1、JavassistProxyFactory #getInvoker

JavassistProxyFactory 的方法:#getInvoker创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 #doInvoke。覆写后的 #doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。 ( invoke方法后面讲)

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    	//为目标类创建包装类,无法解决类名包含$的情况,不能是代理类,所以 是type
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
      
      
        //创建了一个继承自 AbstractProxyInvoker 类的匿名类对象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
              
            	// 调用 Wrapper 的 invokeMethod 方法(此方法实际上是上面 Wrapper.getWrapper生成的),invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

3.1.1.2、Wrapper #getWrapper

Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。

在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。 再通过反射获取wpper对象

public abstract class Wrapper {

    private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); //class wrapper map

    //^^^^^^

    public static Wrapper getWrapper(Class<?> c) {
        //因为无法代理动态类,因此是动态类时会循环找到不是动态类的父类
        while (ClassGenerator.isDynamicClass(c)) 
        {	//获取父类
            c = c.getSuperclass();
        }

        //如果代理Object类,则直接返回固定实现OBJECT_WRAPPER
        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }

        //代理其他类则调用makeWrapper创建一个实例,并存入缓存中(如果不存在,则创建并返回)
        return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key))



    }

在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 makeWrapper方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,再通过反射创建 Wrapper 实例。

private static Wrapper makeWrapper(Class<?> c) {
    // 判断是否是基本类型的包装类,如果是则抛出异常
    if (c.isPrimitive()) {
        throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
    }

    // 获取类的全限定名称(包名+类名)
    String name = c.getName();
    // 获取类加载器,优先级如下:线程上下文加载器 -> c的类加载器 -> 系统类加载器
    ClassLoader cl = ClassUtils.getClassLoader(c);
    // 存放setPropertyValue方法代码
    StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
    // 存放getPropertyValue方法代码
    StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
    // 存放invokeMethod方法代码
    StringBuilder c3 = new StringBuilder(
        "public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws "
        + InvocationTargetException.class.getName() + "{ ");


    // 生成类型转换代码及异常抛出代码
    c1.append(name).append(" w; try{ w = ((").append(name)
        .append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c2.append(name).append(" w; try{ w = ((").append(name)
        .append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c3.append(name).append(" w; try{ w = ((").append(name)
        .append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");

    // 存放属性<属性名,属性类型>
    Map<String, Class<?>> pts = new HashMap<>();
    // 存放方法<描述信息(可理解为方法签名), Method 实例>
    Map<String, Method> ms = new LinkedHashMap<>();
    // 存放所有的方法名
    List<String> mns = new ArrayList<>();
    // 所有在当前类中声明的方法名
    List<String> dmns = new ArrayList<>();

    // 获取 public访问级别的字段
    for (Field f : c.getFields()) {
        String fn = f.getName();
        Class<?> ft = f.getType();
        // 如果是static 或 transient 修饰的变量,则忽略
        if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
            continue;
        }
        // 生成条件判断及赋值语句,比如:
        // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
        c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3"))
            .append("; return; }");
        // 生成条件判断及返回语句,比如:
        // if( $2.equals("name") ) { return ($w)w.name; }
        c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
        // 添加<属性名,属性类型>到集合中
        pts.put(fn, ft);
    }

    //获取所有public访问级别的方法
    Method[] methods = c.getMethods();
    // 检测是否包含在当前类中声明的方法
    boolean hasMethod = hasMethods(methods);
    if (hasMethod) {
        c3.append(" try{");
        for (Method m : methods) {
            // 忽略Object类声明的方法
            if (m.getDeclaringClass() == Object.class) {
                continue;
            }

            String mn = m.getName();
            // 生成方法名判断语句,比如:
            // if ( "setName".equals( $2 )
            c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
            //参数个数
            int len = m.getParameterTypes().length;
            // 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
            // && $3.length == 1
            c3.append(" && ").append(" $3.length == ").append(len);
            // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
            boolean override = false;
            for (Method m2 : methods) {
                if (m != m2 && m.getName().equals(m2.getName())) {
                    override = true;
                    break;
                }
            }
            // 对重载方法进行处理。判断是统一个方法时除了方法名、参数列表长度相同外,还必须判断参数类型是否一致,如:
            // void setName(String name) 和  void setName(Integer name)
            if (override) {
                if (len > 0) {
                    for (int l = 0; l < len; l++) {
                        // 生成参数类型进行检测代码,比如:
                        // && $3[0].getName().equals("java.lang.String") 
                        c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
                            .append(m.getParameterTypes()[l].getName()).append("\")");
                    }
                }
            }
            // 添加 ) {,完成方法判断语句
            c3.append(" ) { ");
            // 上面的方法校验完成后的代码类似这样:
            // if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {

            // 根据返回值类型生成目标方法调用语句
            if (m.getReturnType() == Void.TYPE) {
                //返回值类型为空,则 w.setName((java.lang.String)$4[0]); return null;
                c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");")
                    .append(" return null;");
            } else {
                //返回值类型不为空,则  // return ($w)w.getName();
                c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4"))
                    .append(");");
            }
            // 添加 },
            c3.append(" }");
            /* 上面的方法校验完成后的代码类似这样:
		        *if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
		        *   w.setName((java.lang.String)$4[0]); return null;
		        *  }
		        */

            ///添加方法名到集合中
            mns.add(mn);
            //如果是当前类声明的方法,则添加到声明方法集合中
            if (m.getDeclaringClass() == c) {
                dmns.add(mn);
            }
            //添加<方法描述,方法>到集合中
            ms.put(ReflectUtils.getDesc(m), m);
        }
        //添加异常捕获及抛出代码
        c3.append(" } catch(Throwable e) { ");
        c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
        c3.append(" }");
    }
    //添加NoSuchMethodException异常代码
    c3.append(" throw new " + NoSuchMethodException.class.getName()
              + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

    // 处理getter和setter方法
    Matcher matcher;
    for (Map.Entry<String, Method> entry : ms.entrySet()) {
        String md = entry.getKey();
        Method method = entry.getValue();
        //是否是get方法
        if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            //获取属性名
            String pn = propertyName(matcher.group(1));
            // 生成属性判断以及返回语句,示例如下:
            // if( $2.equals("name") ) { return ($w).w.getName(); }
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
                .append("(); }");
            //存放<属性名,属性类型>到集合中
            pts.put(pn, method.getReturnType());
            //判断是否是is|has|can开头方法,生成代码逻辑同get方法
        } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            String pn = propertyName(matcher.group(1));
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
                .append("(); }");
            pts.put(pn, method.getReturnType());
            //判断是set开头方法
        } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            //获取参数(属性)类型
            Class<?> pt = method.getParameterTypes()[0];
            //获取属性名
            String pn = propertyName(matcher.group(1));
            //生成属性判断及set方法语句,示例如下:
            // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
            c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(")
                .append(arg(pt, "$3")).append("); return; }");
            //存放<属性名,属性类型>到集合中
            pts.put(pn, pt);
        }
    }
    // 添加 抛出 NoSuchPropertyException 异常代码
    c1.append(" throw new " + NoSuchPropertyException.class.getName()
              + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
    c2.append(" throw new " + NoSuchPropertyException.class.getName()
              + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");

    // 生成class
    //包装类数量加1并获取,原子操作
    long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
    //根据类加载器创建类生成器实例
    ClassGenerator cc = ClassGenerator.newInstance(cl);
    // 生成类名:有公共修饰符则org.apache.dubbo.common.bytecode.Wrapper,否则当前包装类名+$sw+当前包装类数量
    cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
    // 设置父类
    cc.setSuperClass(Wrapper.class);
    // 设置默认构造函数
    cc.addDefaultConstructor();
    // 添加属性名称数组字段
    cc.addField("public static String[] pns;"); 
    // 属性即类类型字段
    cc.addField("public static " + Map.class.getName() + " pts;"); 
    // 添加方法名称集合属性
    cc.addField("public static String[] mns;");
    // 添加本类声明的方法名称集合字段
    cc.addField("public static String[] dmns;");
    // 添加类属性
    for (int i = 0, len = ms.size(); i < len; i++) {
        cc.addField("public static Class[] mts" + i + ";");
    }
    //添加属性名获取方法
    cc.addMethod("public String[] getPropertyNames(){ return pns; }");
    // 添加是否存在某个属性方法
    cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
    // 获取某个属性类型
    cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
    // 获取方法名集合
    cc.addMethod("public String[] getMethodNames(){ return mns; }");
    //获取声明方法属性
    cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
    // 添加setPropertyValue方法、getPropertyValue方法、invokeMethod方法代码
    cc.addMethod(c1.toString());
    cc.addMethod(c2.toString());
    cc.addMethod(c3.toString());

    try {
        Class<?> wc = cc.toClass();
        // 设置字段值
        wc.getField("pts").set(null, pts);
        wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
        wc.getField("mns").set(null, mns.toArray(new String[0]));
        wc.getField("dmns").set(null, dmns.toArray(new String[0]));
        int ix = 0;
        for (Method m : ms.values()) {
            wc.getField("mts" + ix++).set(null, m.getParameterTypes());
        }
        //创建Wrapper实例
        return (Wrapper) wc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        //清理缓存
        cc.release();
        ms.clear();
        mns.clear();
        dmns.clear();
    }
}

3.1.1.3、创建包装类的wapper

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.dubbo.common.bytecode;

import com.healerjean.proj.service.MailClientService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;

public class Wrapper0 extends Wrapper implements DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Wrapper0() {
    }

    public boolean hasProperty(String var1) {
        return pts.containsKey(var1);
    }

    public Class getPropertyType(String var1) {
        return (Class)pts.get(var1);
    }
	
    //第一个是代理对象
    //第二个是是方法名
    //第三个是参数类型的数组(同一个名字,不同参数类型)
    //第四个是代理对象的参数
    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        MailClientService var5;
        try {
            var5 = (MailClientService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }

        try {
            if ("sentMail".equals(var2) && var3.length == 1) {
                var5.sentMail((String)var4[0]);
                return null;
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }

        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.healerjean.proj.service.MailClientService.");
    }

    public String[] getPropertyNames() {
        return pns;
    }

    public Object getPropertyValue(Object var1, String var2) {
        try {
            MailClientService var3 = (MailClientService)var1;
        } catch (Throwable var5) {
            throw new IllegalArgumentException(var5);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class com.healerjean.proj.service.MailClientService.");
    }

    public void setPropertyValue(Object var1, String var2, Object var3) {
        try {
            MailClientService var4 = (MailClientService)var1;
        } catch (Throwable var6) {
            throw new IllegalArgumentException(var6);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class com.healerjean.proj.service.MailClientService.");
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }
}

3.1.1.4、最终返回的AbstractProxyInvoker

有一个未实现的抽象方法doInvoke

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);

    private final T proxy;

    private final Class<T> type;

    private final URL url;

    public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }
        if (type == null) {
            throw new IllegalArgumentException("interface == null");
        }
        if (!type.isInstance(proxy)) {
            throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
        }
        this.proxy = proxy;
        this.type = type;
        this.url = url;
    }

    @Override
    public Class<T> getInterface() {
        return type;
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public boolean isAvailable() {
        return true;
    }

    @Override
    public void destroy() {
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
			CompletableFuture<Object> future = wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

	private CompletableFuture<Object> wrapWithFuture(Object value) {
        if (RpcContext.getContext().isAsyncStarted()) {
            return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
        } else if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }

    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

    @Override
    public String toString() {
        return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
    }


}

3.1..2、本地协议InjvmProtocol 导出

在获取Invoker之后,就会调用具体的Protocol实现类导出方法进行导出,首先是导出到本地,使用的是InjvmProtocol协议,导出逻辑很简单,源码如下:

//代码有省略
public class InjvmProtocol extends AbstractProtocol implements Protocol {

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    	//创建一个InjvmExporter实例,InjvmExporter只是将道歉Invoker等缓存到本地
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
    
}




class InjvmExporter<T> extends AbstractExporter<T> {

    private final String key;

    private final Map<String, Exporter<?>> exporterMap;

    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }

    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }

}

//至此本地导出就基本完了

3.2、远程协议导出

与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,比较复杂。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。首先分析RegistryProtocol export 方法。

		// 导出到远程的部分源码
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
	//如果存在,则为注册地址添加代理实现参数
	registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}

//为服务引用生成Invoker对象(和上面的本地导出代码差不多,这里变成了注册协议(将dubbo协议的地址作为export参数放入)
//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&registry=zookeeper&release=2.7.7&timestamp=1604408770574    
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

//生成提供者和配置包装Invoker(又多包装了一层)
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker,this);

//通过SPI自适应拓展获取Protocol的拓展实现,调用导出方法、这里使用的注册协议RegistryProtocol,还记得上面使用的是InJvmProtocol协议吧
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

//添加到导出器缓存
exporters.add(exporter);
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
    //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&timestamp=1604408770574  

    URL registryUrl = getRegistryUrl(originInvoker);

    // 服务提供中地址(本地导出服务地址)
    // dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1
    URL providerUrl = getProviderUrl(originInvoker);

    // 获取订阅覆盖地址
    //provider://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);


    //创建覆盖监听器,存入缓存
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


    //使用配置覆盖提供中者地址
   //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605001242523&version=0.0.1
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

    //导出服务,
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
    final Registry registry = getRegistry(originInvoker);
    
    // 获取已注册的服务提供者 URL
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
    // 获取 register 参数,默认为true
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);

    if (register) {
        //为true则表示需要注册服务,向注册中心注册
        register(registryUrl, registeredProviderUrl);
    }
    

    // 不建议使用!订阅2.6.x或之前版本中的重写规则。
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // 设置注册和订阅地址
    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //通知RegistryProtocolListener,服务导出
    notifyExport(exporter);
    //创建DestroyableExporter实例,确保每次导出时都返回一个新的导出器实例
    return new DestroyableExporter<>(exporter);
}





protected URL getRegistryUrl(Invoker<?> originInvoker) {
    URL registryUrl = originInvoker.getUrl();
    if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
    }
    return registryUrl;
}

private URL getProviderUrl(final Invoker<?> originInvoker) {
    String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
    if (export == null || export.length() == 0) {
        throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
    }
    return URL.valueOf(export);
}

  private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
        return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
                .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
    }

3.2.1、#doLocalExport

接着上面的源码分析导出服务方法doLocalExport的源码。

//providerurl <--> exporter
private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    //根据Invoker参数获取缓存key
    //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605001242523&version=0.0.1
    String key = getCacheKey(originInvoker);

    //调用computeIfAbsent方法,将key和ExporterChangeableWrapper实例放入缓存

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        // 创建InvokerDelegate委托类实例
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);

        // 创建ExporterChangeableWrapper实例,调用 DubboProtocol 的 export 方法导出服务
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

image-20201103231614913

3.2.2、DubboProtocol #export

使用默认协议dubbo作为示例,分析导出逻辑。此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。

protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();


@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 获取URL dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4494&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604416434630&version=0.0.1
    URL url = invoker.getUrl();

    // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
    // 163/com.healerjean.proj.service.MailClientService:0.0.1:20880
    String key = serviceKey(url);

    // 创建DubboExporter实例并放入缓存(其实到了这里基本上就算完事了,但是我们最重要的是启动dubbo服务)
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    // 导出本地存根服务以发送事件
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY)
                                                      + "], has set stubproxy support event ,but no stub methods founded."));
            }

        }
    }

    // 启动服务器
    openServer(url);

    // 优化序列化
    optimizeSerialization(url);

    return exporter;
}




image-20201110180237572

3.2.3、#openServer

DubboExporterexport方法主要调用openServer方法,而openServer逻辑也很简单,获取参数isserver,选择执行createServer 还是server的重置方法。

createServer 包含三个核心的逻辑。

第一是检测是否存在urlserver 参数所代表的 Transporter 拓展(默认是netty),不存在则抛出异常。

第二是创建服务器实例。

第三是检测是否支持urlclient 参数所表示的 Transporter 拓展,不存在也是抛出异常。

第三是看看Exchangers是如何创建出ExchangeServer服务的。首先调用org.apache.dubbo.remoting.exchange.Exchangers.bind(URL, ExchangeHandler)方法

//dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19748&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605004230369&version=0.0.1
private void openServer(URL url) {
    // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
    //10.236.122.217:20880 
    String key = url.getAddress();

    // 在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。
    // 客户端可以导出只供服务器调用的服务,默认为true
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        // 双重检测创建服务(保证只创建一次)
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // 服务器已创建,则根据 url 中的配置重置服务器
            server.reset(url);
        }
    }
}


private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {


    private ProtocolServer createServer(URL url) {
        //第一步:检测是否存在`url`中 `server` 参数所代表的 `Transporter `拓展(默认是`netty`),不存在则抛出异常。

        url = URLBuilder.from(url)
            // 服务器关闭时发送只读事件,默认情况下启用
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // 添加默认心跳检测参数60 * 1000
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            // 添加编码解码器参数
            .addParameter(CODEC_KEY, DubboCodec.NAME).build();

        //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=20112&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605004964820&version=0.0.1

        // 获取 server 参数,默认为 netty
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
        if (str != null && str.length() > 0
            && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        //第二步:创建服务器实例
        ExchangeServer server;
        try {// 创建 ExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        //第三步:检测是否支持 `client` 参数所表示的` Transporter` 拓展,不存在也是抛出异常。 
        // 获取 client 参数,可指定 netty,mina。我这里默认进来是null。直接返回DubboProtocolServe服务
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // 通过SPI获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中, 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }



        // 创建DubboProtocolServer实例并返回
        return new DubboProtocolServer(server);
    }
@SPI("netty")
public interface Transporter {

    /**
     * Bind a server.
     *
     * @param url     server url
     * @param handler
     * @return server
     * @throws RemotingException
     * @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     *
     * @param url     server url
     * @param handler
     * @return client
     * @throws RemotingException
     * @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

3.2.3.1、Exchangers.bind

接下来就主要看看Exchangers是如何创建出ExchangeServer服务的

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605006351222&version=0.0.1
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }

    //添加URL参数codec=exchange 
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

    // 获取 Exchanger,默认为 HeaderExchanger。
    // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
    return getExchanger(url).bind(url, handler);
}


public static Exchanger getExchanger(URL url) {
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

然后是org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(URL, ExchangeHandler)方法

  @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    	// 创建 HeaderExchangeServer 实例:
    	//   1. new HeaderExchangeHandler(handler)
    	//	 2. new DecodeHandler(new HeaderExchangeHandler(handler))
    	//   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

3.2.3.2、Transporters.bind

该方法中的主要逻辑也就是调用org.apache.dubbo.remoting.Transporters.bind(URL, ChannelHandler…)

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 获取自适应 Transporter
    // 实例(ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();),并调用bind实例方法
    return getTransporter().bind(url, handler);
}




//获取默认的NettyTransporter
public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

默认的TransporterNettyTransporter,不过是注意是netty4.x,dubbo也提供了3.x的实现。

/**
 * Default extension of {@link Transporter} using netty4.x.
 */
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

上面的bind方法就一句创建 NettyServer实例代码

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
  //调用父类构造函数
  // 添加线程名称参数,可以在CommonConstants中按thread_name_KEY和THREADPOOL_KEY自定义客户机线程池的名称和类型,默认名称DubboServerHandler
  // 对handler进行包装MultiMessageHandler->HeartbeatHandler->handler
  super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}

紧接跟踪父类构造函数:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    、、dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&heartbeat=60000&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&threadname=DubboServerHandler-10.236.122.217:20880&timestamp=1605006351222&version=0.0.1
    // 调用父类构造方法(简单判断及解码器、超时时间等属性的获取赋值)
    super(url, handler);

    // 获取套接字地址
    localAddress = getUrl().toInetSocketAddress();

    //获取 ip 和端口
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());

    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        // anyhost=true或者是无效的本地主机则设置ip为0.0.0.0
        bindIp = ANYHOST_VALUE;
    }

    // 创建绑定套接字地址
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 获取最大可接受连接数,默认0
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    // 获取超时时间 默认600000毫秒
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        // 打开服务
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export "
                        + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    // 创建执行器
    executor = executorRepository.createExecutorIfAbsent(url);
}

有上面代码分析可知,需要在回到子类查看,doOpen方法的具体试下:

protected void doOpen() throws Throwable {
    // 创建服务启动器
    bootstrap = new ServerBootstrap();
    // 创建 boss 和 worker 事件组
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker");
    // 创建NettyServer处理类实例
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    // 设置boss和worker事件组
    bootstrap.group(bossGroup, workerGroup)
        // 设置SocketChannel
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        // SO_REUSEADDR是让端口释放后立即就可以被再次使用。
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        // TCP_NODELAY选项是用来控制是否开启Nagle算法,该算法是为了提高较慢的广域网传输效率
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        // 创建一个 池化或非池化的缓存区分配器
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        // 添加孩子渠道初始化处理实例
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 初始化通道时进行管道pipeline编码解码器等设置
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation",
                                          SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                }
                ch.pipeline().addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())
                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                    .addLast("handler", nettyServerHandler);
            }
        });
    // 绑定到指定的 ip 和端口上
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    // 不间断同步
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

3.3、远程协议注册

在远程服务的源码分析开始,我看见导出到远程的服务除了执行导出逻辑,需要执行服务的注册即org.apache.dubbo.registry.integration.RegistryProtocol.register(URL, URL)方法

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
    //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hlj-server-provider&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1&file=/Users/healerjean/Desktop/logs/hljServerProvider2001&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&timestamp=1604408770574  

    URL registryUrl = getRegistryUrl(originInvoker);

    // 服务提供中地址(本地导出服务地址)
    // dubbo://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1
    URL providerUrl = getProviderUrl(originInvoker);

    // 获取订阅覆盖地址
    //provider://127.0.0.1:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=127.0.0.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=4248&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1604408770580&version=0.0.1
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);


    //创建覆盖监听器,存入缓存
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);


    //使用配置覆盖提供中者地址
   //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&bind.ip=10.236.122.217&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=13008&qos.accept.foreign.ip=false&qos.enable=true&qos.port=40880&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605001242523&version=0.0.1
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

    //导出服务,
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // 根据 URL 加载 Registry 实现类
    final Registry registry = getRegistry(originInvoker);
    
    // 获取已注册的服务提供者 URL
    //dubbo://10.236.122.217:20880/com.healerjean.proj.service.MailClientService?anyhost=true&application=hlj-server-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=163&interface=com.healerjean.proj.service.MailClientService&methods=sentMail&pid=19940&release=2.7.7&revision=0.0.1&side=provider&timestamp=1605006351222&version=0.0.1
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
    // 获取 register 参数,默认为true
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);

    if (register) {
        //为true则表示需要注册服务,向注册中心注册
        register(registryUrl, registeredProviderUrl);
    }
    

    // 不建议使用!订阅2.6.x或之前版本中的重写规则。
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // 设置注册和订阅地址
    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //通知RegistryProtocolListener,服务导出
    notifyExport(exporter);
    //创建DestroyableExporter实例,确保每次导出时都返回一个新的导出器实例
    return new DestroyableExporter<>(exporter);
}

3.3.1、RegistryProtocol.#register

public void register(URL registryUrl, URL registeredProviderUrl) {
    //获取注册Registry实例
    Registry registry = registryFactory.getRegistry(registryUrl);
    //执行注册
    registry.register(registeredProviderUrl);
}

getRegistry方法由org.apache.dubbo.registry.support.AbstractRegistryFactory实现,具体逻辑如下:

public Registry getRegistry(URL url) {
    	//AtomicBoolean destroyed,已经执行过销毁则抛出异常
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }
  
        // 组装URL(路径、接口等参数)
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
  
        //创建注册缓存key = zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
        String key = createRegistryCacheKey(url);
  
        // 锁定注册表访问进程以确保注册表的单个实例
        LOCK.lock();
        try {
        	//从缓存中获取,获取到则直接返回
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
          
            //缓存中不存在则通过SPI反方式创建,创建失败抛出异常
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            //放入缓存并返回
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // 释放锁
            LOCK.unlock();
        }
    }

image-20201110194524012

根据上面源码分析可知,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry,然后写入缓存。这里的 createRegistry 由具体的子类实现,这里以ZookeeperRegistryFactory 为例解析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

	private ZookeeperTransporter zookeeperTransporter;

	/**
	 * zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
	 */
	public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
		this.zookeeperTransporter = zookeeperTransporter;
	}

	@Override
	public Registry createRegistry(URL url) {
		// 创建 ZookeeperRegistry
		return new ZookeeperRegistry(url, zookeeperTransporter);
	}

}

ZookeeperRegistryFactory的实现很简单就一句代码,直接创建ZookeeperRegistry实例。因此下面将分析ZookeeperRegistry的构造方法源码。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    	//调用父类构造(获取一些例如重试的时间)
        super(url);
        //如果anyhost=true或者主机地址是0.0.0.0,则抛出异常
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 获取组名,默认为 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        //不是/开头,则加上
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        //设置路径为group
        this.root = group;
        // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加状态监听器
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {//重新连接状态
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                //当zookeeper连接从连接丢失中恢复时,它需要获取最新的提供程序列表。重新注册观察者只是一个副作用,不是强制性的。
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {//新会话创建
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                	//尝试重新注册URL并将此实例的侦听器重新订阅到注册表
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }

上面的源码分析可知,zkClient是通过zookeeperTransporter的connect方法创建的,这里的 zookeeperTransporter 类型为自适应拓展类,默认为 CuratorZookeeperTransporter。因此下面我们分析下 CuratorZookeeperTransporter的源码。

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}
public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
    @Override
    public ZookeeperClient createZookeeperClient(URL url) {
    	//创建CuratorZookeeperClient实例
        return new CuratorZookeeperClient(url);
    }
}

可以看到CuratorZookeeperTransporter逻辑很简单,直接创建了CuratorZookeeperClient实例。

	public CuratorZookeeperClient(URL url) {
		// 调用父类构造,对url属性赋值
		super(url);
		try {
			// 获取超时时间,默认5秒
			int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
			// 获取会话过期时间,默认1分钟
			int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
			// 创建 CuratorFramework 构造器
			CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
					.connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000))
					.connectionTimeoutMs(timeout).sessionTimeoutMs(sessionExpireMs);
			// 获取权限信息
			String authority = url.getAuthority();
			if (authority != null && authority.length() > 0) {
				builder = builder.authorization("digest", authority.getBytes());
			}
			// 构建 CuratorFramework 实例
			client = builder.build();
			// 添加CuratorConnectionStateListener监听器
			client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
			// 启动客户端
			client.start();
			// 阻塞连接,直到连接成功或者超时
			boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
			if (!connected) {
				// 连接失败抛出异常
				throw new IllegalStateException("zookeeper not connected");
			}
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

3.3.2、FailbackRegistry #register

上面已经分析了获取·Registry·实例的源码。接下来任然以·zookeeper·为例分析注册方法register的源码,这个方法定义在 FailbackRegistry 抽象类中。

public class ZookeeperRegistry extends FailbackRegistry {


}
 public void register(URL url) {
    	//不接受此协议类型的服务,直接返回
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        //调用父类方法,添加URL到registered缓存中
        super.register(url);
        //删除url失败注册任务缓存
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // 执行注册,子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

           // 获取 check 参数,若 check = true 将会直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 将失败的注册请求记录到失败列表,定期重试
            addFailedRegistered(url);
        }
    }

上面源码分析可知,执行服务注册方法需要子类实现,这里分析下zookeeper的实现org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister(URL)

    @Override
    public void doRegister(URL url) {
        try {
        	 // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
            //   /${group}/${serviceInterface}/providers/${url}
        	// 是否是临时节点由参数dynamic决定,默认为true
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }


toUrlPath(url) = /dubbo/com.healerjean.proj.service.MailClientService/providers/dubbo%3A%2F%2F10.236.122.217%3A20880%2Fcom.healerjean.proj.service.MailClientService%3Fanyhost%3Dtrue%26application%3Dhlj-server-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3D163%26interface%3Dcom.healerjean.proj.service.MailClientService%26methods%3DsentMail%26pid%3D14248%26release%3D2.7.7%26revision%3D0.0.1%26side%3Dprovider%26timestamp%3D1605009104328%26version%3D0.0.1  

ZookeeperRegistry doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient.create(String, boolean)方法源码如下

private final Set<String>  persistentExistNodePath = new ConcurrentHashSet<>();


@Override
public void create(String path, boolean ephemeral) {
    // 不是临时节点
    if (!ephemeral) {
        // 检测持久节点缓存是否存在该节点,若存在则返回
        if (persistentExistNodePath.contains(path)) {
            return;
        }
        // zookeeper是否存在该节点,若存在则放入缓存并返回
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 递归创建
        create(path.substring(0, i), false);
    }
    if (ephemeral) {
        // 创建临时节点
        createEphemeral(path);
    } else {
        // 创建永久节点并放入缓存
        createPersistent(path);
        persistentExistNodePath.add(path);
    }
}

上面方法会根据节点类型调用创建临时节点和永久节点方法,由子类实现,主要看org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient的实现。

	@Override
	public void createPersistent(String path) {
		try {
			client.create().forPath(path);
		} catch (NodeExistsException e) {
			logger.warn("ZNode " + path + " already exists.", e);
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

	@Override
	public void createEphemeral(String path) {
		try {
			client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
		} catch (NodeExistsException e) {
			logger.warn("ZNode " + path
					+ " already exists, since we will only try to recreate a node on a session expiration"
					+ ", this duplication might be caused by a delete delay from the zk server, which means the old expired session"
					+ " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, "
					+ "we can just try to delete and create again.", e);
			deletePath(path);
			createEphemeral(path);
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

ContactAuthor