RabbitMQ连接池——CachingConnectionFactory

因为上一个项目中使用了RabbitMQ,但是当时没有考虑过性能的问题,今天觉得好像不对劲,大量的重复建立连接,造成了很大的性能浪费,于是我就找呀找,发现Spring提供了一种RabbitMQ连接池,所以今天我们来看一下它是如何设计的。

AMQP(version~0-9-1)

什么是AMQP?

  • 首先它是一种高级消息队列协议,是使用 TCP 提供可靠投递的应用层协议。与HTTP不同的是,它定义了Server端的服务和行为(也就是AMQP模型。今天这里暂时不说),同时AMQP也是长连接的(此处不对比1.1)。简单的来理解(当然AMQP协议包含了很多东西),AMQP是一种RPC的传输机制,定义了很多消息类和方法,这些类和方法组成了AMQP命令,就像你所看到的RPC代码一样,也就是说我们可以通过调用这些方法来完成我们的任务。我给大家放一张图:


    image.png
  • Connection、Channel
    大家可以看到上面的Class,他们是做什么用的呢?我们简单来说一下:
    • Connection
      是一个socket连接,它封装了socket协议相关部分逻辑。你可以认为一个Connection就是一个Tcp连接。
    • Channel
      有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接(通常每个thread创建单独的channel进行通讯)。
      下面给大家一段详细的资料,大家可以理解一下他们的不同。

A Connection represents a real TCP connection to the message broker, where as a Channel
is a virtual connection (AMPQ connection) inside it. This way you can use as many (virtual)
connections as you want inside your application without overloading the broker with TCP
connections.You can use one Channel for everything. However, if you have multiple threads,
it's suggested to use a different Channel for each thread.There is no direct relation between Channel and Queue. A Channel is used to send AMQP commands to the broker. This can be the creation of a queue or similar, but these concepts are not tied together.Consumerruns in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue, the broker uses round-robin to distribute the messages between them equally. It is also possible to attach the same Consumer to multiple Queues. You can understand Consumers as callbacks. These are called everytime a message arrives on a Queue the Consumer is bound to. For the case of the Java Client, each Consumers has a methodhandleDelivery(...), which represents the callback method. What you typically do is, subclassDefaultConsumer and override handleDelivery(...). Note: If you attach the same Consumer instance to multiple queues, this method will be called by different threads. So take care of synchronization if necessary.

  • AMQP建立连接
    大致了解一下之后,今天的重点,如何建立连接?
    Connection连接过程大致可以认为是三个同步的RPC调用,Connection.Start ---> Connection.StartOk ---> Connection.Open。 也就是说我们每次建立一个Connection的消耗是很大的,所以有没有一种方式来解决这个问题?
    我想大家可能都想到了,那就是建立连接池。
  • 连接池
    但是在这里有一个问题,我是应该缓存Channel还是Connection?这里给大家看两种方案:


    image.png
    • 方案一:只缓存Connection
    • 方案二:Connection和Channel都缓存
      这两种方案留给大家去思考。
      本来打算手写一个连接池,但是发现Spring提供了现成的,不用想,肯定比我写得好,
  • CachingConnectionFactory
    CachingConnectionFactory为我们提供了两种缓存的模式:
    • CHANNEL模式:这也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的,因为就一个,默认情况下,Connection中只缓存了一个Channel,在并发量不大的时候这种模式是完全够用的,当并发量较高的时候,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
    • CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据你设置的ConnectionCacheSize的大小,当小于的时候会采用新建的策略,当大于等于的时候会采用从缓存中获取的策略,与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。
      使用CachingConnectionFactory需要注意的一点是:所有你获取的Channel对象必须要显式的关闭,所以finally中一定不要忘记释放资源,如果忘记释放,则可能造成连接池中没有资源可用
      好了,我们来看一下创建Connection源码的实现:
  • createConnection()

    synchronized (this.connectionMonitor) {
// CHANNEL模式下,这里的connection是ChannelCachingConnectionProxy 代理对象
//这样做的目的是为Channel提供临时的存储空间(也就是缓存Channel),以便其他客户端调用 
            if (this.cacheMode == CacheMode.CHANNEL) {
//确保Connection对象不为null,target是真实的连接
                if (this.connection.target == null) {
//第一次调用 createConnection 方法时 connection.target 值为 null,因此会调用 createBareConnection 方法创建出 SimpleConnection 赋值给 connection.target
//SimpleConnection 中delegate属性是真正的RabbitMQ 连接(AMQConnection)
                    this.connection.target = super.createBareConnection();
                    // invoke the listener *after* this.connection is assigned
                    if (!this.checkoutPermits.containsKey(this.connection)) {
// Map<Connection, Semaphore> checkoutPermits 中存放了信道的许可数量,也就是默认的25,通过信号量来同步资源
                        this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
                    }
                    this.connection.closeNotified.set(false);
//向所有 ConnectionListener 发布 onCreate 事件
                    getConnectionListener().onCreate(this.connection);
                }
                return this.connection;
            }
            else if (this.cacheMode == CacheMode.CONNECTION) {
//直接从缓存中获取
                return connectionFromCache();
            }
        }

创建Channel的源码实现:

  • createChannel()
        Semaphore permits = null;
//大于0的情况下才会通过 Semaphore 限制当前连接下可用的信道数量
        if (this.channelCheckoutTimeout > 0) {
//获取许可
            permits = obtainPermits(connection);
        }
//获取当前Connection的Channel代理集合
        LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
        ChannelProxy channel = null;
        if (connection.isOpen()) {
//这里主要是从缓存中获取,在同步块中,先判断 channelList 是否为空,若不为空,则返回队列头部缓存的 ChannelProxy(要从队列中移除)。
//如果没有可用的缓存信道,则通过 getCachedChannelProxy 方法创建新的 ChannelProxy。
            channel = findOpenChannel(channelList, channel);
            if (channel != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Found cached Rabbit Channel: " + channel.toString());
                }
            }
        }
        if (channel == null) {
            try {
//创建新Channel 的过程
                channel = getCachedChannelProxy(connection, channelList, transactional);
            }
            catch (RuntimeException e) {
                if (permits != null) {
                    permits.release();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
                                + permits.availablePermits());
                    }
                }
                throw e;
            }
        }
        return channel;
    }
    private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection,
            LinkedList<ChannelProxy> channelList, boolean transactional) { //NOSONAR LinkedList for addLast()
//通过Connection中delegate创建Channel对象
        Channel targetChannel = createBareChannel(connection, transactional);
        if (logger.isDebugEnabled()) {
            logger.debug("Creating cached Rabbit Channel from " + targetChannel);
        }
//向所有 ChannelListener 发布 onCreate 事件
        getChannelListener().onCreate(targetChannel, transactional);
        Class<?>[] interfaces;
//通过 Proxy.newProxyInstance创建一个实现了ChannelProxy接口的动态代理对象。
//所有对该实例的方法调用都会转交给CachedChannelInvocationHandler 的 invoke 方法处理
        if (this.publisherConfirms || this.publisherReturns) {
            interfaces = new Class<?>[] { ChannelProxy.class, PublisherCallbackChannel.class };
        }
        else {
            interfaces = new Class<?>[] { ChannelProxy.class };
        }
        return (ChannelProxy) Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(),
                interfaces, new CachedChannelInvocationHandler(connection, targetChannel, channelList,
                        transactional));
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,279评论 0 10
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,860评论 2 11
  • 十月阳光如泻,到处浸透着欢聚和幸福的气息,处处鸟语花香,繁荣昌盛的景象。徜徉在祖国节日的牧野城里,五彩缤纷的秋菊,...
    流浪的云朵lsh阅读 510评论 0 1
  • 今年春节,我们回了娘家-广东韶关。在春节这段时间,与亲人、同学、朋友聚会时,所有人都离不开这些问题——你怎么跑那么...
    植秋莉阅读 492评论 0 2
  • 其实去年的时候就我知道了《正面管教》这本书,而且还输出了一篇读后感。当时一直记在心里的是——“和善而坚定”以及“关...
    彭海宇阅读 464评论 2 2