Nacos配置中心原理
前言
Github:https://github.com/HealerJean
一、Nacos
简介
阿里开源项目:https://github.com/alibaba/nacos
Nacos
主要提供两种服务一是配置中心,支持配置注册、变更下发、层级管理等,意义是不停机就可以动态刷新服务内部的配置项;
二是作为命名服务,提供服务的注册和发现功能,通常用于在
RPC
框架的Client
和Server
中间充当媒介,还附带有健康监测、负载均衡等功能。
2、基本操作
获取配置:从Nacos
Config
Server
中读取配置。
监听配置:订阅感兴趣的配置,当配置发生变化的时候可以收到一个事件。
发布配置:将配置保存到 Nacos
Config
Server
中。
删除配置:删除配置中心的指定配置。
3、长短轮询
1)短轮询
短轮询也是拉模式。是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。如果配置中心使用这样的方式,会存在以下问题:
1、由于配置数据并不会频繁变更,若是一直发请求,势必会对服务端造成很大压力
2、造成推送数据的延迟,比如:每
10s
请求一次配置,如果在第11s时配置更新了,那么推送将会延迟9s,等待下一次请求;无法在推送延迟和服务端压力两者之间中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。
2)长轮询
长轮询为了解决短轮询存在的问题,客户端发起长轮询,如果服务端的数据没有发生变更,会
hold
住请求,直到服务端的数据发生变化,或者等待一定时间超时才会返回。返回后,客户端再发起下一次长轮询请求监听。 这样设计的好处:1、相对于低延时,客户端发起长轮询,服务端感知到数据发生变更后,能立刻返回响应给客户端。
2、服务端的压力减小,客户端发起长轮询,如果数据没有发生变更,服务端会
hold
住此次客户端的请求,hold
住请求的时间一般会设置到30s
或者60s
,并且服务端hold
住请求不会消耗太多服务端的资源。
a、配置中心厂轮询:
b、实现基本原理
1、首先客户端发送一个 HTTP
请求到服务端;服务端会开启一个异步线程,如果一直没有数据变更会挂起当前请求(一个 Tomcat
也就 200
个线程,长轮询也不应该阻塞 Tomcat
的业务线程,所以需要配置中心在实现长轮询时往往采用异步响应的方式来实现,而比较方便实现异步 HTTP
的常见手段便是 Servlet3.0
提供的 AsyncContext
机制。)
2、在服务端设置的超时时间内仍然没有数据变更,那就返回客户端一个没有变更的标识,客户端继续发起请求;
3、在服务端设置的超时时间内有数据变更了,就返回客户端变更的内容;
客户端实现:
@Slf4j
public class ConfigClientWorker {
private final CloseableHttpClient httpClient;
private final ScheduledExecutorService executorService;
public ConfigClientWorker(String url, String dataId) {
this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName("client.worker.executor-%d");
thread.setDaemon(true);
return thread;
});
// ① httpClient 客户端超时时间要大于长轮询约定的超时时间
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
executorService.execute(new LongPollingRunnable(url, dataId));
}
class LongPollingRunnable implements Runnable {
private final String url;
private final String dataId;
public LongPollingRunnable(String url, String dataId) {
this.url = url;
this.dataId = dataId;
}
@SneakyThrows
@Override
public void run() {
String endpoint = url + "?dataId=" + dataId;
log.info("endpoint: {}", endpoint);
HttpGet request = new HttpGet(endpoint);
CloseableHttpResponse response = httpClient.execute(request);
switch (response.getStatusLine().getStatusCode()) {
case 200: {
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
.getContent()));
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
response.close();
String configInfo = result.toString();
log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
break;
}
// ② 304 响应码标记配置未变更
case 304: {
log.info("longPolling dataId: [{}] once finished, configInfo is unchanged,
longPolling again", dataId);
break;
}
default: {
throw new RuntimeException("unExcepted HTTP status code");
}
}
executorService.execute(this);
}
}
public static void main(String[] args) throws IOException {
new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
System.in.read();
}
}
服务端:
@RestController
@Slf4j
@SpringBootApplication
public class ConfigServer {
@Data
private static class AsyncTask {
// 长轮询请求的上下文,包含请求和响应体
private AsyncContext asyncContext;
// 超时标记
private boolean timeout;
public AsyncTask(AsyncContext asyncContext, boolean timeout) {
this.asyncContext = asyncContext;
this.timeout = timeout;
}
}
// guava 提供的多值 Map,一个 key 可以对应多个 value
private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(
HashMultimap.create());
private ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("longPolling-timeout-checker-%d")
.build();
private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
// 配置监听接入点
@RequestMapping("/listener")
public void addListener(HttpServletRequest request, HttpServletResponse response) {
String dataId = request.getParameter("dataId");
// 开启异步!!!
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);
// 维护 dataId 和异步请求上下文的关联
dataIdContext.put(dataId, asyncTask);
// 启动定时器,30s 后写入 304 响应
timeoutChecker.schedule(() -> {
if (asyncTask.isTimeout()) {
dataIdContext.remove(dataId, asyncTask);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
// 标志此次异步线程完成结束!!!
asyncContext.complete();
}
}, 30000, TimeUnit.MILLISECONDS);
}
// 配置发布接入点
@RequestMapping("/publishConfig")
@SneakyThrows
public String publishConfig(String dataId, String configInfo) {
log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
for (AsyncTask asyncTask : asyncTasks) {
asyncTask.setTimeout(false);
HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(configInfo);
asyncTask.getAsyncContext().complete();
}
return "success";
}
public static void main(String[] args) {
SpringApplication.run(ConfigServer.class, args);
}
}
二、配置中心的架构
配置中心最基础的功能就是存储一个键值对
1、用户发布一个配置(
configKey
),然后客户端获取这个配置项(configValue
);2、当某个配置项发生变更时,将变更告知客户端刷新旧值。
1、CRUD
⬤ 对于服务端来说:需要考虑的是配置如何存储,是否需要持久化。
⬤ 对于客户端来说:需要考虑的是通过接口从服务器查询得到相应的数据然后返回。
2、配置的动态监听
1)PULL
模式
PULL
模式:表示客户端从服务端主动拉取数据.。
Pull
模式下,客户端需要定时从服务端拉取一次数据,由于定时带来的时间间隔,因此不能保证数据的实时性,并且在服务端配置长时间不更新的情况下,客户端的定时任务会做一些无效的Pull
操作。
2)PUSH
模式
Push
模式:服务端主动把数据推送到客户端。
Push
模式下,服务端需要维持与客户端的长连接,如果客户端的数量比较多,那么服务端需要耗费大量的内存资源来保存每个资源,并且为了检测连接的有效性,还需要心跳机制来维持每个连接的状态。
3)Nacos-Pull
模式
Nacos
采用的是Pull
模式(Kafka
也是如此),并且采用了一种长轮询机制。客户端采用长轮询的方式定时的发起Pull
请求,去检查服务端配置信息是否发生了变更,如果发生了变更,那么客户端会根据变更的数据获得最新的配置。
a、大概流程
1、如果客户端发起 Pull
请求后(带着配置和配置值的 MD5
值和后台对比),发现服务端的配置和客户端的配置是保持一致的,那么服务端会“ Hold
”住这个请求。(服务端拿到这个连接后在指定的时间段内不会返回结果,直到这段时间内的配置发生变化)
2、一旦配置发生了变化,服务端会把原来“ Hold
”住的请求进行返回。
b、流程图
Nacos
服务端收到请求后,会检查配置是否发生了变更,如果没有,那么设置一个定时任务,延期29.5
秒执行。同时并且把当前的客户端长轮询连接加入到allSubs
队列。 这时候有两种方式触发该连接结果的返回:
第一种:等待 29.5
秒(长连接保持的时间)后触发自动检查机制,这时候不管配置有无发生变化,都会把结果返回给客户端。
第二种:在 29.5
秒内的任意一个时刻,通过 Nacos
控制台或者 API
的方式对配置进行了修改,那么触发一个事件机制,监听到该事件的任务会遍历 allSubs
队列,找到发生变更的配置项对应的 ClientLongPolling
任务,将变更的数据通过该任务中的连接进行返回,即完成了一次推送操作。
c、客户端原理
实时变更:
1、当 SpringBoot
项目启动的时候,会执行”准备上下文“的这么一个事情。此时 NacosContextRefresher
会监听到这个事件,**并且注册一个负责监听配置变更回调的监听器 **registerNacosListener
2、registerNacosListener
一旦收到配置变更的回调,则发布一个RefreshEvent
事件,对应的 RefreshEventListener
监听器检测到该事件后,将调用refresh.refresh()
方法来完成配置的更新。
d、服务端原理
1、首先 ConfigController
下有一个监听器相关的接口,是客户端发起数据监听的接口,主要做两件事:获取客户端需要监听的可能发生变化的配置,并计算其MD5值。执行长轮询的请求。
2、将长轮询请求封装成 ClientLongPolling
,交给线程池去执行。执行过程中,可以理解为一个配置为一个长轮询请求,也就对应一个 ClientLongPolling
,将其放在一个队列 allSub
s当中,并且任务总共有29.5秒的等待时间。
3、如果某一个配置发生改变,会调用 LongPollingService的onEvent
方法来通知客户端服务端的数据已经发生了变更。这里所谓的通知也就是从队列中找到变更配置对应的 ClientLongPolling
对象,将发生变更的 groupKey
通过该 ClientLongPolling
写入到响应对象 response
中。
4、那么客户端收到了 respones
后,自然可以得到更改配置的 groupKey
,然后去 Nacos
上查询即可。