年后不久换了部门,一直在改Bug和优化。。。终于有了点时间,把之前漏下没记录的点慢慢补上
gateway使用ribbon作为服务调用的负载均衡中间件,根据配置的 IRule 对拉取到的服务列表进行负载
而这些真正提供服务的实例是有动态上下线的情况存在的,为了保证轮询到的服务实例能正常访问,ribbon中有一个接口
ServerListUpdater 会定期对服务列表进行更新
在使用 Eureka 作为注册中心的时候,ServerListUpdater有两个实现类:
PollingServerListUpdater :定时从注册中心拉取服务列表,如果没有配置,默认为30秒
EurekaNotificationServerListUpdater :注册中心中的服务有变动时,通知客户端,EurekaNotificationServerListUpdater是通过添加了一个监听器,当收到注册中心的通知后,做出相应的动作
PollingServerListUpdater 也是 默认的 ServerListUpdater 配置
分别看一下它们的代码实现
PollingServerListUpdater
// 从注册中心拉取服务列表的间隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// 这里封装了一个进行服务拉取的任务
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//这里是进行服务列表更新的动作
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// 使用 scheduled 进行定时拉取
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,//这个就是刚才配置的拉取间隔
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
EurekaNotificationServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener() {
// 监听 Eureka发布的事件,然后拉取最新的列表
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
//这里是进行服务列表更新的动作
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
...以下代码省略
} else {
logger.info("Update listener already registered, no-op");
}
}
可以看到,这两个 ServerListUpdater 实现类在更新服务列表的时候,都做了同一个动作updateAction.doUpdate()
进入这个方法,发现它是一个定义在ServerListUpdater 中的一个接口
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
...以下代码省略
}
而它的方法实现是在 负载均衡器 DynamicServerListLoadBalancer 中定义的
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
......代码省略......
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
// UpdateAction的方法实现,调用了另一个方法
updateListOfServers();
}
};
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//真正进行列表更新的地方
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
......代码省略......
}
可以看到,这个 serverListImpl 就是在负载均衡器的ServerList属性,使用这个接口的getUpdatedListOfServers方法进行列表更新,因为我的项目里使用的是自己的注册中心,没有用Eureka,所以也写了一个实现类,参照了使用Eureka的情况下的默认类 DiscoveryEnabledNIWSServerList,让我们看看这个类的代码
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
//其实在更新的时候,调用的也是从注册中心拉取列表的方法
return obtainServersViaDiscovery();
}
//从Eureka拉取服务
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
......代码省略......
return serverList;
}
由此我们知道,无论配置哪个 ServerListUpdater,在更新服务列表的时候,都是调用ServerList接口进行一次服务拉取,然后更新本地的列表,只是触发的时机不同:
- PollingServerListUpdater 30秒拉取一次(时间可以修改)
- EurekaNotificationServerListUpdater 当服务更新时,通知客户端
那么这两种方法分别有什么弊端呢?
先说说PollingServerListUpdater,如果在拉取的间隔中,有服务下线了,极端情况下,原来所有的实例都不可用,换成了新的一批实例,要等到下一次拉取的时间点才会更新,这样会造成最久30秒的时间服务不可用。 比如:原来5个实例 A、B、C、D、E变成了 F、G、H、I、J,但是由于并没有到更新的时间点,ribbon保存的还是老的服务实例,而这时它们都已下线,无法提供服务。
那如果我们换成,一旦服务变更(这里是上下线都会通知)就通知客户端的 EurekaNotificationServerListUpdater 会怎么样呢,这样好像可以实时的替换成最新的可用实例,保证服务不会打到失效的实例上。可是这会有另一个问题,也是在极端情况下,如果这次通知由于网络问题,没有通知到客户端,那么这次变动过后,如果一直没有服务变更,客户端就再也不会进行服务的拉取,这个时候造成不可用的时间就难以预估了。比如:在9:00的时候,A、B、C、D、E服务全部下线,F、G、H、I、J上线,Eureka通知客户端,但网络抖动,客户端没有收到,或者客户端收到了,拉取时候失败,并没有更新本地的列表,这样只有等到下次收到通知时才会去拉取,假设接下来服务很稳定,12:00的时候才有一次更新,这样就有3个小时的服务不可用。
那么有没有什么办法既可以拥有2个实现类的优势,又摒弃它们的弊端呢?有的!
小孩子才做选择,我们大人是全都要!那就是自己写一个ServerListUpdater的实现类,然后:
- 1 启动一个定时任务,定时从注册中心拉取,而这个拉取间隔也可以根据自己的预估进行修改。
- 2 注册一个监听器,可以收到注册中心的服务变更通知
- 3 在配置中,把默认的 ServerListUpdater改成自己写的 ServerListUpdater,这样ribbonConfig类在看到有ServerListUpdater的实现类的情况下,就不会加载 Eureka的 ServerListUpdater了。
实现类的代码太长,就不贴了,逻辑很清晰,可以参考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他们的代码照着写就行
接下来就是配置,在配置类中添加上自己的实现类
/**
* 服务更新通知机制
* @param notificationService
* @param clientConfig
* @return
*/
@Bean
public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
return new PollingNotificationServerListUpdater(notificationService, clientConfig);
}
这样就实现了轮询和通知并存的形式。
打完收工!