接着上一篇文章:java-RabbitMQ指导1
Advanced Connection options-连接的高级选项
Consumer thread pool-消费者线程池
消费线程会被默认自动分配在一个新的ExeutorService线程池中。如果想要更好的控制就需要提供一个ExecutorService给new Connection()方法,那么默认使用的线程池将会被替换使用。这里有一个例子,提供一个比默认分配的更大的线程池:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
Executors和ExecutorService类都是在java.util.concurrent包中。
当connection关闭时,默认的ExecutorService会被关闭。但是用户提供的ExecutorService(或者类似的)将不会被自动关闭。所以最后客户端都必须保证它已经被关闭(通过调用它自身shutdown()方法),否则该线程池将会阻止虚拟机的结束。
同一个ExecutorService可以在多个连接之间共享使用,或者重复连接时重复使用,但是一旦它被关闭将不能使用。
使用这项特性应该考虑的是:在消费者回调的过程中,是否存在性能上的瓶颈。如果没有一个消费者回调或者说很少很少,默认的分配会有大量的剩余。使用的是很小的,总的线程资源分配是有界限的,即使偶尔有爆发性的消费情况发生。
Using Lists of Hosts-使用主机集合
传递地址数组给new Connection()方法是可以的,Address是一个简单方面的类,在com.rabbitmq.client包下,包含主机的端口号属性,如下:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
将会尝试连接hostname1:portnumber1,如果连接失败将会连接hostname2:portnumber2。连接返回的是数组中第一个成功的值(不会返回IOException)。这是通过每次都调用factory.newConnection()完整平等的重复设置主机和端口号,直至其中个一个连接成功。
如果同样提供了ExecutorService(使用方法factory.netConnection(es,addrArr)),这个线程池将会第一个成功连接的相关联。
如果你想有更多的控制主机的方式连接,查看支持服务发现。
Service discovery with thw AddressResolver interface-带有AddressResolver接口的服务发现
在3.6.6版本中,在创建连接时,去实现AddressResolver接口选择实现的地方:
Connection conn = factory.newConnection(addressResolver);
这个AddressResolver接口类似于下面:
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
就好像一组主机的集合一样,先尝试连接第一个的地址,如果失败了则连接第二个地址,依次类推。
如果同样提供了ExecutorService(使用方法factory.newConnection(es,addressResolver)),线程池将会和第一个连接成功的相关联。
AddressResolver是一个非常好的地方去实现自定义的服务发现逻辑,在动态的构造中尤其有用。和自动恢复连接结合使用,客户端在开始的时候将会自动连接到节点上。Affinity和载入平衡属于其它的情景,这些情景正是自定义AddressResolver有用的地方。
Java客户端带有下面的实现(细节请参考文档):
- DnsRecordIpAddressResolver:给定主机名,返回Ip地址(不能在DNS服务平台使用)。在负载均衡和失效备援方面可以为简单的DNS-based有用。
- DnsSrvRecordAddressResolver:给定服务名字,返回主机名字和端口号。作为一个DNS SRV请求被实现。当使用服务登录像HashiCorp Consul 时会很有用。
Heartbeat Timeout -心跳的超时时间
查看 Heartbeats guide 获取更多信息,关于心跳机制和在java客户端如何配置的问题。
Custom Thread Factories-自定义线程工厂
计算机环境系统像谷歌应用引擎可以严格管理线程的实例化情况。为了在这样的环境下使用RabbitMQjava版客户端。非常有必要配置一个自定义的ThreadFactory,用于实例化线程的提供方法,例如GAE的ThreadManager。下面是谷歌应用引擎的例子:
import com.google.appengine.api.ThreadManager;
ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());
Support for java non-blocking IO-支持java非阻塞IO
4.0版本的java版客户端带来实验性的支持java非阻塞式IO(java NIO)。NIO不应该被认为快于阻塞IO,它只是允许去更容易的控制资源(例如线程)
默认是阻塞式IO模式,每次连接都使用一个线程从网络套接宇读取数据。如果是NIO模式,你可以控制线程的数量,从网络套接宇中读取或者写入。
如果你的程序使用很多连接(十几个或者上百个),使用NIO模式。如果你使用少量的线程就使用默认的阻塞式模式,设置合适数量的线程,在性能上你不应该变差,尤其是连接并不是很忙的时候。
NIO必须明确的能够使用:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
NIO模式通过设置NioParams类:
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));
NIO模式使用合理的方式,但是你应该根据你自己工作量去改变它们。一些设置:总的IO线程使用,缓存的大小,服务执行使用在IO循环,内存中参数写入队列(在被发送到网上之前,将请求写入到队列上),详细情况请阅读文档。
Automatic Recovery From NetWork Failures-网络连接失败自动恢复
Connection Recovery-恢复连接
在客户端和RabbitMQ节点间网络连接可能会失败。RabbitMQ客户端支持自动恢复连接和拓展连接(队列,装换器,绑定和消费者),对于许多应用而言自动回复过程将如下步骤:
- 恢复连接
- 恢复连接监听
- 恢复通道
- 恢复通道监听
- 恢复通道basic.qos设置,发布确认和事务设置
拓展恢复包括以下行为,针对每一个通道:
- 恢复声明装换器(除了默认定义的)
- 恢复声明队列
- 恢复所有绑定
- 恢复所有消费者
在java版客户端4.0.0版本中,自动恢复是默认的(拓展恢复也是)。
禁止或者允许自动恢复,使用factory.setAutomaticRecoveryEnabled(boolean)方法,下面的片段显示了如何明确的自动恢复(对于java客户端4.0.0):
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();
如果恢复失败导致一个错误(RabbitMQ节点没有存在),在修复时间间隔(默认5秒)后将会再次尝试,这个间隔可以配置:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);
当提供一组地址时,这组地址没有顺序,所有的地址就一个接着一个尝试的连接:
ConnectionFactory factory = new ConnectionFactory();
Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);
Recovery Listeners -恢复监听类
在合理的连接和通道中注册一个或者多个恢复监听是可能,当恢复连接是允许的,通过ConnectionFactory#newConnection和Connection#createChannel方法获取Connection类实现com.rabbitmq.client.Recoverable类,提供两个可描述的名字的方法:
- addRecoveryListener
- removeRecoveryListener
注意,当前你需要转变connections和channels为Recoverable才可以使用这些方法。
Effects on Publishing-发布的影响
使用Channel.basicPublish发布的消息在连接异常时将会被丢失。客户端不会把它们放到队列中等恢复连接时在分发。为了保证发布的消息到达RabbitMQ应用,需要使用发布确认机制和连接失败的统计。
Topology Recovery-拓展恢复
拓展恢复包括恢复转换器,队列,绑定和消费者。当自动恢复是允许的它的也是默认允许的。这样在java4.0.0版本中拓展恢复也是允许的。
如果需要拓展恢复也可以禁止:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);
Failure Detection and Recovery Limitations-失败检测和恢复限制
自动恢复连接有许多限制和有意图的设计目的,因此应用开发者们应该注意。
当连接停止或者丢失,它需要一定的时间才能检测到。因此这里会有一个空窗期,在依赖和应用之间没有意识到实际的连接已经失败的情况。在这段时间的框架下,如何消息的发布都是连续的并且跟平常一样地写入到TCP套接宇。通过发布者确认机制才能够保证它们分发的消息传递给了中间件:在AMQP0-9-1中发布消息被设计为异步的。
如果允许自动恢复,当一个套接宇或者I/O操作错误在连接中被检测到,一段配置延迟时间后将会开始恢复连接,默认是5s。这个设计的目的是即使许多次网络连接失败是短暂的并且存活的时间很短,它们也不会立刻停止恢复。恢复连接将在一段明确的时间间隔中保持继续直至新的连接被成功打开。
当一个连接处于恢复状态时,其中通道尝试任务消息的发布都将导致一个错误产生,客户端目前也不会缓存任何内部的无效的消息。应用开发者的责任之一是保证这些消息的记录,然后等恢复连接成功时在发送它们。发布确认机制是一个协议扩展,发布者应该使用,就不会导致消息的丢失。
当一个通道关闭因为一个通道等级的错误时恢复连接将不会收影响,这样的错误经常表示应用级别的问题,这个依赖库不能做一些信息方面的决定关于这样实例的时间。
关闭通道将不会被恢复,即使恢复连接之后,这些包括明确的关闭通道和通道等级的错误发生。
Manual Acknowledgements and Automatic Recovery-手动确认和自动恢复
当手动应答被使用时,有可能在消息传递和应答中间发生网络连接RabbitMQ节点失败的情况。在恢复连接之后,RabbitMQ会重设传递的标签在所有的通道上,这意味着basic.ack,basic.nack和basic.reject带有老的传递标签将会草纸一个连接错误。为了避免这种情况,RabbitMQ的java版客户端保留痕迹和跟新传递的标签,在恢复时使它们自动跟新。Channel.basicAck,Channel.basicNack和Channel.basicReject将转换成合适的传递标签,使之可以被RabbitMQ使用。稳定的传递标签的应答机制将不会被发送,使用手动应答机制和自动恢复功能的应一定有能力处理重新传递消息。
Unhandled Exceptions-未处理的错误
关联到连接,通道,恢复,消费者声明周期的未处理错误将被授权给错误处理者。错误处理者是一个实现了ExceptionHandler接口的对象。默认的情况使用DefaultExceptionHandler对象处理,它将按标准方式输出错误的详情。
可以重写ConnectionFactory#setExceptionHandler,通过Factory创建的connections都可以使用。
ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);
错误处理者应该用于打印错误日志。
Metrics and monitoring-测量和监视器
作为4.0.0版本,客户端可以手收集运行时各种度量(大量发布的消息),度量的收集是可以选择的,并且通过ConnectionFactory级别来创建,使用setMetricsCollector(metricsCollector)方法,这个方法期待着一个MetricsCollector的实例,它可以在代码中好几个地方调用。
4.3版本客户端支持Micrometer和Dropwizard Metrics
下面是收集的metrics:
- 许多打开的connections
- 许多开发的channels
- 许多发布的消息
- 许多消费的消息
- 许多应答的消息
- 许多被拒绝的消息
Micrometer和Dropwizard Metric都提供了数量,还包含速率,最后五分钟速率等等。对于消息关联的度量,它们同样支持类似的工具去监视和报道(JMX,Graphite,Ganglia,Datadog,etc),下面片段专注与更多的细节。
请记住下面关于度量的收集:
不要忘记添加依赖路(Maven,Gradle或者JAR文件)径在JVM路径中,当使用Micrometer或者Dropwizard Metrics。这些可选择的依赖将不会自动在java客户端加载出来,你需要添加其它的独立依赖后台使用,Metrics收集是扩展,实现自定义的MetricsCollector是为了特殊的需求,是被鼓励的。
MetricsCollector被设置到ConnectionFactory等级,但是也可以被分享到不同的实例中。
Metrics手机不支持事务,举例,如果应答被发送事务当中,事务然后在回滚,应答机制在客户端被计算度量(而不是中间件)。注意,应答机制实际上应该被发送给中间件,然后由事务回滚取消。因此客户端度量可以纠正大部分发送的应答消息。总计一句话,不要使用客户端度量严格的业务逻辑,它们不会保证表现的很完美,它们以为会使用简单的运行系统和使操作更有效率。
Micrometer supprot-支持千分尺
你可以使用度量收集千分尺的数据,如下方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Micrometer's Counter object
千分尺支持多种后台:Netflix Atlas,Prometheus,Datadog,Influx,JMX等等。
你可以传递一个MeterRegistry的实例给MicrometerMetricsCollector,下面是JMX的例子:
JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);
Dropwizard Metrics Supprot-支持逐一度量
你可以使用度量收集Dropwiard,如下方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object
Dropwizard Metrics支持多种报告后台:console,JMX,HTTP,Graphite,Ganglia等等。
你可以传递MetricsRegistry的实例给StandardMetricsCollector,下面是JMX的例子:
ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object
RabbitMQ java Client on Google App Engine-客户端使用Google应用引擎
使用java版RabbitMQ客户端在谷歌应用引擎上需要使用自定义的线程工厂,即使用GAE's ThreadManager实例化线程。另外,非常有必要设置一个低心跳机制间隔(4-5秒)去避免超时进行流读取的行为在GAE上
ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);
Caveats and Limitations-警告和限制
为了使拓展恢复变成可能,java版RabbitMQ客户端维护了声明队列,装换器和绑定的缓存。这个缓存是先前连接的数据,确保RabbitMQ特性是客户端去观察到这些拓展的变化是不可能的。例如:当一个队列被删除,因为TTL,RabbitMQ客户端将视图缓存整个相同的例子:
- 当队列被删
- 当装换器被删
- 当绑定被删
- 在一个自动删除的队列上消费者被取消
- 队列或者转换器未绑定一个会自动删除的装换器
无论如何,客户端在简单的连接之后不会记录这些拓展的变化,应用依赖与自动删除的队列或者装换器,同样队列TTL(而不是,消息TTL),使用自动恢复连接,应该明确的删除实体,并且指导无用的或者删除的,清除客户端拓展缓存。这个将是有促进的,Channel#queueDelete,Channel#exchangeDelete,Channel#queueUnbind和Channel#exchangeUnbind将自动恢复在RabbitMQ3.3.x中(删除将不会导致错误的东西)。
The RPC(Request/Reply) Pattern-RPC模型
作为一门边界程序,java客户端API提供了一个RpcClient类,它使用临响应的队列去提供简单的RPC样式通讯通过协议。
这个类不会强制任务特殊的类型在RPC参数和返回值中。它简单的提供一套机制去发送消息给一个给定的装换器用一个特定的routingkey,并且在回复队列中等待响应,
import com.rabbitmq.client.RpcClient;
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);
(这个类使用AMQP0-9-1实现的细节如下:请求信息被发送到basic.correlation_id属性,作为一个独一无二的值设置在RpcClient实例中,和使用basic.reply_to设置到回复队列的名字中)。
一旦你创建了这个类的实例,你可以使用下面的方法发送RPC请求:
byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)
这个primitiveCall方法将字节数组作为请求参数和响应体,这个stringCall是方面围绕primitiveCall,对待消息体像String实例
一样,使用默认的字符编码方法。
mapCall不同参数有一点复杂,它编码了java.util.map包含的不同java值到AMQP0-9-1协议的二进制中,并且以同样的方式反编码了响应(注意,这里有许多的关系在值类型中使用,详情细节请看文档)
所有编组的或者未编组的简易方法使用primitiveCall作为一种转换,就好像提供了一层包裹在它上面。
TLS Support
在客户端和中间件通讯时使用TLS加密是可能的,客户端和服务端的认证都是被支持的,下面是最简单的方式在客户端加密:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
注意,客户端不会强制任何服务端(认证链)以上述默认的方式。信任所有的认证,TrustManager被用时,对于本地的发展是方面的,但是在一般的攻击中或者开发环境中不推荐使用。想要学习更多的TLS支持使用RabbitMQ。请参考TLS指南,如果你仅仅想配置客户端,可以阅读TLS指南路径片段。