目标
- soul 目录结构介绍
- soul-admin 与 soul-bootstrap数据同步之websocket
- soul-admin websocket server 启动流程
- soul-bootstrap websocket client 启动流程
- Http Demo 运行步骤介绍
- 三方RestController接口如何上报到soul网关后台
- Http 自身服务访问验证
- 使用divide插件转发体验
- 负载均衡演示
- 防火墙waf插件演示
- 总结
soul 目录结构介绍
- soul-admin: soul 插件元信息配置管理服务,内部包含了soul-dashboard交互页面
- soul-bootstrap: soul 网关启动程序入口
- soul-client: soul 数据转发客户端包含dubbo(alibaba,apache)、http、sofa-rpc、tars
- soul-common: soul整个系统用到的一些基础工具配置异常和数据结构定义类
- soul-dashboard: soul-admin的前端项目
- soul-dist: soul-admin ,soul-bootstrap 打包脚本工具
- soul-example: 包含dubbo,eureka,http,sofa,springcloud,tars模拟三方真实测试服务
- soul-metrics: soul metircs 监控插件整合服务
- soul-plugin: soul核心接入的所有插件服务
- soul-register-center: soul注册中心的客户端与服务端目前包含http,zookeeper
- soul-spi: soul定义的spi接口
- soul-spring-boot-starter: soul自定义的boot starter,目前包含client,gateway(soul-web),plugin,sync-data-center
- soul-sync-data-center: soul数据同步中心目前包含http,nacos,websocket,zookeeper
- soul-web: soul gateway 核心处理请求相关的配置,包括WebHandler处理请求的入口,请求的Filter过滤等等
soul-admin 与 soul-bootstrap数据同步之websocket
- soul为了实现高吞吐,高性能,从方方面面进行优化,包括编码方面采用响应式异步流编程,数据流同步采用JVM缓存与数据库结合方式。至于同步采用的方式主要有以下几种,http长轮询、nacos、websocket、zookeeper。下面重点介绍下websocket。
- websocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。
soul-admin websocket server 启动流程
- pom 文件中引入 websocket boot starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- yml配置soul.sysnc.websocket.enabled = true 注入 websocket server Bean
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
/**
* Config event listener data changed listener.
*
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
// 实例化 数据更变监听器
return new WebsocketDataChangedListener();
}
/**
* Websocket collector websocket collector.
*
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
// 实例化 webSocket Server
return new WebsocketCollector();
}
/**
* Server endpoint exporter server endpoint exporter.
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
- 采用注解@ServerEndPoint("/websocket")配置WebSocket Server
- onOpen 有客户端连接保存会话
- onMessage 客户端有消息发送这里会触发onMessage进行接收数据
- onClose 只要C/S连接断开就会触发onClose,一般都会关闭连接
小结
soul-admin websocket server 实例化完成,等待网关客户端连接同时也在等基础数据发生变化执行同步操作,至于基础数据变化,到websocket服务端感知在发送到客户端,我们后续会介绍。
soul-bootstrap websocket client 启动流程
- 自定义boot starter 注入 bean
soul 的外围组件包括插件,各种数据同步方式都是采用boot starter 自动注入,所有的starter 都是放在soul-spring-boot-starter目录下面
根据配置的urls创建websocket客户端
分析下websocket客户端创建流程
@Configuration
// classPath 中存在 WebsocketSyncDataService.class才会初始化此类
@ConditionalOnClass(WebsocketSyncDataService.class)
// yml 配置了 soul.sync.websocket.urls 才会初始化此类,与上面条件成与关系
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
*
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
// 创建 websocket 客户端 可以支持多个服务端连接方式
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
// 读取用户配置的websocket urls,支持配置多个 soul admin url
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
@Data
public class WebsocketConfig {
/**
* if have more soul admin url,please config like this.
* 127.0.0.1:8888,127.0.0.1:8889
*/
private String urls;
}
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 取出不同的soul admin 服务端url
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// 根据 urls 的 个数 使用soul 自封装的线程工厂 创建连接维护线程,包括断线自动重连,源码显示 系统启动之后10启动维护线程,按照固定间隔时间执行下一次任务。具体定时任务用法,可以了解scheduleAtFixedRate
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
图示可以看出来维护线程间隔30S触发重连一次
小结
上文分别介绍了 websocket 服务端与客户端的创建初始化并且连接成功,这里面客户端采用了定时任务线程工厂,和AutoCloseable 事后自动关闭连接的方式,这里代码编写优雅,思路简单。设计不臃肿,很好。
http demo 运行步骤介绍
三方RestController接口是如何上报到soul网关后台
这里我是用传统的springmvc client进行数据的转发
- pom 引入 soul-spring-boot-starter-client-springmvc
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-client-springmvc</artifactId>
<version>${soul.version}</version>
</dependency>
目的是将本服务的基础信息Url,Port,contextPath注册到 adminUrl中心
-
SpringMvcClientBeanPostProcessor
注入
@Bean
public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
return new SpringMvcClientBeanPostProcessor(soulSpringMvcConfig);
}
科普下BeanPostProcessor接口(因为SpringMvcClientBeanPostProcessor是实现它的一个类),所以等Spring容器完成初始化 SoulSpringMvcConfig 之后带着参数去实例化SpringMvcClientBeanPostProcessor。
BeanPostProcessor 接口是 Spring 提供的众多接口之一,他的作用主要是如果我们需要在Spring 容器完成 Bean 的实例化、配置和其他的初始化前后添加一些自己的逻辑处理,我们就可以定义一个或者多个 BeanPostProcessor 接口的实现
-
初始化发送注册信息的基础信息到soul-admin,SoulClientController 中包含了Springmvc,SpringCloud,dubbo,sofa,tars注册中心。
public static void validate(final SoulSpringMvcConfig soulSpringMvcConfig) { String contextPath = soulSpringMvcConfig.getContextPath(); String adminUrl = soulSpringMvcConfig.getAdminUrl(); Integer port = soulSpringMvcConfig.getPort(); if (StringUtils.isNotBlank(contextPath) && StringUtils.isNotBlank(adminUrl) && port != null) { return; } String errorMsg = "spring mvc param must config contextPath, adminUrl and port"; log.error(errorMsg); throw new RuntimeException(errorMsg); }
如果注册时候携带的参数为空则报错
-
postProcessAfterInitialization - BeanPostProcessor 后置处理
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
if (soulSpringMvcConfig.isFull()) {
return bean;
}
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
if (controller != null || restController != null || requestMapping != null) {
// 使用注解类,反射出注解变量的数据发送到admin进行注册绑定
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
经过debug 三方 mvc 项目的所有Bean 类都会经过这个后置处理函数,也就是对Bean 的增强处理,soul 使用了这点利用反射技术将接口的方法和参数获取调用RegisterUtils.doRegister注册到soul-admin
上图是真正发送的函数.
- ContextRegisterListener 初始化
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
if (!registered.compareAndSet(false, true)) {
return;
}
if (soulSpringMvcConfig.isFull()) {
RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP);
}
}
这里如果full为ture那么网关就会代理整个三方应用接口
小结
到这里我们很清楚的知道soul采用注解元编程方式加上bean注册流程特点将我们要代理的接口上报到我们admin 管理中心,至于接口到管理中心之后数据流怎么流向,我们后面继续介绍。
http 自身服务访问验证
三方服务的Port是8188我这里直接访问后台是通的
使用divide插件转发体验
后面使用代理服务器已/http作为contextPath向真实服务发送请求
-
soul-bootstrap 代理转发日志
通过soul网关代理转发成功。关于为什么我们改变了端口和请求URL 还可以命令真实的服务,等我后面讲解管理处理请求流程时候,再详细介绍。下面我们体验下负载均衡和防火墙功能
负载均衡的演示
-
本地演示,需要Intellij Idea -> Edit Configuration 把 Allow parallel run 打上勾,允许程序多开
我把端口改成8189启动成功日志输出。同时在会发现你的8189服务也注册到soul-admin上了。
配置参数中的weight就是代表权重,我们可以根据自己服务的资源来指定soul网关将请求分别命中到对应的真实服务上。下面我来演示:
- 8188服务器权重为1 ,8189权重为100
现象是请求都命中到权重为100的服务器上了
- 8188服务器权重为50 ,8189权重为50
随着访问次数的增加基本上请求是平分到各台服务器
防火墙waf插件演示
- 打开waf 防火墙配置
- 配置waf的selector
防火墙主要拦截header中a match a 的请求
- 配置规则,也就是匹配到请求之后应该怎么处理,所谓的handler
我们的处理就是拒绝转发请求,同时给调用方返回400错误码
- 结果验证
总结
今天介绍了soul admin 与 soul bootstrap采用websocket作为数据同步的方式。并分别介绍了各自的websocket启动原理与流程。然后重点介绍了soul 是如何结合Spring的Bean加载机制以及注解编程,将三方应用的接口注册到admin。最后演示了Divide插件实现Http转发,以及负载均衡和防火墙插件,后续详细深入如何实现负载均衡,和防火墙。下一章将给大家分析soul的数据流的操作。