Nacos配置中心
基于版本1.0.0
Nacos Client
使用Client的Demo代码
/**
 * Minimal Nacos client example: builds a {@code ConfigService} for a local server and prints
 * one configuration entry fetched directly, without registering any listener.
 */
public class ClientDemo {

    public static void main(String[] args) {
        final String serverAddr = "127.0.0.1:8848";
        final String dataId = "nacos-demo.properties";
        final String group = "DEFAULT_GROUP";

        Properties clientProps = new Properties();
        clientProps.put("serverAddr", serverAddr);

        try {
            ConfigService configService = NacosFactory.createConfigService(clientProps);
            // No listener is registered: read the configuration straight from the server,
            // passing 5000 as the timeoutMs argument.
            System.out.println(configService.getConfig(dataId, group, 5000));
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}
整个过程创建了ConfigService,查看NacosFactory.createConfigService(properties);
/**
 * Entry-point factory; configuration services are obtained by delegating to
 * {@code ConfigFactory}.
 */
public class NacosFactory {

    /**
     * Creates a {@code ConfigService} from the given client properties.
     *
     * @param properties client settings, handed unchanged to {@code ConfigFactory}
     * @return the service produced by {@code ConfigFactory.createConfigService}
     * @throws NacosException propagated from {@code ConfigFactory}
     */
    public static ConfigService createConfigService(Properties properties)
        throws NacosException {
        ConfigService service = ConfigFactory.createConfigService(properties);
        return service;
    }
    //.....
}
可以得知通过ConfigFactory工厂类创建ConfigService,继续查看,通过反射创建实例
/**
 * Factory that instantiates the client-side {@code ConfigService} implementation via reflection.
 */
public class ConfigFactory {

    /**
     * Creates a {@code ConfigService} by reflectively invoking the
     * {@code NacosConfigService(Properties)} constructor.
     *
     * @param properties client settings passed to the implementation's constructor
     * @return the newly created service
     * @throws NacosException with error code -400 if the implementation class cannot be loaded
     *         or constructed; the underlying failure is attached as the cause
     */
    public static ConfigService createConfigService(Properties properties)
        throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
            // Instantiate the implementation reflectively.
            return (ConfigService) constructor.newInstance(properties);
        } catch (Throwable e) {
            // newInstance wraps an exception thrown by the constructor in an
            // InvocationTargetException whose own message is null; report the real failure.
            Throwable failure = e;
            if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
                failure = e.getCause();
            }
            NacosException wrapped = new NacosException(-400, failure.getMessage());
            // Keep the original stack trace reachable instead of reducing it to a message string.
            wrapped.initCause(failure);
            throw wrapped;
        }
    }
}
通过反射创建了 NacosConfigService 实例。ConfigService 是一个接口,其定义如下:
/**
 * Client-facing API of the configuration center: read, publish and remove configuration,
 * and add or remove update listeners. Entries are addressed by {@code dataId} and {@code group}.
 */
public interface ConfigService {
// Fetches configuration content; timeoutMs is the timeout in milliseconds.
String getConfig(String dataId, String group, long timeoutMs)
throws NacosException;
// Registers a listener for configuration updates.
void addListener(String dataId, String group, Listener listener)
throws NacosException;
// Publishes configuration content.
boolean publishConfig(String dataId, String group, String content)
throws NacosException;
// Removes a configuration.
boolean removeConfig(String dataId, String group)
throws NacosException;
// Removes a previously registered listener.
void removeListener(String dataId, String group, Listener listener);
// NOTE(review): presumably returns a textual server status; the semantics are not shown in this excerpt — confirm.
String getServerStatus();
}
NacosConfigService
实现如下:
/**
 * Client-side {@code ConfigService} implementation. Construction resolves the encoding and
 * namespace, starts the HTTP agent and creates the {@code ClientWorker}.
 */
public class NacosConfigService implements ConfigService {
    private final long POST_TIMEOUT = 3000L;
    private static final String EMPTY = "";
    private HttpAgent agent;
    /** Responsible for running the long-polling tasks. */
    private ClientWorker worker;
    private String namespace;
    private String encode;
    private ConfigFilterChainManager configFilterChainManager =
        new ConfigFilterChainManager();

    /**
     * @param properties client settings; {@code PropertyKeyConst.ENCODE} selects the encoding and
     *                   falls back to {@code Constants.ENCODE} when blank
     * @throws NacosException declared by the constructor; which step raises it is not visible here
     */
    public NacosConfigService(Properties properties) throws NacosException {
        String configuredEncode = properties.getProperty(PropertyKeyConst.ENCODE);
        encode = StringUtils.isBlank(configuredEncode) ? Constants.ENCODE : configuredEncode.trim();

        initNamespace(properties);

        // Server-side information: settings such as authentication and server addresses.
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        // Starts the ServerHttpAgent. Per the original note it watches the server list for
        // changes and the call is synchronous; see the agent's source for details.
        agent.start();

        // The long-polling worker: the key piece for detecting configuration updates.
        worker = new ClientWorker(agent, configFilterChainManager);
    }
    //...
}
ClientWorker
源码
/**
 * Wires up the worker: a single-threaded scheduler that repeatedly calls
 * {@code checkConfigInfo()} and a cached thread pool stored in {@code executorService}.
 * Threads of both pools are daemons whose names include the agent name.
 *
 * @param agent                    HTTP agent; its name is embedded in the thread names
 * @param configFilterChainManager filter chain manager stored on the worker
 */
public ClientWorker(final HttpAgent agent,
                    final ConfigFilterChainManager configFilterChainManager) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread schedulerThread = new Thread(task);
            schedulerThread.setDaemon(true);
            schedulerThread.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            return schedulerThread;
        }
    });

    executorService = Executors.newCachedThreadPool(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread pollingThread = new Thread(task);
            pollingThread.setDaemon(true);
            pollingThread.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
            return pollingThread;
        }
    });

    Runnable configCheck = new Runnable() {
        @Override
        public void run() {
            try {
                // Check whether more long-polling tasks are needed for the cached listeners.
                checkConfigInfo();
            } catch (Throwable e) {
            }
        }
    };
    // First run after 1 ms, then a 10 ms pause between the end of one run and the next.
    executor.scheduleWithFixedDelay(configCheck, 1L, 10L, TimeUnit.MILLISECONDS);
}
/**
 * Submits additional {@code LongPollingRunnable} tasks when the number of cached entries
 * calls for more tasks than have been started so far, then records the new task count.
 */
public void checkConfigInfo() {
    // cacheMap holds groupKey -> CacheData; per the original note the key is built from dataId + group.
    final int listenerSize = cacheMap.get().size();
    // NOTE(review): the ceil only rounds up if getPerTaskConfigSize() returns a floating-point
    // value; its return type is not visible in this excerpt — confirm.
    final int requiredTasks = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (requiredTasks > currentLongingTaskCount) {
        int nextTaskId = (int) currentLongingTaskCount;
        while (nextTaskId < requiredTasks) {
            // One long-polling task per missing task id.
            executorService.execute(new LongPollingRunnable(nextTaskId));
            nextTaskId++;
        }
        currentLongingTaskCount = requiredTasks;
    }
}
LongPollingRunnable
结构
/**
 * One long-polling task. Each run handles the CacheData entries whose task id equals this
 * task's id: it applies local failover files, asks the server which group keys changed,
 * pulls the changed content, notifies listeners through checkListenerMd5(), and finally
 * re-submits itself to executorService so the cycle repeats.
 */
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
public void run() {
try {
// Collect the cache entries assigned to this task.
List<CacheData> cacheDatas = new ArrayList<CacheData>();
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
// Check the local (failover) configuration first.
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
// Local content is in use: compare against the md5 recorded for the listeners.
cacheData.checkListenerMd5();
}
} catch (Exception e) {
// Failures for a single entry are swallowed so the remaining entries are still processed.
}
}
}
List<String> inInitializingCacheList = new ArrayList<String>();
// Ask the config server which entries changed; per the original note this call also
// populates inInitializingCacheList.
List<String> changedGroupKeys =
checkUpdateDataIds(cacheDatas, inInitializingCacheList);
// For every changed key, fetch the latest content from the server.
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
// A third element, when present, is the tenant.
if (key.length == 3) {
tenant = key[2];
}
try {
// Fetch the server-side configuration (3000 is passed as the last argument).
String content = getServerConfig(dataId, group, tenant, 3000L);
// NOTE(review): the lookup result is dereferenced without a null check — confirm the key is always present.
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);// store the new content on the cached entry
} catch (NacosException ioe) {
// A failed fetch for one key is swallowed; the other keys are still processed.
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// Per the original note: compares the listeners' md5 with the current one, updates it
// when different, and in that case fires the listener's receiveConfigInfo callback.
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
} catch (Throwable e) {
// Any failure in this round is swallowed; the finally block still schedules the next round.
} finally {
// Re-submit this task so polling continues in a loop.
executorService.execute(this);
}
}
}
checkUpdateDataIds
代码中调用 checkUpdateConfigStr
/**
 * Asks the server which entries have changed. Never returns {@code null}; per the original
 * note, only dataId and group are meaningful in the returned items.
 *
 * @param probeUpdateString       probe string; per the original note it is composed of
 *                                dataId, group, md5 and tenant
 * @param isInitializingCacheList when {@code true}, an extra header tells the server not to
 *                                hold the request
 * @return the parsed response on HTTP 200; otherwise an empty list (also for a blank probe
 *         string or an {@code IOException})
 */
List<String> checkUpdateConfigStr(String probeUpdateString,
                                  boolean isInitializingCacheList) {
    // Nothing to probe: skip the request entirely.
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    final long timeout = TimeUnit.SECONDS.toMillis(30L); // 30-second timeout
    List<String> params =
        Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

    // Headers are a flat list of alternating names and values.
    List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add(String.valueOf(timeout));
    if (isInitializingCacheList) {
        // Tell the server not to hang the request when newly initializing cache data was
        // added; see the server side for how this header is honoured.
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }

    try {
        // POST to Constants.CONFIG_CONTROLLER_PATH + "/listener" to learn about server-side updates.
        HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);
        final boolean ok = HttpURLConnection.HTTP_OK == result.code;
        setHealthServer(ok);
        if (ok) {
            // The body carries the group keys that were updated.
            return parseUpdateDataIdResponse(result.content);
        }
    } catch (IOException e) {
    }
    return Collections.emptyList();
}
checkLocalConfig
方法
/**
 * Synchronises a cache entry with its local failover file: starts using the file when it
 * appears, stops using it when it disappears, and reloads it when its last-modified time
 * differs from the recorded version.
 */
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    // Locate the local failover file. Original note: LOCAL_SNAPSHOT_PATH is
    // System.getProperty("JM.SNAPSHOT.PATH", System.getProperty("user.home")) + File.separator
    // + "nacos" + File.separator + "config/data".
    File failoverFile = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

    // Absent -> present: switch to the local content.
    if (!cacheData.isUseLocalConfigInfo() && failoverFile.exists()) {
        applyFailoverContent(cacheData, failoverFile, dataId, group, tenant);
        return;
    }

    // Present -> absent: do not notify listeners here; they are notified once the server
    // configuration has been fetched.
    if (cacheData.isUseLocalConfigInfo() && !failoverFile.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        return;
    }

    // Still present but modified since it was last loaded.
    if (cacheData.isUseLocalConfigInfo() && failoverFile.exists()
        && cacheData.getLocalConfigInfoVersion() != failoverFile.lastModified()) {
        applyFailoverContent(cacheData, failoverFile, dataId, group, tenant);
        return;
    }
}

/** Loads the failover content and records it, with the file's timestamp, on the cache entry. */
private void applyFailoverContent(CacheData cacheData, File failoverFile,
                                  String dataId, String group, String tenant) {
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    // Computed exactly as in the original code; the value is not used further in this excerpt.
    String md5 = MD5.getInstance().getMD5String(content);
    cacheData.setUseLocalConfigInfo(true);
    cacheData.setLocalConfigInfoVersion(failoverFile.lastModified());
    cacheData.setContent(content);
}
服务端代码导读
API接口/nacos/v1/cs/configs/listener,接口代码
/**
 * Handles POST {@code /listener}: decodes the {@code Listening-Configs} parameter sent by
 * the client, turns it into a client md5 map and hands the request to
 * {@code inner.doPollingConfig}.
 *
 * @throws IllegalArgumentException if the parameter is blank or cannot be parsed
 */
@RequestMapping(value = "/listener", method = RequestMethod.POST)
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

    // The group keys sent by the client.
    final String rawProbe = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(rawProbe)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    final String probeModify = URLDecoder.decode(rawProbe, Constants.ENCODE);

    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    // Do the long polling; "inner" is the ConfigServletInner.
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
ConfigServletInner
实现
public class ConfigServletInner {
@Autowired
private LongPollingService longPollingService;
@Autowired
private PersistService persistService;
private static final int TRY_GET_LOCK_TIMES = 9;
private static final int START_LONGPOLLING_VERSION_NUM = 204;
/**
 * Polling entry point. Requests that support long polling are registered with the
 * {@code LongPollingService}; the short-polling path is omitted in this excerpt.
 *
 * @return the string form of {@code HttpServletResponse.SC_OK} on both paths
 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize)
    throws IOException, ServletException {
    final String okStatus = String.valueOf(HttpServletResponse.SC_OK);

    // Long polling
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return okStatus;
    }

    // ... short-polling code omitted
    return okStatus;
}
longPollingService.addLongPollingClient
/**
 * Registers a long-polling client. Unless fixed polling is enabled, the client's md5 map is
 * compared first: if anything changed the response is generated immediately, and if the
 * no-hang-up header is set the method returns without holding the request. Otherwise the
 * request is switched to async mode and a ClientLongPolling task is handed to the scheduler.
 */
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {
// Client-supplied long-polling timeout (header value, parsed as a long below).
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
 * Respond early (500 is the fallback passed for delayTime) so the client does not time
 * out first. Original note: changed by @qiaoyi.dingqy on 2013.10.22, add delay time for LoadBalance.
 */
// The effective timeout is never below 10000.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
// NOTE(review): "start" is not used anywhere in this excerpt.
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
// Something already changed: answer right away instead of holding the request.
generateResponse(req, rsp, changedGroups);
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
// The client asked not to be held: return without suspending the request.
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called on the HTTP thread; otherwise the container sends the response as soon as the thread leaves.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is not accurate, so the timeout is managed by this code instead.
asyncContext.setTimeout(0L);
// Hand off to ClientLongPolling, which receives the computed timeout (original note:
// runs after 30s minus delayTime; see the ClientLongPolling code).
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling
代码
/**
 * Captures everything needed to answer one held long-poll request and stamps the creation
 * time with the current system clock.
 *
 * @param ac               async context of the held request
 * @param clientMd5Map     md5 values reported by the client, keyed by group key
 * @param ip               client IP
 * @param probeRequestSize value supplied by the caller for the probe request size
 * @param timeoutTime      delay in milliseconds used when scheduling the timeout response
 * @param appName          client application name
 * @param tag              client tag
 */
ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,
                  long timeoutTime, String appName, String tag) {
    // Request state
    this.asyncContext = ac;
    this.clientMd5Map = clientMd5Map;
    this.probeRequestSize = probeRequestSize;
    this.timeoutTime = timeoutTime;

    // Client identity
    this.ip = ip;
    this.appName = appName;
    this.tag = tag;

    this.createTime = System.currentTimeMillis();
}
/**
 * Schedules the timeout response for this held request and then adds the request to
 * {@code allSubs}. Per the original note, {@code allSubs} is declared as
 * {@code final Queue<ClientLongPolling> allSubs} and holds the pending client requests.
 * When the timeout fires, the request is removed from {@code allSubs} and answered: with the
 * changed groups if fixed polling is on and the md5 comparison found any, otherwise with
 * {@code null}.
 */
public void run() {
    Runnable timeoutTask = new Runnable() {
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                // Drop the subscription.
                allSubs.remove(ClientLongPolling.this);

                List<String> payload = null;
                if (isFixedPolling()) {
                    // Compare md5 values to find the group keys whose configuration changed.
                    List<String> changedGroups = MD5Util.compareMd5(
                        (HttpServletRequest) asyncContext.getRequest(),
                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        payload = changedGroups;
                    }
                }
                sendResponse(payload);
            } catch (Throwable t) {
            }
        }
    };
    asyncTimeoutFuture = scheduler.schedule(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
    allSubs.add(this);
}
配置修改API
/**
 * Publish-configuration endpoint (parameter list and the first part of the body are elided
 * in this excerpt). Persists the configuration through one of three paths — normal, tagged,
 * or beta — and fires a ConfigDataChangeEvent after each; then logs a persistence trace
 * event and returns true.
 */
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public Boolean publishConfig(...)
throws NacosException {
//...
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
// No betaIps header: regular (non-beta) publish.
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
// This is the key call to follow: it dispatches the change event to the registered listeners.
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
// Tagged publish: the tag is persisted and passed along in the event.
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
EventDispatcher.fireEvent
/**
 * Delivers an event to every listener registered for the event's class. An exception thrown
 * by one listener is logged and does not stop delivery to the others.
 *
 * @param event the event to dispatch; must not be {@code null}
 * @throws IllegalArgumentException if {@code event} is {@code null}
 */
static public void fireEvent(Event event) {
    if (event == null) {
        throw new IllegalArgumentException();
    }

    // Look up the listeners registered for this event type and notify each in turn.
    for (AbstractEventListener subscriber : getEntry(event.getClass()).listeners) {
        try {
            subscriber.onEvent(event);
        } catch (Exception ex) {
            log.error(ex.toString(), ex);
        }
    }
}
而listeners维护在Entry中,Entry从LISTENER_HUB中获取,而LISTENER_HUB则通过addEventListener添加listener
// The AbstractEventListener constructor calls EventDispatcher.addEventListener, so the
// place to look next is the concrete implementations of AbstractEventListener.
/**
 * Base class for event listeners; every instance registers itself with the
 * EventDispatcher when it is constructed.
 */
static public abstract class AbstractEventListener {
public AbstractEventListener() {
// Self-registration: constructing a listener is enough to subscribe it.
EventDispatcher.addEventListener(this);
}
}
查看LongPollingService:当客户端添加了listener时,由allSubs维护订阅关系;而配置变更时会触发onEvent事件,onEvent执行DataChangeTask任务。
/**
 * Reacts to a {@code LocalDataChangeEvent} by handing a {@code DataChangeTask} to the
 * scheduler. Events are ignored while fixed polling is enabled, as are events of any
 * other type.
 */
@Override
public void onEvent(Event event) {
    if (isFixedPolling() || !(event instanceof LocalDataChangeEvent)) {
        // ignore
        return;
    }
    LocalDataChangeEvent changed = (LocalDataChangeEvent) event;
    scheduler.execute(new DataChangeTask(changed.groupKey, changed.isBeta, changed.betaIps));
}
DataChangeTask
实现
/**
 * Walks all held long-poll requests and, for each one whose md5 map contains the changed
 * group key, removes the subscription and answers the request with that key.
 * Any failure is swallowed.
 */
public void run() {
    try {
        ConfigService.getContentBetaMd5(groupKey);
        Iterator<ClientLongPolling> subscribers = allSubs.iterator();
        while (subscribers.hasNext()) {
            ClientLongPolling clientSub = subscribers.next();
            // Only clients that subscribed to this group key are of interest.
            if (!clientSub.clientMd5Map.containsKey(groupKey)) {
                continue;
            }
            // ...
            getRetainIps().put(clientSub.ip, System.currentTimeMillis());
            // Drop the subscription before answering.
            subscribers.remove();
            // Complete the held long-poll request with the changed key.
            clientSub.sendResponse(Arrays.asList(groupKey));
        }
    } catch (Throwable t) {
    }
}