To analyze the Nacos source code we should at least get it running from source, which also makes it easier to debug.
Note: based on Nacos 1.1.3.
The article is fairly long, so be patient; questions and discussion are welcome.
1. Starting the Service
Once the source is downloaded, follow the steps below to get it running:
Note: here the config center stores configuration in MySQL; if you stick with the default Derby, just apply the startup argument in step 4 and start.
1. Find resource/META-INF/nacos-db.sql in the config module.
2. Create a nacos database in MySQL and execute the nacos-db.sql above.
3. Modify the console module's application.properties and add the following:
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
4. Add the JVM startup argument:
-Dnacos.standalone=true
which starts Nacos in standalone mode (see the launcher sketch below).
5. Open 127.0.0.1:8848/nacos/index.html locally; the default username and password are both nacos.
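If you prefer launching from the IDE instead of adding the JVM flag, a minimal launcher sketch looks like this (assumption: the console module's Spring Boot entry class is com.alibaba.nacos.Nacos; verify the class name in your checkout):
// Minimal sketch for debugging from the IDE; not part of the Nacos source.
public class NacosStandaloneLauncher {
    public static void main(String[] args) {
        // equivalent to passing -Dnacos.standalone=true on the command line
        System.setProperty("nacos.standalone", "true");
        com.alibaba.nacos.Nacos.main(args);
    }
}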
We are then greeted by the console page. The code behind it is the console module, which is quite simple: authentication based on Spring Security plus the CRUD operations backing this page.
2. Config Center Analysis
After all that startup talk we finally reach the core of this article.
A quick note on how the server manages configuration: besides the basic config storage, the server also keeps a history store, so every modification is persisted, can be viewed in the UI, and can be tagged. Before reading the source we need a picture of the data model, as follows.
The official site describes the model like this:
Keep these two models in mind: one defines the key of the data model, the other the actual data content; for the config center the key is primarily the DataId.
The Nacos data model key is uniquely determined by a triple; the Namespace defaults to the empty string (the public namespace) and the Group defaults to DEFAULT_GROUP.
The config domain model centers on the configuration itself, with two associated entities linked by ID: the configuration change history and the service tags (used for labeling and classification to ease indexing).
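To make the triple concrete, here is a small illustrative sketch of how (dataId, group, namespace/tenant) identifies a config; the separator and escaping details are simplified, and the real server-side key builder is GroupKey2.getKey, which we will meet later:
// Illustration only; not the actual GroupKey2.getKey(dataId, group, tenant) implementation.
public class ConfigKeyExample {
    static String key(String dataId, String group, String tenant) {
        // namespace/tenant may be empty (public namespace); group defaults to DEFAULT_GROUP
        return (tenant == null || tenant.isEmpty())
            ? dataId + "+" + group
            : dataId + "+" + group + "+" + tenant;
    }
    public static void main(String[] args) {
        System.out.println(key("dubbo.properties", "DEFAULT_GROUP", ""));
        System.out.println(key("dubbo.properties", "DEFAULT_GROUP", "b1092a4a-3b8d-4e33-8874-55cee3839c1f"));
    }
}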
Let's look at the config center through the official example:
public class ConfigExample {
public static void main(String[] args) throws NacosException, InterruptedException {
String serverAddr = "localhost";
String dataId = "dubbo.properties";
String group = "DEFAULT_GROUP";
String namespace = "b1092a4a-3b8d-4e33-8874-55cee3839c1f";
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
//Create the config service; presumably this is where the client threads and services that need initializing get started
ConfigService configService = NacosFactory.createConfigService(properties);
//Fetch the config by dataId and group
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
//Add a listener
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
System.out.println(isPublishOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
boolean isRemoveOk = configService.removeConfig(dataId, group);
System.out.println(isRemoveOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
Thread.sleep(300000);
}
}
2.1 Creating ConfigService
First, what does ConfigService actually do?
//1.From the source we can see this is actually NacosConfigService
ConfigService configService = NacosFactory.createConfigService(properties);
//2.As below, this calls NacosConfigService's constructor
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(-400, e.getMessage());
}
}
//3.NacosConfigService's constructor looks like this
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
String namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
if (StringUtils.isBlank(namespaceTmp)) {
namespace = TenantUtil.getUserTenant();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
} else {
namespace = namespaceTmp;
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
//Create the HTTP agent used to connect to the server
agent = new ServerHttpAgent(properties);
//This only takes effect when an endpoint is configured; briefly:
//a background thread asynchronously fetches the serverList from the naming (address) server
agent.start();
//This is where the client's main background work lives
//configFilterChainManager is the filter-chain manager holding all filters; we can register our own filters on the client
worker = new ClientWorker(agent, configFilterChainManager);
}
Let's keep going and see what ClientWorker mainly does.
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
//Initialize a client worker thread pool; nothing special here
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//The longPolling in the name below is a hint of what is to come (analyzed later); this initializes another thread pool
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
//Judging by the name, this checks the config; it is a scheduled task that runs every 10ms
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
//5.Here the work is split into batches to check config info and update data; just noted here, analyzed in detail later
public void checkConfigInfo() {
// Split into tasks; cacheMap is a global variable tied to the Listeners we add
int listenerSize = cacheMap.get().size();
// Round up to get the number of batches (ParamUtil.getPerTaskConfigSize() defaults to 3000)
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// Need to check whether the task is already running; this deserves careful thought. The task list is unordered, so the transition may be problematic
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
To summarize what creating NacosConfigService does:
- 1. Initialize the properties, parse the configuration parameters, and create the NacosConfigService instance the client operates through
- 2. Create the HTTP agent; if the server list is obtained from an endpoint-based naming service, a scheduled thread keeps refreshing the serverList (not expanded here, read the source yourself)
- 3. Initialize two thread pools: executor (single-threaded) runs the config check every 10ms, and when a check actually needs work it hands it off to the other pool, executorService
- 4. How many batches get executed is decided by the size of cacheMap (which essentially stores the listeners); the config check itself is covered in detail later (it is all about long polling)
2.2 Fetching a Config: getConfig
What exactly happens behind this one simple line of code? Let's walk through it:
String content = configService.getConfig(dataId, group, 5000);
2.2.1 Client-side analysis
//Step into the source and we land on this core method of NacosConfigService
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);//must not be empty
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// Prefer the local failover config (a file written on the client's local disk)
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
//After obtaining the config, run it through the filter chain
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
//Fetch the config content from the remote server; this is our main focus
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
//After obtaining the config, run it through the filter chain
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
//Return directly
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
agent.getName(), dataId, group, tenant, ioe.toString());
}
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
From the code above, the pull of config info from the remote server is delegated to ClientWorker; let's keep reading the source.
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
//The agent here is the one initialized at the beginning, new MetricsHttpAgent(new ServerHttpAgent(properties)); the request path is /configs. Remember it, we will look for it on the server side
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
//When the local failover cache is not enabled, a snapshot is saved
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
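// (The full source also handles other HTTP status codes, e.g. 403/409, plus a default branch; they are elided in this excerpt.)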
}
}
At this point the client-side analysis is mostly done; all that remains is to see how the server serves the fetch. Still, I want to see how, in cluster mode, the client decides which server to call and what it does when a request fails, so let's go one step further.
@Override
public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding,
long readTimeoutMs) throws IOException {
//This is the request timeout; the do-loop below stops retrying once this deadline is exceeded
final long endTime = System.currentTimeMillis() + readTimeoutMs;
final boolean isSSL = false;
//The remote server address we are going to call; it is actually chosen by weight, especially in a cluster
String currentServerAddr = serverListMgr.getCurrentServerAddr();
int maxRetry = this.maxRetry;
do {
try {
List<String> newHeaders = getSpasHeaders(paramValues);
if (headers != null) {
newHeaders.addAll(headers);
}
//Send the request
HttpResult result = HttpSimpleClient.httpGet(
getUrl(currentServerAddr, path), newHeaders, paramValues, encoding,
readTimeoutMs, isSSL);
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|| result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
serverListMgr.getCurrentServerAddr(), result.code);
} else {
//A previous server may have failed; record this one as the latest available server address
// Update the currently available server addr
serverListMgr.updateCurrentServerAddr(currentServerAddr);
return result;
}
} catch (ConnectException ce) {
LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}, err : {}", serverListMgr.getCurrentServerAddr(), ce.getMessage());
} catch (SocketTimeoutException stoe) {
LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}, err : {}", serverListMgr.getCurrentServerAddr(), stoe.getMessage());
} catch (IOException ioe) {
LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
throw ioe;
}
if (serverListMgr.getIterator().hasNext()) {
currentServerAddr = serverListMgr.getIterator().next();
} else {
maxRetry--;
if (maxRetry < 0) {
throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
}
//On failure, the server list (which exposes Iterator semantics) is used to move on to the next server
serverListMgr.refreshCurrentServerAddr();
}
} while (System.currentTimeMillis() <= endTime);
LOGGER.error("no available server");
throw new ConnectException("no available server");
}
That wraps up the client request flow; a quick summary (a simplified sketch follows the list):
- 1. First try the local failover file; if it exists, the config is read directly from that local file
- 2. If there is no local file, pull from the remote server; if that also fails and the local failover cache is not enabled, read the local snapshot file
- 3. When calling the remote servers we loop over the list: a failed request switches to another server, and once the overall timeout is exceeded the loop gives up and throws "no available server"
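As a recap, here is a simplified, non-authoritative restatement of the order getConfigInner works through (filters, logging and special-case error handling omitted):
// Simplified restatement of getConfigInner's fallback order; not the real method body.
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
    return content;                                                    // 1. local failover file wins
}
try {
    return worker.getServerConfig(dataId, group, tenant, timeoutMs);   // 2. ask the remote server
} catch (NacosException e) {
    // 3. fall back to the locally saved snapshot
    return LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
}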
2.2.2 How the server handles the config request
Below is the straightforward GET endpoint:
@GetMapping
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException {
// check params
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
Here is the server's real processing logic:
/**
* Synchronous config fetch endpoint
*/
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
//Concatenate dataId, group and tenant into the group key string
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
//Nacos's home-grown spin lock, an extremely simple lock
int lockResult = tryConfigReadLock(groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
// ... the full method branches on whether beta is enabled and whether a tag is present (a pile of near-duplicate code); only part of it is extracted here
//Normally neither beta nor tag is set, so execution comes here
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
//Only in standalone mode without MySQL is the config read from Derby
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
//Our setup (standalone + MySQL) lands here and reads from disk. Strange, right? Why read a file when MySQL is there, and when is that file written? Plenty of questions; hold on
file = DiskUtil.targetFile(dataId, group, tenant);
}
if (configInfoBase == null && fileNotExist(file)) {
// FIXME CacheItem
// If the CacheItem no longer exists we cannot easily compute the push delay, so simply record -1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
//And when the file is missing it simply returns "config data not exist". Why? A whole bunch of question marks, right? Don't worry, this is explained later
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
//If the file exists we go through here: return the content together with its MD5, last-modified time, etc.
response.setHeader(Constants.CONTENT_MD5, md5);
/**
* Disable caching
*/
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
}
LogUtil.pullCheckLog.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
// Otherwise delayed cannot be used directly as the push-delay metric, because for active GET requests delayed is always large; firing the event here just records a trace log
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed,
requestIp);
。。。
} else if (lockResult == 0) { //the read lock was not obtained (the CacheItem does not exist)
// FIXME if the CacheItem no longer exists we cannot easily compute the push delay; simply record -1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
pullLog.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
To summarize how the server handles a config fetch:
- 1. Take the read lock and fetch the cached item from memory, then branch on whether it is a beta release, whether a tag is present, and so on
- 2. In standalone mode without MySQL, read from the (Derby) database; otherwise check whether the locally cached file exists and read it
- 3. If it does not exist, return a "not found" response and log the event; otherwise return the content
When I first saw this I was puzzled: why read from disk when MySQL is available, and when is that disk data written? Don't worry, this is explained later.
My own quick take first:
Only standalone + non-MySQL queries the database directly, and Nacos's default config storage is Derby, with MySQL as the other implementation. So the conclusion is:
- Only standalone + Derby storage hits the database directly; every other mode reads the local cache file. And since Derby is Nacos's embedded database stored in local files, that case is effectively a local file read too.
So what is returned here really comes from a local file; even with MySQL the server does not pull from the remote database on every read, which cuts remote-request overhead, and in a cluster reads certainly come from the local file. This is my personal understanding; corrections are welcome.
Everything so far has been fairly simple: just a plain fetch. The real question is what happens after a config is updated: how does the client get the new value? Does the client actively pull, or does the server push? That is the core of it.
2.3 Config Update
A config can be updated from a client or from the UI console.
Client side:
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
On the client this request follows the same flow as fetching a config, except that a POST request is issued.
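Conceptually the client-side publish is just a POST to the same /configs path with the config fields as form parameters. A simplified sketch, assuming the agent exposes an httpPost counterpart to the httpGet shown earlier (the real logic lives in NacosConfigService.publishConfigInner; filters and error handling omitted):
// Simplified sketch, not the actual publishConfigInner body.
List<String> params = Arrays.asList("dataId", dataId, "group", group,
        "tenant", tenant, "content", content);
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH, null, params,
        agent.getEncode(), 3000L);
boolean isPublishOk = (result.code == HttpURLConnection.HTTP_OK);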
The interesting part is the server-side handling in ConfigController:
@PostMapping
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam("content") String content,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema)
throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
String requestIpApp = RequestUtil.getAppName(request);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
//some unimportant checks removed
if (AggrWhitelist.isAggrDataId(dataId)) {
log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
RequestUtil.getRemoteIp(request), dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
//plain database persistence (Derby/MySQL)
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
//the important part is this event being fired
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
The key point is the event being fired:
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
Firing this event runs every listener registered for ConfigDataChangeEvent:
/**
* fire event, notify listeners.
*/
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
}
//Simply iterate over the listeners directly interested in this event type and invoke them
for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}
AbstractEventListener has only two implementations here:
- AsyncNotifyService: as the name suggests, an asynchronous notification service. It listens for ConfigDataChangeEvent, so it is the one triggered here.
- LongPollingService: the long-polling service, which is the interesting one. It is interested in LocalDataChangeEvent, so it is not triggered at this point.
//This is the interest() method implemented by AsyncNotifyService; when the bean is created it is automatically added to the ConfigDataChangeEvent listener list
@Override
public List<Class<? extends Event>> interest() {
List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
// register interest in config data change events (sync notification)
types.add(ConfigDataChangeEvent.class);
return types;
}
So for now the listener that fires is AsyncNotifyService; let's continue.
2.3.1 The AsyncNotifyService listener
The logic AsyncNotifyService runs when the event is fired:
@Override
public void onEvent(Event event) {
// ConfigDataChangeEvent may be produced concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
List<?> ipList = serverListService.getServerList();
// any queue type would do here
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
//one task is added per server in the cluster
for (int i = 0; i < ipList.size(); i++) {
//note how the URL is built: it contains /communication/dataChange
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
//execute the AsyncTask asynchronously on a thread pool
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}
The async task's execution logic:
class AsyncTask implements Runnable {
public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
this.httpclient = httpclient;
this.queue = queue;
}
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
targetIp)) {
// If health checking is enabled and the target ip is marked unhealthy, put the task back on the notification queue; otherwise notify it
if (serverListService.isHealthCheck()
&& ServerListService.getServerListUnhealth().contains(targetIp)) {
// target ip is unhealthy, so put it back into the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(),
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task; this will be retried
asyncTaskExecute(task);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
//Send the notification: each server's communication/dataChange endpoint is called
//Note the callback: if notifying a server fails it records the failure; there is also a maximum retry count and an increasing delay, to avoid endless retries adding load on the servers
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
}
private Queue<NotifySingleTask> queue;
private CloseableHttpAsyncClient httpclient;
}
The asynchronous notification is in effect a broadcast: each server's /communication/dataChange endpoint is invoked, so we end up in CommunicationController. This is also how every server in the cluster gets notified.
/**
* Notify that the config info has changed
*/
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
//This is the main logic, handled asynchronously
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
//returning true means the notification was handled successfully
return true;
}
Finally, stepping into the source we find it merely builds a DumpTask and puts it into the TaskManager's queue:
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
//dumpTaskMgr is used here, so the default processor is DumpProcessor
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}
/**
* Handles tasks that must eventually succeed; tasks are processed on a single thread to guarantee they are eventually processed successfully
*
* @author huali
*/
public final class TaskManager implements TaskManagerMBean {
/**
* Add a task to the task map
*
* @param type
* @param task
*/
public void addTask(String type, AbstractTask task) {
this.lock.lock();
try {
//put it into the task map
AbstractTask oldTask = tasks.put(type, task);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
if (null != oldTask) {
task.merge(oldTask);
}
} finally {
this.lock.unlock();
}
}
}
Notice that the method returns as soon as the task is enqueued; obviously a background thread must be processing that queue.
TaskManager
//This background thread is created when the service starts
class ProcessRunnable implements Runnable {
@Override
public void run() {
while (!TaskManager.this.closed.get()) {
try {
//runs once every 100ms
Thread.sleep(100);
TaskManager.this.process();
} catch (Throwable e) {
}
}
}
}
protected void process() {
for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
AbstractTask task = null;
this.lock.lock();
try {
// fetch the task
task = entry.getValue();
if (null != task) {
if (!task.shouldProcess()) {
// the task does not need to run right now; skip it
continue;
}
// remove the task from the task map first
this.tasks.remove(entry.getKey());
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
} finally {
this.lock.unlock();
}
if (null != task) {
// look up the task processor
TaskProcessor processor = this.taskProcessors.get(entry.getKey());
if (null == processor) {
// if no processor is registered for this task type, fall back to the default processor
processor = this.getDefaultTaskProcessor();
}
if (null != processor) {
boolean result = false;
try {
// process the task
result = processor.process(entry.getKey(), task);
} catch (Throwable t) {
log.error("task_fail", "处理task失败", t);
}
if (!result) {
// processing failed; record the processing time
task.setLastProcessTime(System.currentTimeMillis());
// on failure, put the task back into the task map to be retried next round
this.addTask(entry.getKey(), task);
}
}
}
}
if (tasks.isEmpty()) {
this.lock.lock();
try {
this.notEmpty.signalAll();
} finally {
this.lock.unlock();
}
}
}
The task processing is then handed over to DumpProcessor:
@Override
public boolean process(String taskType, AbstractTask task) {
DumpTask dumpTask = (DumpTask)task;
String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.lastModified;
String handleIp = dumpTask.handleIp;
boolean isBeta = dumpTask.isBeta;
String tag = dumpTask.tag;
//some branching omitted; this is the main data handling
if (StringUtils.isBlank(tag)) {
//load the config from the database, which is guaranteed to be up to date
ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);
//reload the aggregation whitelist; ignore for now
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
if (null != cf) {
AggrWhitelist.load(cf.getContent());
} else {
AggrWhitelist.load(null);
}
}
//load the client IP whitelist; also ignore
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
if (null != cf) {
ClientIpWhiteList.load(cf.getContent());
} else {
ClientIpWhiteList.load(null);
}
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
if (null != cf) {
SwitchService.load(cf.getContent());
} else {
SwitchService.load(null);
}
}
boolean result;
if (null != cf) {
//The key step. What does it actually do?
//1.update the in-memory cache
//2.dump the updated content to disk
result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
cf.getContent().length());
}
} else {
result = ConfigService.remove(dataId, group, tenant);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
The ConfigService.dump call above is the real highlight; it does the following:
- If not in standalone mode, or if MySQL is used, update the local file cache
- Build/refresh the CacheItem kept in memory
- Fire a LocalDataChangeEvent
/**
* Save the config file and cache its MD5.
*/
static public boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
makeSure(groupKey);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5.getInstance().getMD5String(content);
if (md5.equals(ConfigService.getContentMd5(groupKey))) {
dumpLog.warn(
"[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}",
groupKey, md5, ConfigService.getLastModifiedTs(groupKey), lastModifiedTs);
} else if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) {
//Here the earlier mystery is solved: the updated content is cached in a local file,
//and this is exactly the file we read from when fetching the config
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
|| errMsg.contains(DISK_QUATA_EN)) {
// protection against a full disk
fatalLog.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
//update the MD5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
//This fires the LocalDataChangeEvent, which involves all the long-polling listeners from before; things should start clicking now
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
}
}
At this point we can summarize the server-side flow after a config update:
- 1. The server that receives the update request first updates the database, then fires a ConfigDataChangeEvent
- 2. That event tells every server node that a particular config has changed, and each node starts its own processing
- 3. Each node creates a DumpTask and puts it into its own task queue
- 4. Each node's background thread takes the task from the queue and executes it
- Executing the task refreshes the local cache file when not in standalone mode or when MySQL is used; this is exactly the file the fetch path reads
- It updates the in-memory CacheItem
- It then fires a LocalDataChangeEvent, which ties into long polling and is analyzed below
One note on data consistency in the config center:
- In a cluster, the Nacos config center relies on an external MySQL database for storage, because the default embedded Derby is per-node and hard to share across a cluster
- After a change, the node that updated MySQL broadcasts to the other nodes; on failure it retries only a limited number of times and, after repeated failures, does nothing more than log it. A node that receives the broadcast adds a task to its own queue and keeps processing it, re-queueing on failure. Beyond that, there is no scheduled task that compares config content between servers (we revisit this later)
So the config center is arguably designed with a decentralized mindset.
Open question: what if one node publishes data and another node fails to apply the update? There must be a remedy, revealed later in this article.
That is most of the content; the article is long, so feel free to take a break and read it in parts.
2.3.2 The LongPollingService long-polling service
We saw above that a config change on the server fires a LocalDataChangeEvent, which lands in LongPollingService's onEvent method. To understand the whole flow we have to look at it together with the client's requests.
Recall the earlier question: after a config update, does the client pull or does the server push?
1) Client-side analysis
Starting from the client: remember that creating NacosConfigService starts a background check for config updates in ClientWorker. Let's recall the code:
public void checkConfigInfo() {
// Split into tasks; this is the number of listeners/cached configs
int listenerSize = cacheMap.get().size();
// Round up to the number of batches (perTaskConfigSize defaults to 3000)
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// Need to check whether the task is already running; this deserves careful thought. The task list is unordered, so the transition may be problematic
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
First note that if longingTaskCount <= currentLongingTaskCount, no LongPollingRunnable is started, so we need to know:
- currentLongingTaskCount (initially 0) is updated to longingTaskCount once the new tasks have been started
- longingTaskCount is the number of locally cached configs/listeners divided by 3000, rounded up; so if there are no listeners, long polling never runs at all, and each batch of up to 3000 configs is polled by a single task (a small worked example follows)
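A small worked example of that batching arithmetic (hypothetical listener count; the division is written as floating point here to match the intended round-up semantics):
// Hypothetical numbers, purely for illustration.
int perTaskConfigSize = 3000;      // ParamUtil.getPerTaskConfigSize() default
int listenerSize = 6500;           // e.g. 6500 configs/listeners in cacheMap
int longingTaskCount = (int) Math.ceil((double) listenerSize / perTaskConfigSize);  // = 3
// taskId 0 covers configs 0..2999, taskId 1 covers 3000..5999, taskId 2 covers 6000..6499,
// so each long-polling task (thread) watches at most 3000 configs.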
Now look at LongPollingRunnable's actual execution logic:
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
//check the local (failover) config; you can read this yourself
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config: this only returns the dataId/group/tenant (namespace) of changed configs; the content itself still has to be re-pulled. Request path: configs/listener
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//pull the updated content again
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
//compare the local MD5; if it changed, the local listeners are triggered
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
//when finished, submit itself again to repeat the whole flow
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
2) Server-side handling of the poll
In one request the client sends up to 3000 config entries to the server for MD5 comparison.
Request path: configs/listener, handled by ConfigController:
/**
* Compare MD5s
*/
@PostMapping("/listener")
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
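For reference, the Listening-Configs probe string that MD5Util.getClientMd5Map parses is assembled on the client (in ClientWorker) as one record per watched config. A rough sketch, assuming the separators are the control characters '\u0002' (between fields) and '\u0001' (between records) defined in the client's constants; treat the exact constant names and escaping as assumptions:
// Hedged illustration of the probe format sent in the Listening-Configs parameter.
String wordSeparator = "\u0002";   // assumed field separator: dataId / group / md5 / tenant
String lineSeparator = "\u0001";   // assumed record separator between configs
String probe = "dubbo.properties" + wordSeparator
        + "DEFAULT_GROUP" + wordSeparator
        + "md5-of-local-content" + wordSeparator
        + "b1092a4a-3b8d-4e33-8874-55cee3839c1f" + lineSeparator;
// The server turns this back into groupKey -> md5 pairs and compares each md5 with its CacheItem.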
ConfigServletInner then does the actual work:
/**
* Polling endpoint
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {
// long polling
if (LongPollingService.isSupportLongPolling(request)) {
//the key call is this method
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// else: compatibility path for short polling
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// short-polling compatible result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
。。。。
// disable caching
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
Here LongPollingService registers the client's long-polling task, which is held by the server via a thread pool:
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* Return the response 500ms early to avoid client-side timeouts. @qiaoyi.dingqy 2013.10.22 change: add delay time for LoadBalance
*/
//this is how long the server hangs the request: 30s - 500ms = 29.5s
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be invoked by the HTTP thread; otherwise the container sends the response as soon as the thread returns
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is not reliable, so the timeout is controlled manually
asyncContext.setTimeout(0L);
//schedule the task that, after the timeout, returns the result for this held long-poll connection
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
The task itself checks whether the server-side MD5 has changed and, if so, returns the result:
class ClientLongPolling implements Runnable {
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* remove the subscription
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
//compare MD5s
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
//add this ClientLongPolling to the subscriber queue for later use
allSubs.add(this);
}
So that is how long polling is implemented: the client's request is hung by the server and the result is returned after 29.5s. But if the data changes in the meantime, does it really wait the full 29.5s before responding?
Remember that a data change fires a LocalDataChangeEvent, and that event happens to be handled by LongPollingService:
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
//handle it with a DataChangeTask
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
Ultimately, a local config change is resolved directly as follows:
/**
* long-polling subscriptions
*/
final Queue<ClientLongPolling> allSubs;
// =================
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
//iterate over held client requests whose watch list contains this groupKey
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// if it is a beta publish and the client ip is not in the beta list, skip it
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}
// if it is a tag publish and the client's tag does not match, skip it
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // remove the subscription
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
"polling",
clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
//take the request hung on the server and immediately return the changed groupKey
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
}
}
That basically completes the analysis of config propagation, but one question is left for you: along the way there is a check, isFixedPolling(), which decides whether the result is returned immediately or the request stays hung on the server.
2.4 Loading Data at Server Startup
As we saw above, in cluster mode the servers do not periodically compare data with each other, so at startup the data must be loaded from the most up-to-date source, i.e. MySQL, and dumped to the server's local disk; subsequent client requests are then served from those local files. Let's verify this in the code.
@Service
public class DumpService {
@Autowired
private Environment env;
@Autowired
PersistService persistService;
//initialization method run at startup; this is how the server prepares its config files
@PostConstruct
public void init() {
LogUtil.defaultLog.warn("DumpService start");
//build the different processors
DumpProcessor processor = new DumpProcessor(this);
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);
dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
//clear config history older than 30 days (the default retention)
Runnable clearConfigHistory = () -> {
log.warn("clearConfigHistory start");
if (ServerListService.isFirstIp()) {
try {
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int totalCount = persistService.findConfigHistoryCountByTime(startTime);
if (totalCount > 0) {
int pageSize = 1000;
int removeTime = (totalCount + pageSize - 1) / pageSize;
log.warn("clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
new Object[] {startTime, totalCount, pageSize, removeTime});
while (removeTime > 0) {
// delete page by page to avoid errors from overly large batches
persistService.removeConfigHistory(startTime, pageSize);
removeTime--;
}
}
} catch (Throwable e) {
log.error("clearConfigHistory error", e);
}
}
};
try {
//at startup, refresh the local disk with the latest data
dumpConfigInfo(dumpAllProcessor);
// refresh the beta cache
LogUtil.defaultLog.info("start clear all config-info-beta.");
DiskUtil.clearAllBeta();
if (persistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
}
// refresh the tag cache
LogUtil.defaultLog.info("start clear all config-info-tag.");
DiskUtil.clearAllTag();
if (persistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
}
// add to dump aggr
List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
log.info("server start, schedule merge end.");
}
} catch (Exception e) {
LogUtil.fatalLog.error(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
e.getCause());
throw new RuntimeException(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage());
}
//cluster mode only
if (!STANDALONE_MODE) {
Runnable heartbeat = () -> {
String heartBeatTime = TimeUtils.getCurrentTime().toString();
// write disk
try {
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage());
}
};
//periodic heartbeat: log the heartbeat time to disk
TimerTaskService.scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS);
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.defaultLog.warn("initialDelay:{}", initialDelay);
//periodic task: full dump of all config info, interval 6*60min = 6h
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
}
//scheduled cleanup task
TimerTaskService.scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
}
The initial dump to local files:
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor)
throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
//Quick start is not configured by default; you can dig into it yourself. It reads the heartbeat file, and if the last heartbeat is within 6 hours of now it skips the full reload from the database
if (isQuickStart()) { //quick start? not by default
File heartbeatFile = DiskUtil.heartBeatFile();
if (heartbeatFile.exists()) {
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
if (TimeUtils.getCurrentTime().getTime()
- heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
isAllDump = false;
}
}
}
if (isAllDump) {
//full dump (the default): read the database and dump everything to the server's local disk
LogUtil.defaultLog.info("start clear all config-info.");
DiskUtil.clearAll();
dumpAllProcessor.process(DumpAllTask.TASK_ID, new DumpAllTask());
} else {
//quick start within 6h: dump only the recent changes, then periodically check file MD5s and update
Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp,
timeStep);
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(
this, beforeTimeStamp, TimeUtils.getCurrentTime());
dumpChangeProcessor.process(DumpChangeTask.TASK_ID, new DumpChangeTask());
Runnable checkMd5Task = () -> {
LogUtil.defaultLog.error("start checkMd5Task");
List<String> diffList = ConfigService.checkMd5();
for (String groupKey : diffList) {
String[] dg = GroupKey.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
ConfigService.dumpChange(dataId, group, tenant, configInfo.getContent(),
configInfo.getLastModified());
}
LogUtil.defaultLog.error("end checkMd5Task");
};
TimerTaskService.scheduleWithFixedDelay(checkMd5Task, 0, 12,
TimeUnit.HOURS);
}
} catch (IOException e) {
LogUtil.fatalLog.error("dump config fail" + e.getMessage());
throw e;
} finally {
if (null != fis) {
try {
fis.close();
} catch (IOException e) {
LogUtil.defaultLog.warn("close file failed");
}
}
}
}
The class and method names make the intent easy to guess, so let's look straight at the DumpAllProcessor flow,
the full refresh that runs every 6 hours:
class DumpAllProcessor implements TaskProcessor {
DumpAllProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.persistService = dumpService.persistService;
}
@Override
public boolean process(String taskType, AbstractTask task) {
long currentMaxId = persistService.findConfigMaxId();
long lastMaxId = 0;
//query based on the last max id, optimizing the pagination
while (lastMaxId < currentMaxId) {
//page through the config info, 1000 rows per page by default
Page<PersistService.ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId,
PAGE_SIZE);
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (PersistService.ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(cf.getContent());
}
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(cf.getContent());
}
//dump to the server's local disk
boolean result = ConfigService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getType());
final String content = cf.getContent();
final String md5 = Md5Utils.getMD5(content, Constants.ENCODE);
LogUtil.dumpLog.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(), md5);
}
defaultLog.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
} else {
lastMaxId += PAGE_SIZE;
}
}
return true;
}
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final PersistService persistService;
}
A brief summary:
- 1. By default the configs are pulled from the database and cached as files on the server's disk
- 2. In cluster mode, DumpService starts a periodic heartbeat task during init, mainly to record whether the next startup needs a full dump
- 3. In cluster mode, a full dump of all configs from the database to local disk runs every 6 hours by default
- 4. Config history is checked periodically, and entries older than 30 days are deleted
- ...
Back to the earlier consistency question: if a node fails to apply an update, the scheduled task will pull the data from the database within 6 hours and refresh the local files.
2.5 Health Checks Between Servers
Servers in a cluster obviously need health checks so that unavailable nodes can be handled accordingly; the main class to look at is ServerListService.
As before, class initialization parses the cluster node information, and if an address server is enabled the server list is also refreshed from it periodically.
@PostConstruct
public void init() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = System.getProperty("address.server.domain", "jmenv.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = System.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
addressUrl = System.getProperty("address.server.url",
servletContext.getContextPath() + "/" + RunningConfigUtils.getClusterName());
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
defaultLog.info("ServerListService address-server port:" + addressPort);
defaultLog.info("ADDRESS_SERVER_URL:" + addressServerUrl);
isHealthCheck = PropertyUtil.isHealthCheck();
maxFailCount = PropertyUtil.getMaxHealthCheckFailCount();
fatalLog.warn("useAddressServer:{}", isUseAddressServer);
GetServerListTask task = new GetServerListTask();
task.run();
if (CollectionUtils.isEmpty(serverList)) {
fatalLog.error("########## cannot get serverlist, so exit.");
throw new RuntimeException("cannot get serverlist, so exit.");
} else {
TimerTaskService.scheduleWithFixedDelay(task, 0L, 5L, TimeUnit.SECONDS);
}
}
After the web container finishes initializing, the following event fires:
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
if (port == 0) {
port = event.getWebServer().getPort();
List<String> newList = new ArrayList<String>();
for (String serverAddrTmp : serverList) {
newList.add(getFormatServerAddr(serverAddrTmp));
}
setServerList(new ArrayList<String>(newList));
}
httpclient.start();
//easy to spot: a periodic task checks server health every 5s
CheckServerHealthTask checkServerHealthTask = new CheckServerHealthTask();
TimerTaskService.scheduleWithFixedDelay(checkServerHealthTask, 0L, 5L, TimeUnit.SECONDS);
}
Let's continue with what CheckServerHealthTask does:
private void checkServerHealth() {
long startCheckTime = System.currentTimeMillis();
for (String serverIp : serverList) {
//call the other nodes
// Compatible with old codes,use status.taobao
String url = "http://" + serverIp + servletContext.getContextPath() + Constants.HEALTH_CONTROLLER_PATH;
// url path: "/nacos/health"
HttpGet request = new HttpGet(url);
//again with a callback: on a successful response the node is removed from the unhealthy list, otherwise it is added
httpclient.execute(request, new AsyncCheckServerHealthCallBack(serverIp));
}
long endCheckTime = System.currentTimeMillis();
long cost = endCheckTime - startCheckTime;
defaultLog.debug("checkServerHealth cost: {}", cost);
}
And how does the other node handle the health request?
@GetMapping
public String getHealth() {
// TODO UP DOWN WARN
StringBuilder sb = new StringBuilder();
//database health info
String dbStatus = dataSourceService.getHealth();
//if an address server is used, whether the address server is healthy is included as well
if (dbStatus.contains(heathUpStr) && ServerListService.isAddressServerHealth() && ServerListService
.isInIpList()) {
sb.append(heathUpStr);
} else if (dbStatus.contains(heathWarnStr) && ServerListService.isAddressServerHealth() && ServerListService
.isInIpList()) {
sb.append("WARN:");
sb.append("slave db (").append(dbStatus.split(":")[1]).append(") down. ");
} else {
sb.append("DOWN:");
//for the config center, distinguish whether the master db is down
if (dbStatus.contains(heathDownStr)) {
sb.append("master db (").append(dbStatus.split(":")[1]).append(") down. ");
}
if (!ServerListService.isAddressServerHealth()) {
sb.append("address server down. ");
}
if (!ServerListService.isInIpList()) {
sb.append("server ip ").append(LOCAL_IP).append(" is not in the serverList of address server. ");
}
}
//return the result
return sb.toString();
}
Monitoring between server nodes is simple: periodic heartbeat checks are enough.
Overall summary of the config-center cluster:
- 1. The config center relies on an external MySQL database for storage
- 2. An update writes to the database and then broadcasts to the other nodes, completing the update through a series of asynchronous tasks and event notifications
- 3. Everything clients read comes from the server node's local cache, never straight from the database
- 4. Consistency ultimately relies on the full sync from the database to the servers' disk cache files every 6 hours
- 5. Clients watch configs via long polling; if nothing changes, the server hangs the request for 29.5s to keep server pressure low
- ...