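Preface
Apollo is the configuration-center middleware open-sourced by Ctrip. It currently has 23k+ stars on GitHub, and a great many companies have adopted it as their configuration center.
Let's start with Apollo's architecture diagram. Some people online joke that Apollo was written by interns as a practice project. That is of course a joke, but it does show how complex and hard to follow the architecture looks. Once you understand the source code, though, the intent behind the architecture becomes clear.
Creating app, cluster, namespace and item, and publishing a release
Anyone who has used Apollo knows that before a configuration can be published, it has to go through the creation chain app -> cluster -> namespace -> item.
Start with app creation, which lives in the portal's AppController. AppController is a Spring MVC controller, and its create method handles app creation: an annotation checks permissions, the app is saved to the portal's own database (createAppInLocal), and an app-creation event is published.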
@PreAuthorize(value = "@permissionValidator.hasCreateApplicationPermission()")
@PostMapping
public App create(@Valid @RequestBody AppModel appModel) {
App app = transformToApp(appModel);
App createdApp = appService.createAppInLocal(app);
publisher.publishEvent(new AppCreationEvent(createdApp));
Set<String> admins = appModel.getAdmins();
if (!CollectionUtils.isEmpty(admins)) {
rolePermissionService
.assignRoleToUsers(RoleUtils.buildAppMasterRoleName(createdApp.getAppId()),
admins, userInfoHolder.getUser().getUserId());
}
return createdApp;
}
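CreationListener registers an event listener with Spring, so the event published on app creation triggers onAppCreationEvent, which calls appAPI.createApp to send an HTTP request to the adminservice of every active environment.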
@EventListener
public void onAppCreationEvent(AppCreationEvent event) {
AppDTO appDTO = BeanUtils.transform(AppDTO.class, event.getApp());
List<Env> envs = portalSettings.getActiveEnvs();
for (Env env : envs) {
try {
appAPI.createApp(env, appDTO);
} catch (Throwable e) {
logger.error("Create app failed. appId = {}, env = {})", appDTO.getAppId(), env, e);
Tracer.logError(String.format("Create app failed. appId = %s, env = %s", appDTO.getAppId(), env), e);
}
}
}
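The listener above isolates failures per environment: an error in one environment is logged and the loop moves on. Below is a minimal sketch of that pattern in plain Java. The class name AppCreationFanOut and the BiConsumer standing in for appAPI.createApp are hypothetical, and the sketch catches RuntimeException where the original catches Throwable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the portal-side listener; not Apollo's real classes. */
class AppCreationFanOut {

    /**
     * Calls createApp once per environment and returns the environments that failed.
     * createApp receives (env, appId) and plays the role of appAPI.createApp.
     */
    static List<String> createInAllEnvs(List<String> envs, String appId,
                                        BiConsumer<String, String> createApp) {
        List<String> failed = new ArrayList<>();
        for (String env : envs) {
            try {
                createApp.accept(env, appId);
            } catch (RuntimeException e) {
                // One failing environment must not block the remaining ones.
                failed.add(env);
            }
        }
        return failed;
    }
}
```

The consequence of this design is that an app may exist in the portal but be missing in an environment whose adminservice was unreachable at creation time.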
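On the adminservice side, AppController handles the HTTP request in its create method. It likewise creates the app, and createNewApp additionally creates the default app namespace, the default cluster, and the namespace instances under that default cluster.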
@PostMapping("/apps")
public AppDTO create(@Valid @RequestBody AppDTO dto) {
App entity = BeanUtils.transform(App.class, dto);
App managedEntity = appService.findOne(entity.getAppId());
if (managedEntity != null) {
throw new BadRequestException("app already exist.");
}
entity = adminService.createNewApp(entity);
return BeanUtils.transform(AppDTO.class, entity);
}
@Transactional
public App createNewApp(App app) {
String createBy = app.getDataChangeCreatedBy();
App createdApp = appService.save(app);
String appId = createdApp.getAppId();
appNamespaceService.createDefaultAppNamespace(appId, createBy);
clusterService.createDefaultCluster(appId, createBy);
namespaceService.instanceOfAppNamespaces(appId, ConfigConsts.CLUSTER_NAME_DEFAULT, createBy);
return app;
}
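Next is cluster creation, handled by the portal's ClusterController (the version shown here appears to be the open API controller, judging by OpenClusterDTO and consumerPermissionValidator). The chain createCluster -> clusterService.createCluster -> clusterAPI.create sends an HTTP request to adminservice.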
@PreAuthorize(value = "@consumerPermissionValidator.hasCreateClusterPermission(#request, #appId)")
@PostMapping(value = "apps/{appId}/clusters")
public OpenClusterDTO createCluster(@PathVariable String appId, @PathVariable String env,
@Valid @RequestBody OpenClusterDTO cluster, HttpServletRequest request) {
if (!Objects.equals(appId, cluster.getAppId())) {
throw new BadRequestException(String.format(
"AppId not equal. AppId in path = %s, AppId in payload = %s", appId, cluster.getAppId()));
}
String clusterName = cluster.getName();
String operator = cluster.getDataChangeCreatedBy();
RequestPrecondition.checkArguments(!StringUtils.isContainEmpty(clusterName, operator),
"name and dataChangeCreatedBy should not be null or empty");
if (!InputValidator.isValidClusterNamespace(clusterName)) {
throw new BadRequestException(
String.format("Invalid ClusterName format: %s", InputValidator.INVALID_CLUSTER_NAMESPACE_MESSAGE));
}
if (userService.findByUserId(operator) == null) {
throw new BadRequestException("User " + operator + " doesn't exist!");
}
ClusterDTO toCreate = OpenApiBeanUtils.transformToClusterDTO(cluster);
ClusterDTO createdClusterDTO = clusterService.createCluster(Env.fromString(env), toCreate);
return OpenApiBeanUtils.transformFromClusterDTO(createdClusterDTO);
}
public List<ClusterDTO> findClusters(Env env, String appId) {
return clusterAPI.findClustersByApp(appId, env);
}
public ClusterDTO createCluster(Env env, ClusterDTO cluster) {
if (!clusterAPI.isClusterUnique(cluster.getAppId(), env, cluster.getName())) {
throw new BadRequestException(String.format("cluster %s already exists.", cluster.getName()));
}
ClusterDTO clusterDTO = clusterAPI.create(env, cluster);
Tracer.logEvent(TracerEventType.CREATE_CLUSTER, cluster.getAppId(), "0", cluster.getName());
return clusterDTO;
}
Then, in adminservice's ClusterController, the create method handles the HTTP request and likewise writes the cluster to the database.
@PostMapping("/apps/{appId}/clusters")
public ClusterDTO create(@PathVariable("appId") String appId,
@RequestParam(value = "autoCreatePrivateNamespace", defaultValue = "true") boolean autoCreatePrivateNamespace,
@Valid @RequestBody ClusterDTO dto) {
Cluster entity = BeanUtils.transform(Cluster.class, dto);
Cluster managedEntity = clusterService.findOne(appId, entity.getName());
if (managedEntity != null) {
throw new BadRequestException("cluster already exist.");
}
if (autoCreatePrivateNamespace) {
entity = clusterService.saveWithInstanceOfAppNamespaces(entity);
} else {
entity = clusterService.saveWithoutInstanceOfAppNamespaces(entity);
}
return BeanUtils.transform(ClusterDTO.class, entity);
}
Namespace creation comes next, handled by the portal's NamespaceController. The chain createNamespace -> namespaceService.createNamespace -> namespaceAPI.createNamespace sends an HTTP request to adminservice.
@PreAuthorize(value = "@permissionValidator.hasCreateNamespacePermission(#appId)")
@PostMapping("/apps/{appId}/namespaces")
public ResponseEntity<Void> createNamespace(@PathVariable String appId,
@RequestBody List<NamespaceCreationModel> models) {
checkModel(!CollectionUtils.isEmpty(models));
String namespaceName = models.get(0).getNamespace().getNamespaceName();
String operator = userInfoHolder.getUser().getUserId();
roleInitializationService.initNamespaceRoles(appId, namespaceName, operator);
roleInitializationService.initNamespaceEnvRoles(appId, namespaceName, operator);
for (NamespaceCreationModel model : models) {
NamespaceDTO namespace = model.getNamespace();
RequestPrecondition.checkArgumentsNotEmpty(model.getEnv(), namespace.getAppId(),
namespace.getClusterName(), namespace.getNamespaceName());
try {
namespaceService.createNamespace(Env.valueOf(model.getEnv()), namespace);
} catch (Exception e) {
logger.error("create namespace fail.", e);
Tracer.logError(
String.format("create namespace fail. (env=%s namespace=%s)", model.getEnv(),
namespace.getNamespaceName()), e);
}
}
namespaceService.assignNamespaceRoleToOperator(appId, namespaceName,userInfoHolder.getUser().getUserId());
return ResponseEntity.ok().build();
}
public NamespaceDTO createNamespace(Env env, NamespaceDTO namespace) {
if (StringUtils.isEmpty(namespace.getDataChangeCreatedBy())) {
namespace.setDataChangeCreatedBy(userInfoHolder.getUser().getUserId());
}
namespace.setDataChangeLastModifiedBy(userInfoHolder.getUser().getUserId());
NamespaceDTO createdNamespace = namespaceAPI.createNamespace(env, namespace);
Tracer.logEvent(TracerEventType.CREATE_NAMESPACE,
String.format("%s+%s+%s+%s", namespace.getAppId(), env, namespace.getClusterName(),
namespace.getNamespaceName()));
return createdNamespace;
}
In adminservice's NamespaceController, the create method handles the HTTP request and persists the namespace.
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces")
public NamespaceDTO create(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@Valid @RequestBody NamespaceDTO dto) {
Namespace entity = BeanUtils.transform(Namespace.class, dto);
Namespace managedEntity = namespaceService.findOne(appId, clusterName, entity.getNamespaceName());
if (managedEntity != null) {
throw new BadRequestException("namespace already exist.");
}
entity = namespaceService.save(entity);
return BeanUtils.transform(NamespaceDTO.class, entity);
}
Finally, item creation is handled by the portal's ItemController. The chain createItem -> itemService.createItem -> itemAPI.createItem sends an HTTP request to adminservice.
@PreAuthorize(value = "@consumerPermissionValidator.hasModifyNamespacePermission(#request, #appId, #namespaceName, #env)")
@PostMapping(value = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
public OpenItemDTO createItem(@PathVariable String appId, @PathVariable String env,
@PathVariable String clusterName, @PathVariable String namespaceName,
@RequestBody OpenItemDTO item, HttpServletRequest request) {
RequestPrecondition.checkArguments(
!StringUtils.isContainEmpty(item.getKey(), item.getDataChangeCreatedBy()),
"key and dataChangeCreatedBy should not be null or empty");
if (userService.findByUserId(item.getDataChangeCreatedBy()) == null) {
throw new BadRequestException("User " + item.getDataChangeCreatedBy() + " doesn't exist!");
}
if(!StringUtils.isEmpty(item.getComment()) && item.getComment().length() > 64){
throw new BadRequestException("Comment length should not exceed 64 characters");
}
ItemDTO toCreate = OpenApiBeanUtils.transformToItemDTO(item);
//protect
toCreate.setLineNum(0);
toCreate.setId(0);
toCreate.setDataChangeLastModifiedBy(toCreate.getDataChangeCreatedBy());
toCreate.setDataChangeLastModifiedTime(null);
toCreate.setDataChangeCreatedTime(null);
ItemDTO createdItem = itemService.createItem(appId, Env.fromString(env),
clusterName, namespaceName, toCreate);
return OpenApiBeanUtils.transformFromItemDTO(createdItem);
}
public ItemDTO createItem(String appId, Env env, String clusterName, String namespaceName, ItemDTO item) {
NamespaceDTO namespace = namespaceAPI.loadNamespace(appId, env, clusterName, namespaceName);
if (namespace == null) {
throw new BadRequestException(
"namespace:" + namespaceName + " not exist in env:" + env + ", cluster:" + clusterName);
}
item.setNamespaceId(namespace.getId());
ItemDTO itemDTO = itemAPI.createItem(appId, env, clusterName, namespaceName, item);
Tracer.logEvent(TracerEventType.MODIFY_NAMESPACE, String.format("%s+%s+%s+%s", appId, env, clusterName, namespaceName));
return itemDTO;
}
In adminservice's ItemController, the create method handles the HTTP request, persists the item, and saves a Commit that records the change.
@PreAcquireNamespaceLock
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items")
public ItemDTO create(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName, @RequestBody ItemDTO dto) {
Item entity = BeanUtils.transform(Item.class, dto);
ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
Item managedEntity = itemService.findOne(appId, clusterName, namespaceName, entity.getKey());
if (managedEntity != null) {
throw new BadRequestException("item already exists");
}
entity = itemService.save(entity);
builder.createItem(entity);
dto = BeanUtils.transform(ItemDTO.class, entity);
Commit commit = new Commit();
commit.setAppId(appId);
commit.setClusterName(clusterName);
commit.setNamespaceName(namespaceName);
commit.setChangeSets(builder.build());
commit.setDataChangeCreatedBy(dto.getDataChangeLastModifiedBy());
commit.setDataChangeLastModifiedBy(dto.getDataChangeLastModifiedBy());
commitService.save(commit);
return dto;
}
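As you can see, the flows are all much alike: each one persists a record, and together they complete the app -> cluster -> namespace -> item chain. Don't get bored yet, because next comes the crucial release flow.
Again start from the portal's ReleaseController, where createRelease handles publishing. As before, the chain releaseService.publish -> releaseAPI.createRelease sends an HTTP request to adminservice.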
@PreAuthorize(value = "@permissionValidator.hasReleaseNamespacePermission(#appId, #namespaceName, #env)")
@PostMapping(value = "/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
public ReleaseDTO createRelease(@PathVariable String appId,
@PathVariable String env, @PathVariable String clusterName,
@PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {
model.setAppId(appId);
model.setEnv(env);
model.setClusterName(clusterName);
model.setNamespaceName(namespaceName);
if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {
throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
}
ReleaseDTO createdRelease = releaseService.publish(model);
ConfigPublishEvent event = ConfigPublishEvent.instance();
event.withAppId(appId)
.withCluster(clusterName)
.withNamespace(namespaceName)
.withReleaseId(createdRelease.getId())
.setNormalPublishEvent(true)
.setEnv(Env.valueOf(env));
publisher.publishEvent(event);
return createdRelease;
}
public ReleaseDTO publish(NamespaceReleaseModel model) {
Env env = model.getEnv();
boolean isEmergencyPublish = model.isEmergencyPublish();
String appId = model.getAppId();
String clusterName = model.getClusterName();
String namespaceName = model.getNamespaceName();
String releaseBy = StringUtils.isEmpty(model.getReleasedBy()) ?
userInfoHolder.getUser().getUserId() : model.getReleasedBy();
ReleaseDTO releaseDTO = releaseAPI.createRelease(appId, env, clusterName, namespaceName,
model.getReleaseTitle(), model.getReleaseComment(),
releaseBy, isEmergencyPublish);
Tracer.logEvent(TracerEventType.RELEASE_NAMESPACE,
String.format("%s+%s+%s+%s", appId, env, clusterName, namespaceName));
return releaseDTO;
}
In adminservice's ReleaseController, the publish method handles the HTTP request: it persists the release and then sends a ReleaseMessage for it.
@Transactional
@PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
public ReleaseDTO publish(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName,
@RequestParam("name") String releaseName,
@RequestParam(name = "comment", required = false) String releaseComment,
@RequestParam("operator") String operator,
@RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
if (namespace == null) {
throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,
clusterName, namespaceName));
}
Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);
//send release message
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
String messageCluster;
if (parentNamespace != null) {
messageCluster = parentNamespace.getClusterName();
} else {
messageCluster = clusterName;
}
messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName),
Topics.APOLLO_RELEASE_TOPIC);
return BeanUtils.transform(ReleaseDTO.class, release);
}
Now look at what is passed to messageSender.sendMessage. A release contains all items under one app -> cluster -> namespace, so the release message key is built by joining app, cluster and namespace, which is enough to locate a release.
public static String generate(String appId, String cluster, String namespace) {
return STRING_JOINER.join(appId, cluster, namespace);
}
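To make the key format concrete, here is a small sketch of the join. It assumes the separator is "+", which matches the "%s+%s+%s+%s" format strings in the Tracer calls above but is not shown explicitly in this excerpt; the class name ReleaseMessageKeySketch is hypothetical.

```java
/** Hypothetical sketch of the release message key; the "+" separator is an assumption. */
class ReleaseMessageKeySketch {

    /** Joins the three coordinates that identify a namespace within a cluster of an app. */
    static String generate(String appId, String cluster, String namespace) {
        return String.join("+", appId, cluster, namespace);
    }
}
```

Under that assumption, publishing the application namespace of demo-app in the default cluster would produce the key demo-app+default+application.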
Next, the ReleaseMessage is itself saved to the database to obtain an id, and that id is put on the toClean queue for later processing.
@Override
@Transactional
public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel);
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
logger.warn("Channel {} not supported by DatabaseMessageSender!", channel);
return;
}
Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
try {
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
toClean.offer(newMessage.getId());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
logger.error("Sending message to database failed", ex);
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
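Processing the toClean queue simply deletes older ReleaseMessage rows that share the same release message key, so that the database keeps only one ReleaseMessage per key, the one corresponding to the latest release.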
private void cleanMessage(Long id) {
//double check in case the release message is rolled back
ReleaseMessage releaseMessage = releaseMessageRepository.findById(id).orElse(null);
if (releaseMessage == null) {
return;
}
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
releaseMessage.getMessage(), releaseMessage.getId());
releaseMessageRepository.deleteAll(messages);
hasMore = messages.size() == 100;
messages.forEach(toRemove -> Tracer.logEvent(
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
}
}
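The batched cleanup can be sketched with an in-memory table in place of the repository. Everything here is illustrative: the TreeMap stands in for the ReleaseMessage table ordered by id, and the class ReleaseMessageCleanupSketch, its save method, and the batchSize parameter are hypothetical (the original uses a fixed batch of 100).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the ReleaseMessage table and its cleanup. */
class ReleaseMessageCleanupSketch {
    // id -> message key, ordered by id like an auto-increment primary key
    final TreeMap<Long, String> table = new TreeMap<>();
    private long nextId = 1;

    /** Inserts a new message row and returns its id. */
    long save(String key) {
        long id = nextId++;
        table.put(id, key);
        return id;
    }

    /** Deletes rows with the same key and a smaller id, batch by batch; returns rows deleted. */
    int clean(long id, int batchSize) {
        String key = table.get(id);
        if (key == null) {
            return 0; // the insert may have been rolled back
        }
        int deleted = 0;
        boolean hasMore = true;
        while (hasMore) {
            List<Long> batch = new ArrayList<>();
            for (Map.Entry<Long, String> e : table.headMap(id).entrySet()) {
                if (e.getValue().equals(key)) {
                    batch.add(e.getKey());
                    if (batch.size() == batchSize) {
                        break;
                    }
                }
            }
            batch.forEach(table::remove);
            deleted += batch.size();
            // A full batch means older rows may still remain.
            hasMore = batch.size() == batchSize;
        }
        return deleted;
    }
}
```

Only rows with a smaller id are removed, so the newest message for a key always survives and messages for other keys are untouched.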
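How the client pulls configuration
Pulling configuration on the client starts from ConfigService. getAppConfig returns the configuration of the default namespace (ConfigConsts.NAMESPACE_APPLICATION); to read another namespace, pass its name to getConfig.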
public static Config getAppConfig() {
return getConfig(ConfigConsts.NAMESPACE_APPLICATION);
}
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
Next comes the initialization of ConfigManager, which uses a synchronized double-checked singleton and finally calls ApolloInjector.getInstance to obtain the ConfigManager.
private ConfigManager getManager() {
if (m_configManager == null) {
synchronized (this) {
if (m_configManager == null) {
m_configManager = ApolloInjector.getInstance(ConfigManager.class);
}
}
}
return m_configManager;
}
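ConfigManager's getConfig is called next. It looks up the Config for the given namespace; if none exists yet, it first obtains the ConfigFactory and then calls the factory's create method to build the Config, which is cached for later calls.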
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
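The per-namespace cache above uses explicit double-checked locking. As an illustration of the same "create at most once per namespace" idea, here is a sketch that swaps in ConcurrentHashMap.computeIfAbsent. This is a different technique from the original code, and NamespaceCacheSketch, its factory function and the creations counter are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical per-namespace cache; uses computeIfAbsent instead of double-checked locking. */
class NamespaceCacheSketch<C> {
    private final Map<String, C> configs = new ConcurrentHashMap<>();
    private final Function<String, C> factory;
    // Counts how many times the factory actually ran, for demonstration only.
    final AtomicInteger creations = new AtomicInteger();

    NamespaceCacheSketch(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached config for the namespace, creating it on first access. */
    C getConfig(String namespace) {
        return configs.computeIfAbsent(namespace, ns -> {
            creations.incrementAndGet();
            return factory.apply(ns);
        });
    }
}
```

Either approach gives callers the same Config instance for a namespace, which matters because change listeners are attached to that instance.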
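In DefaultConfigFactory's create method, a RemoteConfigRepository is wrapped in a LocalFileConfigRepository (unless the client is in local mode), which is in turn wrapped in a DefaultConfig and returned. createConfigFile builds a PropertiesConfigFile, or another format-specific ConfigFile, over the same repository chain.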
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
@Override
public ConfigFile createConfigFile(String namespace, ConfigFileFormat configFileFormat) {
ConfigRepository configRepository = createLocalConfigRepository(namespace);
switch (configFileFormat) {
case Properties:
return new PropertiesConfigFile(namespace, configRepository);
case XML:
return new XmlConfigFile(namespace, configRepository);
case JSON:
return new JsonConfigFile(namespace, configRepository);
case YAML:
return new YamlConfigFile(namespace, configRepository);
case YML:
return new YmlConfigFile(namespace, configRepository);
case TXT:
return new TxtConfigFile(namespace, configRepository);
}
return null;
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
getProperty is the method that actually reads a value. It checks the sources in this order: System.getProperty -> m_configProperties -> System.getenv -> m_resourceProperties.getProperty.
@Override
public String getProperty(String key, String defaultValue) {
// step 1: check system properties, i.e. -Dkey=value
String value = System.getProperty(key);
// step 2: check local cached properties file
if (value == null && m_configProperties.get() != null) {
value = m_configProperties.get().getProperty(key);
}
/**
* step 3: check env variable, i.e. PATH=...
* normally system environment variables are in UPPERCASE, however there might be exceptions.
* so the caller should provide the key in the right case
*/
if (value == null) {
value = System.getenv(key);
}
// step 4: check properties file from classpath
if (value == null && m_resourceProperties != null) {
value = m_resourceProperties.getProperty(key);
}
if (value == null && m_configProperties.get() == null && m_warnLogRateLimiter.tryAcquire()) {
logger.warn("Could not load config for namespace {} from Apollo, please check whether the configs are released in Apollo! Return default value now!", m_namespace);
}
return value == null ? defaultValue : value;
}
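The lookup order can be reduced to a small, testable function. This sketch passes every source in as a parameter so that the precedence is easy to see; PropertyLookupSketch and its parameter names are hypothetical and the rate-limited warning is left out.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical reduction of the four-step lookup shown above. */
class PropertyLookupSketch {

    static String lookup(String key, String defaultValue,
                         Properties systemProps,      // step 1: -Dkey=value
                         Properties apolloProps,      // step 2: properties loaded from Apollo (may be null)
                         Map<String, String> env,     // step 3: environment variables
                         Properties classpathProps) { // step 4: properties file on the classpath (may be null)
        String value = systemProps.getProperty(key);
        if (value == null && apolloProps != null) {
            value = apolloProps.getProperty(key);
        }
        if (value == null) {
            value = env.get(key);
        }
        if (value == null && classpathProps != null) {
            value = classpathProps.getProperty(key);
        }
        return value == null ? defaultValue : value;
    }
}
```

One practical consequence of this order is that a -D system property always overrides the value released in Apollo for the same key.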
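Reading from system properties and local files is not analyzed further here; let's go straight to how configuration is pulled remotely from configservice.
RemoteConfigRepository relies on two mechanisms: a scheduled task that pulls configuration periodically so that the client eventually catches up, and a long poll that detects changes so that updates arrive in near real time.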
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
Start with the long-polling side. scheduleLongPollingRefresh calls remoteConfigLongPollService.submit.
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
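Eventually doLongPollingRefresh is called. It sends an HTTP request to configservice carrying the client's own app, cluster and namespaces together with the locally cached notification ids (the release message id map). configservice responds with the namespaces that have a newer release, and the client then calls trySync to pull the latest configuration.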
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
}
}
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
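On the configservice side, the long poll is handled by NotificationControllerV2.pollNotification. It normalizes the request into a set of watched keys, loads the latest ReleaseMessage for each key, and compares them with the notification ids sent by the client. If a ReleaseMessage newer than the client's already exists, it calls setResult on the DeferredResult and the response returns immediately. Otherwise it returns the pending DeferredResult, and the response is held until either the timeout fires or a new ReleaseMessage arrives.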
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
try {
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
} catch (Throwable ex) {
Tracer.logError(ex);
}
if (CollectionUtils.isEmpty(notifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
if (CollectionUtils.isEmpty(filteredNotifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
String normalizedNamespace = notificationEntry.getKey();
ApolloConfigNotification notification = notificationEntry.getValue();
namespaces.add(normalizedNamespace);
clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
}
}
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
/**
* 1、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
/**
* 2、check new release
*/
List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
List<ApolloConfigNotification> newNotifications =
getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
latestReleaseMessages);
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
}
return deferredResultWrapper.getResult();
}
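The "register first, then check" order in pollNotification is the key to not missing a message that arrives between the two steps. The sketch below models it with CompletableFuture in place of Spring's DeferredResult. LongPollSketch and its two maps are hypothetical, one key per request is assumed, and timeout handling is omitted.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of server-side long polling; CompletableFuture stands in for DeferredResult. */
class LongPollSketch {
    // watched key -> requests currently waiting on that key
    private final Map<String, Set<CompletableFuture<Long>>> pending = new ConcurrentHashMap<>();
    // watched key -> id of the latest release message seen for that key
    private final Map<String, Long> latestIds = new ConcurrentHashMap<>();

    /** Handles one poll: completes at once if the server is ahead of the client, else waits. */
    CompletableFuture<Long> poll(String key, long clientNotificationId) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // 1. Register before checking, so a message arriving in between still reaches us.
        pending.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(result);
        // Unregister once the request completes, whatever the reason.
        result.whenComplete((value, error) -> {
            Set<CompletableFuture<Long>> waiters = pending.get(key);
            if (waiters != null) {
                waiters.remove(result);
            }
        });
        // 2. Check whether a newer release message already exists.
        Long latest = latestIds.get(key);
        if (latest != null && latest > clientNotificationId) {
            result.complete(latest);
        }
        return result;
    }

    /** Called when a new release message is observed; wakes every waiter on that key. */
    void onReleaseMessage(String key, long messageId) {
        latestIds.merge(key, messageId, Long::max);
        Set<CompletableFuture<Long>> waiters = pending.get(key);
        if (waiters == null) {
            return;
        }
        // Copy first: completing a future removes it from the live set.
        for (CompletableFuture<Long> waiter : new ArrayList<>(waiters)) {
            waiter.complete(messageId);
        }
    }
}
```

A client whose id is already current gets a future that stays pending, which corresponds to the held HTTP response in the controller above.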
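ReleaseMessageScanner periodically scans for new ReleaseMessage rows and passes each one to the handleMessage method of every ReleaseMessageListener. For the long-poll listener, handleMessage looks up the DeferredResult objects registered for that message key and sets the change as their result, which completes the held responses.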
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
configNotification.addMessage(content, message.getId());
//do async notification if too many clients
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
results.get(i).setResult(configNotification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
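The scanning half can be sketched as a cursor over an id-ordered table. As before, the TreeMap stands in for the ReleaseMessage table, and ScannerSketch, scanOnce and the batchSize parameter are hypothetical (the original uses a fixed batch of 500).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical model of scanning an id-ordered table with a moving cursor. */
class ScannerSketch {
    // id -> message content, ordered by id
    final TreeMap<Long, String> table = new TreeMap<>();
    private long maxIdScanned = 0;

    /** Scans one batch after the cursor; returns true if the batch was full, so more may remain. */
    boolean scanOnce(int batchSize, Consumer<String> listener) {
        List<Long> ids = new ArrayList<>();
        for (Long id : table.tailMap(maxIdScanned, false).keySet()) {
            ids.add(id);
            if (ids.size() == batchSize) {
                break;
            }
        }
        if (ids.isEmpty()) {
            return false;
        }
        for (Long id : ids) {
            listener.accept(table.get(id));
        }
        // Advance the cursor to the last id delivered.
        maxIdScanned = ids.get(ids.size() - 1);
        return ids.size() == batchSize;
    }
}
```

Because the cursor only moves forward, each message is delivered to the listeners once per scanner instance, in id order.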
Whether triggered by a change notification or by the periodic refresh, the client's configuration is updated through RemoteConfigRepository's trySync -> sync.
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
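The "reference equals means HTTP 304" convention in sync() can be shown with a minimal sketch. The class `CachedLoader` and its `UnaryOperator` loader are hypothetical. The loader receives the cached value and returns that same instance when nothing changed, mirroring how loadApolloConfig returns `m_configCache.get()` on a 304.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the "same reference means not modified" convention.
class CachedLoader<T> {
  private final AtomicReference<T> cache = new AtomicReference<>();
  // Receives the cached value; returns the same instance to signal "not modified".
  private final UnaryOperator<T> loader;

  CachedLoader(UnaryOperator<T> loader) {
    this.loader = loader;
  }

  T cached() {
    return cache.get();
  }

  // Returns true when the loader produced a different object and the cache was replaced.
  synchronized boolean sync() {
    T previous = cache.get();
    T current = loader.apply(previous);
    if (previous != current) { // identity comparison on purpose, not equals()
      cache.set(current);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    boolean[] modified = {true};
    CachedLoader<String> repo =
        new CachedLoader<>(prev -> modified[0] ? new String("timeout=100") : prev);
    System.out.println("first sync changed: " + repo.sync());
    modified[0] = false; // simulate a 304: the loader hands back the cached instance
    System.out.println("second sync changed: " + repo.sync());
  }
}
```

Comparing identity rather than content keeps the check cheap, but it only works because the loader promises to return the cached object itself when the server reports no change.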
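The loadApolloConfig method likewise sends an HTTP request to configservice to fetch the latest configuration, passing the app, cluster, namespace, and the notification message IDs the client has received (m_remoteMessages).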
private ApolloConfig loadApolloConfig() {
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
//ignore
}
}
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
for (int i = 0; i < maxRetries; i++) {
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
//Access the server which notifies the client first
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from {}", url);
HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get();
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
if(ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
}
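The retry structure of loadApolloConfig (shuffle the servers, try the notifying server first, back off after each failure) can be sketched as follows. This is an illustration under assumptions, not Apollo's implementation: `FailoverLoader`, `Backoff`, and the `fetch` function are hypothetical names, and the doubling rule in `Backoff` is only one plausible reading of "exponential sleep".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of failover across config servers with capped backoff.
class FailoverLoader {

  // Assumed policy: the delay doubles on every failure until it reaches the cap.
  static final class Backoff {
    private final long lower;
    private final long upper;
    private long last;

    Backoff(long lower, long upper) {
      this.lower = lower;
      this.upper = upper;
    }

    long fail() {
      last = (last == 0) ? lower : Math.min(last * 2, upper);
      return last;
    }

    void success() {
      last = 0;
    }
  }

  // Tries the preferred server first, then the rest in random order.
  static <T> T load(List<String> servers, String preferred, Function<String, T> fetch,
      Backoff backoff) {
    List<String> candidates = new ArrayList<>(servers);
    Collections.shuffle(candidates);
    if (preferred != null) {
      candidates.add(0, preferred);
    }
    RuntimeException lastError = null;
    long sleepMillis = 0;
    for (String server : candidates) {
      if (sleepMillis > 0) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
      try {
        T result = fetch.apply(server);
        backoff.success();
        return result;
      } catch (RuntimeException ex) {
        lastError = ex;
        sleepMillis = backoff.fail();
      }
    }
    throw new IllegalStateException("all servers failed", lastError);
  }

  public static void main(String[] args) {
    List<String> servers = new ArrayList<>();
    servers.add("http://config-a");
    servers.add("http://config-b");
    String result = load(servers, null, url -> "config from " + url, new Backoff(10, 80));
    System.out.println(result);
  }
}
```

Shuffling spreads load across instances, while putting the notifying server first makes it more likely that the request reaches an instance that already knows about the new release.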
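On the configservice side, the config request is handled by ConfigController's queryConfig method. It looks up the latest Release from the client's parameters and assembles an ApolloConfig to return.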
@GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,
HttpServletRequest request, HttpServletResponse response) throws IOException {
String originalNamespace = namespace;
//strip out .properties suffix
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
namespace = namespaceUtil.normalizeNamespace(appId, namespace);
if (Strings.isNullOrEmpty(clientIp)) {
clientIp = tryToGetClientIp(request);
}
ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
List<Release> releases = Lists.newLinkedList();
String appClusterNameLoaded = clusterName;
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);
if (currentAppRelease != null) {
releases.add(currentAppRelease);
//we have cluster search process, so the cluster name might be overridden
appClusterNameLoaded = currentAppRelease.getClusterName();
}
}
//if namespace does not belong to this appId, should check if there is a public configuration
if (!namespaceBelongsToAppId(appId, namespace)) {
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);
if (!Objects.isNull(publicRelease)) {
releases.add(publicRelease);
}
}
if (releases.isEmpty()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
}
auditReleases(appId, clusterName, dataCenter, clientIp, releases);
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
if (mergedReleaseKey.equals(clientSideReleaseKey)) {
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
}
ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
}
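The release-key comparison at the end of queryConfig decides between 404, 304, and a full response. A minimal sketch of that decision, with hypothetical names (`ReleaseKeyCheck`, `statusFor`) and the separator assumed to be "+" in place of `ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the status decision made from the merged release key.
class ReleaseKeyCheck {
  // Assumed value; stands in for ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR.
  static final String SEPARATOR = "+";

  // Joins the keys of the app's own release and the public release, in that order.
  static String mergeKeys(List<String> releaseKeys) {
    return releaseKeys.stream().collect(Collectors.joining(SEPARATOR));
  }

  // 404 when no release was found, 304 when the client already holds the merged key,
  // 200 otherwise.
  static int statusFor(List<String> releaseKeys, String clientSideReleaseKey) {
    if (releaseKeys.isEmpty()) {
      return 404;
    }
    return mergeKeys(releaseKeys).equals(clientSideReleaseKey) ? 304 : 200;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("appKey", "publicKey");
    System.out.println(statusFor(keys, "-1"));               // client has nothing yet
    System.out.println(statusFor(keys, mergeKeys(keys)));    // client is up to date
  }
}
```

Because the merged key covers both the app's release and the public release, a new release of either one changes the key and forces a full response.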
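Summary
1. Ctrip's Apollo uses a Release to carry all the configuration of a namespace, which reduces the dimensionality of the configuration data and makes it easier to maintain.
2. The client detects configuration changes through both periodic pulls and long polling, so it gets the latest configuration in near real time while the periodic pull remains as a fallback.
3. For long-polling change detection, configservice uses DeferredResult to handle requests asynchronously; to the caller, this internal asynchrony looks like an ordinary synchronous response.
4. Both ReleaseMessage, used to detect configuration changes, and Release, used to fetch the latest configuration, rely on carefully maintained caches to keep traffic from reaching the database.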