vert.x源码解析之redis集成

@[toc]

前言

bala bala...

vertx是什么

vert.x是一个全异步框架,通过内部的event bus跟event loop做到了处处皆异步。关于里面的细节,后面我会出一篇文章详细跟大家探讨...

为什么要用vertx

充分利用机器性能,提升应用的吞吐量上限。

vertx-redis-client是什么

通俗来讲,vertx-redis提供了一个全异步的可配置的redis客户端

vertx-redis-client有哪些特性

相对常规的jedis或者Lettuce来说,这里的特性其实就是vertx贯穿全局的特性:异步。当我们调用api发送一条redis指令的时候,可以不用等待redis的响应,只需要绑定一个回调函数,程序就可以继续往下执行。当拿到redis的响应之后,就会触发这个回调。当然,像这种编程方式,需要把业务依赖关系处理得很明确,因为我们必须把依赖这个redis响应的所有处理都移到这个回调函数里去。

怎么使用vertx-redis-client

添加vertx-redis-client依赖

//gradle
compile("io.vertx:vertx-redis-client:$version")
//maven
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-redis-client</artifactId>
  <vertion></version>
</dependency>

RedisConnection

RedisConnection是vertx-redis-client里暴露出来可操作redis的最基本的一组api了,使用如下

  Redis
    .createClient(Vertx.vertx(), "redis://ip:port")
    .connect(onConnect -> {
       //连接成功
       if (onConnect.succeeded()) {
           //获取连接实例
           RedisConnection conn = onConnect.result();
           //创建redis get指令. 
           Request command = Request.cmd(Command.GET);
           //添加指令参数aa. 最后到达redis的指令就是get aa
           command.arg("aa");
           //发送指令
           conn.send(command, resp -> {
               //执行成功
               if (resp.succeeded()) {
                   //拿到指令执行结果
                   Response result = resp.result();
               }
           });
       }
   });

这里就谈谈怎么使用吧,其实内部的实现就是通过netty包装的channel跟redis建立起连接,有兴趣的朋友可以点进去看看。

redisOptions

在介绍该redisClient各种模式之前,我们需要了解options是什么。
简单来说,redisOptions是我们用来定制该redisClient的一种方式,像之前我们通过==Redis.createClient(Vertx.vertx(), "redis://ip:port")== 这种仅仅提供一个地址的方式就建立起了一个redisClient,但是如果我们想要定制更复杂的redisClient呢?这个时候就需要用到redisOptions了

各项主要配置的含义

先大概看下redisOptions都有哪些属性:

public class RedisOptions {

  /**
   * The default redis endpoint = {@code redis://localhost:6379}
   */
  public static final String DEFAULT_ENDPOINT = "redis://localhost:6379";
  
  private RedisClientType type;
  private NetClientOptions netClientOptions;
  private List<String> endpoints;
  private int maxWaitingHandlers;
  private int maxNestedArrays;
  private String masterName;
  private RedisRole role;
  private RedisSlaves slaves;
  private String password;

  // pool related options
  private int poolCleanerInterval;
  private int maxPoolSize;
  private int maxPoolWaiting;
  private int poolRecycleTimeout;
}

type

该枚举用于指定redisClient以什么模式去连接server,这里的模式跟redis server的模式一一对应,我们在用的时候也必须注意,server是以什么模式部署的,client就以什么模式去配置。源码如下

public enum RedisClientType {

  /**
   * 默认值,单机模式
   */
  STANDALONE,

  /**
   * 哨兵
   */
  SENTINEL,

  /**
   * 集群
   */
  CLUSTER
}

这些配置决定了redisClient与server交互的方式,不同的类型会用不同的策略来应对,在我们调用Redis#creatClient的时候就会作出处理,源码如下

static Redis createClient(Vertx vertx, RedisOptions options) {
    switch (options.getType()) {
      case STANDALONE:
        return new RedisClient(vertx, options);
      case SENTINEL:
        return new RedisSentinelClient(vertx, options);
      case CLUSTER:
        return new RedisClusterClient(vertx, options);
      default:
        throw new IllegalStateException("Unknown Redis Client type: " + options.getType());
    }
  }

至于不同Client内部是怎么处理的,后面会一一介绍

masterName

这个只在Sentinel模式下有用,跟redisServer的redis_sentinel.conf里配置的masterName是一个东西,用来指明连接的是被哨兵监控的哪一个主从集群

role

这项配置只用于clientType为Sentinel的情况,主要用来指定当前的客户端到底是跟master连接还是只跟slave连接,如果配置为slave,则通过该客户端发起的所有指令都会被路由到slave节点,这意味着这个client对redisServer的所有操作都是只读的。
还有一个sentinel,这个意味着这个client只操作server端的sentinel,不会主动去获取到master或者slave的连接。按我目前看来,这个枚举值其实是给程序内部使用的,除非我们想手动实现跟哨兵的通信以及维护相应主从节点的一个failover情况,那么就可以使用这个配置。核心源码在RedisSentinelClient中,如下所示

private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {
    switch (role) {
      case SENTINEL:
        resolveClient(this::isSentinelOk, options, createAndConnect);
        break;

      case MASTER:
        resolveClient(this::getMasterFromEndpoint, options, createAndConnect);
        break;

      case SLAVE:
        resolveClient(this::getSlaveFromEndpoint, options, createAndConnect);
    }
  }

这里可以看到,根据不同的role,获取了不同类型的redis连接地址

private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
  final RedisURI uri = new RedisURI(endpoint);
  connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
    final RedisConnection conn = onCreate.result();
    final String masterName = options.getMasterName();
    
    // 根据masterName从sentinel获取相应主从集群的master节点信息
    conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
      //bala bala...
    });
  });
private void getSlaveFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
  final RedisURI uri = new RedisURI(endpoint);
    connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
      
      final RedisConnection conn = onCreate.result();
      final String masterName = options.getMasterName();
      // 获取masterName指定主从的slave节点
      conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelSlaves -> {
        //...
      });
    });
  }

slaves

这项配置用来指定在集群模式下对server的读操作的行为,源码如下

public enum RedisSlaves {

  /**
   * 读操作只落在master
   */
  NEVER,

  /**
   * 读操作会随机落在master跟slave
   */
  SHARE,

  /**
   * 读操作只落在slave上
   */
  ALWAYS
}

其核心原理是通过对slave节点执行readonly命令来开启slave的查询功能,因为默认集群模式下slave是不对外提供服务的。核心源码如下

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
    //如果RedisSlaves的值不是NEVER,就执行readonly
    connectionManager.getConnection(context, endpoints.get(index), RedisSlaves.NEVER != options.getUseSlave() ? cmd(READONLY) : null, getConnection -> {
    }
}

poolCleanerInterval

连接池空闲连接清理间隔,每次扫描时会直接将闲置的连接关闭。-1表示不开启空闲连接清理,核心源码在RedisConnectionManager#start()

synchronized void start() {
    long period = options.getPoolCleanerInterval();
    //延迟period时间后执行checkExpired()进行空闲连接清理
    this.timerID = period > 0 ? vertx.setTimer(period, id -> checkExpired(period)) : -1;
  }

poolRecycleTimeout

这个是用来控制连接池中连接在执行完命令后还能存活的时间,超过这个时间连接就会被关闭,核心源码在RedisStandaloneConnection。如下

public boolean isValid() {
    return expirationTimestamp > 0 && System.currentTimeMillis() <= expirationTimestamp;
  }

  @Override
  public void close() {
    // recycle this connection from the pool
    expirationTimestamp = recycleTimeout > 0 ? System.currentTimeMillis() + recycleTimeout : 0L;
    listener.onRecycle();
  }

maxPoolSize

连接池最大大小。只有当等待获取连接的请求数量达到maxPoolWaiting时才会创建新的连接,可以类比ThreadPoolExecutormaximumPoolSize

maxPoolWaiting

连接池等待队列大小。当我们通过RedisClient或者RedisApi去操作reids时,如果当前已无空闲的redis连接,那么这个请求就会进入连接池的等待队列中。可以类比ThreadPoolExecutorworkQueue

maxWaitingHandlers

等待执行的handler数的最大值。当我们发送完redis指令后,一般是需要对这个响应作出处理,这个过程会被包装成一个task添加到eventLoopwaitQueue中,这项配置控制的就是这个queuesize

RedisClient

redisClient本质是对上述redisConnection的再封装,提供了针对redisServer不同模式的不同处理,分为3种:单机,哨兵以及集群模式。对应到具体的class:RedisClientRedisSentinelClientRedisSentinelClient
接下来来看一下如何配置这些客户端

STANDALONE

这个是redisClient默认的一种配置模式,现在我们用前面描述的redisOptions实践一下。

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.STANDALONE)
            //指定server地址
            .setConnectionString("redis://ip:port")
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(2);

    Redis client = Redis.createClient(Vertx.vertx(), options);
    //创建redis 指令. get
    Request command = Request.cmd(Command.GET);
    //添加指令参数aa. 最后到达redis的指令就是get aa
    command.arg("aa");
    //发送指令并指定一个匿名handler处理结果
    client.send(command).onComplete(event -> {
        System.out.println("the value of redis key 'aa' is"+event.result().toString());
    });

SENTINEL

这里请原谅我秀了一下我那蹩脚的英语水平...

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.SENTINEL)
            //the master name which are monitored by sentinels
            .setMasterName("myMaster")
            .setRole(RedisRole.MASTER)
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(2);

    //这里只填哨兵的地址就行了
    //程序会自动从哨兵处获取节点信息
    List<String> sentinels = new ArrayList<>();
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    options.setEndpoints(sentinels);
    
    ...

这里可以只填一个哨兵地址,由于client不会主动去连接监控同一个master的哨兵节点,所以一旦这个唯一的哨兵挂了,那么server就会处于不可用的状态。

CLUSTER

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.CLUSTER)
            .setUseSlave(RedisSlaves.NEVER)
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(20);
    //添加集群节点,越多越好...
    List<String> clusters = new ArrayList<>();
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    options.setEndpoints(clusters);

这里也需要注意最好将cluster中所有的节点信息给添加进来。虽然连接任意一个节点,client都会通过执行cluster slots获取cluster中所有节点的信息,但是长久保持连接的只有我们在这里设置进去的节点。这意味着一旦我们手动设置的节点挂了,那么server对我们来说就不可用了

RedisApi

这个是基于RedisClient的封装,隐藏了Command的复杂细节,提供了一套可以直接调用的api。使用如下:

    Redis client = Redis.createClient(Vertx.vertx(), options);
    RedisAPI api = RedisAPI.api(client);
    api.get("aa").onComplete(event -> {
        System.out.println("the value of redis key 'aa' is" + event.result().toString());
    });

总结

全文描述了vertx-client是什么以及其相对某些redis-client所具备的特点。同时深挖了这个客户端各项配置的作用以及不同模式下的表现。
总的来说,这篇文章可以帮助大家去建立一个基于vertx的redis客户端,并且对这个客户端能够做到基本的知其然且知其所以然

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容