前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

一、Nacos简介

阿里开源项目:https://github.com/alibaba/nacos

Nacos 主要提供两种服务

一是配置中心,支持配置注册、变更下发、层级管理等,意义是不停机就可以动态刷新服务内部的配置项;

二是作为命名服务,提供服务的注册和发现功能,通常用于在 RPC 框架的 ClientServer 中间充当媒介,还附带有健康监测、负载均衡等功能。

2、基本操作

获取配置:Nacos Config Server 中读取配置。

监听配置:订阅感兴趣的配置,当配置发生变化的时候可以收到一个事件。

发布配置:将配置保存到 Nacos Config Server 中。

删除配置:删除配置中心的指定配置。

3、长短轮询

1)短轮询

短轮询也是拉模式。是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。如果配置中心使用这样的方式,会存在以下问题:

1、由于配置数据并不会频繁变更,若是一直发请求,势必会对服务端造成很大压力

2、造成推送数据的延迟,比如:每 10s 请求一次配置,如果在第11s时配置更新了,那么推送将会延迟9s,等待下一次请求;

无法在推送延迟和服务端压力两者之间中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。

2)长轮询

长轮询为了解决短轮询存在的问题,客户端发起长轮询,如果服务端的数据没有发生变更,会 hold 住请求,直到服务端的数据发生变化,或者等待一定时间超时才会返回。返回后,客户端再发起下一次长轮询请求监听。 这样设计的好处:

1、相对于低延时,客户端发起长轮询,服务端感知到数据发生变更后,能立刻返回响应给客户端。

2、服务端的压力减小,客户端发起长轮询,如果数据没有发生变更,服务端会 hold 住此次客户端的请求,hold 住请求的时间一般会设置到 30s 或者 60s ,并且服务端 hold 住请求不会消耗太多服务端的资源。

image-20240327210046316

a、配置中心厂轮询:

image-20240327210208926

b、实现基本原理

1、首先客户端发送一个 HTTP 请求到服务端;服务端会开启一个异步线程,如果一直没有数据变更会挂起当前请求(一个 Tomcat 也就 200 个线程,长轮询也不应该阻塞 Tomcat 的业务线程,所以需要配置中心在实现长轮询时往往采用异步响应的方式来实现,而比较方便实现异步 HTTP 的常见手段便是 Servlet3.0 提供的 AsyncContext 机制。)

2、在服务端设置的超时时间内仍然没有数据变更,那就返回客户端一个没有变更的标识,客户端继续发起请求;

3、在服务端设置的超时时间内有数据变更了,就返回客户端变更的内容;

客户端实现:

 @Slf4j
 public class ConfigClientWorker {
 
     private final CloseableHttpClient httpClient;
 
     private final ScheduledExecutorService executorService;
 
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 
         // ① httpClient 客户端超时时间要大于长轮询约定的超时时间
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 
     class LongPollingRunnable implements Runnable {
 
         private final String url;
         private final String dataId;
 
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             CloseableHttpResponse response = httpClient.execute(request);
             switch (response.getStatusLine().getStatusCode()) {
                 case 200: {
                     BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                             .getContent()));
                     StringBuilder result = new StringBuilder();
                     String line;
                     while ((line = rd.readLine()) != null) {
                         result.append(line);
                     }
                     response.close();
                     String configInfo = result.toString();
                     log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                     break;
                 }
                 // ② 304 响应码标记配置未变更
                 case 304: {
                     log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, 
                              longPolling again", dataId);
                     break;
                 }
                 default: {
                     throw new RuntimeException("unExcepted HTTP status code");
                 }
             }
             executorService.execute(this);
         }
     }
 
     public static void main(String[] args) throws IOException {
 
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }

服务端:

 @RestController
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 
     @Data
     private static class AsyncTask {
         // 长轮询请求的上下文,包含请求和响应体
         private AsyncContext asyncContext;
         // 超时标记
         private boolean timeout;
 
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 
     // guava 提供的多值 Map,一个 key 可以对应多个 value
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(
       HashMultimap.create());
 
     private ThreadFactory threadFactory = new ThreadFactoryBuilder()
       											.setNameFormat("longPolling-timeout-checker-%d")
             								.build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 
   
   
   
   
     // 配置监听接入点
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
         String dataId = request.getParameter("dataId");
 
         // 开启异步!!!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 
         // 维护 dataId 和异步请求上下文的关联
         dataIdContext.put(dataId, asyncTask);
 
         // 启动定时器,30s 后写入 304 响应
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
               // 标志此次异步线程完成结束!!!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 
     // 配置发布接入点
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }
 

二、配置中心的架构

配置中心最基础的功能就是存储一个键值对

1、用户发布一个配置(configKey),然后客户端获取这个配置项(configValue);

2、当某个配置项发生变更时,将变更告知客户端刷新旧值。

1、CRUD

⬤ 对于服务端来说:需要考虑的是配置如何存储,是否需要持久化。

⬤ 对于客户端来说:需要考虑的是通过接口从服务器查询得到相应的数据然后返回。

2、配置的动态监听

1)PULL 模式

PULL 模式:表示客户端从服务端主动拉取数据.。

Pull 模式下,客户端需要定时从服务端拉取一次数据,由于定时带来的时间间隔,因此不能保证数据的实时性并且在服务端配置长时间不更新的情况下,客户端的定时任务会做一些无效的 Pull 操作。

2)PUSH 模式

Push 模式:服务端主动把数据推送到客户端。

Push 模式下,服务端需要维持与客户端的长连接如果客户端的数量比较多,那么服务端需要耗费大量的内存资源来保存每个资源,并且为了检测连接的有效性,还需要心跳机制来维持每个连接的状态。

3)Nacos-Pull 模式

Nacos 采用的是 Pull 模式(Kafka 也是如此),并且采用了一种长轮询机制。客户端采用长轮询的方式定时的发起 Pull 请求,去检查服务端配置信息是否发生了变更,如果发生了变更,那么客户端会根据变更的数据获得最新的配置。

a、大概流程

1、如果客户端发起 Pull 请求后(带着配置和配置值的 MD5 值和后台对比),发现服务端的配置和客户端的配置是保持一致的,那么服务端会“ Hold”住这个请求。(服务端拿到这个连接后在指定的时间段内不会返回结果,直到这段时间内的配置发生变化

2、一旦配置发生了变化,服务端会把原来“ Hold ”住的请求进行返回。

image-20240327200443382

b、流程图

Nacos 服务端收到请求后,会检查配置是否发生了变更,如果没有,那么设置一个定时任务,延期 29.5秒执行。同时并且把当前的客户端长轮询连接加入到 allSubs 队列。 这时候有两种方式触发该连接结果的返回:

第一种:等待 29.5 秒(长连接保持的时间)后触发自动检查机制,这时候不管配置有无发生变化,都会把结果返回给客户端。

第二种:在 29.5 秒内的任意一个时刻,通过 Nacos 控制台或者 API 的方式对配置进行了修改,那么触发一个事件机制,监听到该事件的任务会遍历 allSubs队列,找到发生变更的配置项对应的 ClientLongPolling 任务,将变更的数据通过该任务中的连接进行返回,即完成了一次推送操作。

image-20240327201022936

c、客户端原理

实时变更:

1、当 SpringBoot 项目启动的时候,会执行”准备上下文“的这么一个事情。此时 NacosContextRefresher 会监听到这个事件,**并且注册一个负责监听配置变更回调的监听器 **registerNacosListener

2、registerNacosListener 一旦收到配置变更的回调,则发布一个RefreshEvent事件对应的 RefreshEventListener 监听器检测到该事件后,将调用refresh.refresh()方法来完成配置的更新。

d、服务端原理

1、首先 ConfigController 下有一个监听器相关的接口,是客户端发起数据监听的接口,主要做两件事:获取客户端需要监听的可能发生变化的配置,并计算其MD5值。执行长轮询的请求。

2、将长轮询请求封装成 ClientLongPolling,交给线程池去执行。执行过程中,可以理解为一个配置为一个长轮询请求,也就对应一个 ClientLongPolling ,将其放在一个队列 allSub s当中,并且任务总共有29.5秒的等待时间。

3、如果某一个配置发生改变,会调用 LongPollingService的onEvent方法来通知客户端服务端的数据已经发生了变更。这里所谓的通知也就是从队列中找到变更配置对应的 ClientLongPolling 对象,将发生变更的 groupKey 通过该 ClientLongPolling 写入到响应对象 response 中。

4、那么客户端收到了 respones 后,自然可以得到更改配置的 groupKey,然后去 Nacos上查询即可。

ContactAuthor