一、系统之间的通信技术
- 分布式系统之间并不独立存在的,各个系统之间往往需要共同完成某一个功能,这样就涉及到系统之间的通信,业界通常有两种通信方式,一种是远程过程调用(RPC),一种就是消息队列方式(MQ)。
- 消息队列通信:指有应用中的某一个系统发送信息,由关心这条信息的系统负责接收,并在接收到消息后进行各个系统之间的业务逻辑开发。其中这里的消息可以是简单的字符串,也可以是复杂的流或者是对象。
-
消息队列的实现方式:消息在被发送后就立即返回,由消息队列来负责消息的传递,消息发布者只是管将消息发布到消息队列而不用管消息的接受,消息使用者只是管从消息队列中取出消息而不用管谁来发布的。
二、消息队列由来(使用场合)
- 使用消息队列的典型场景是异步,同时也可以解决解耦、削峰、日志收集、事物最终一致性等问题。概括起来消息队列最大功能就是八个字:异步解耦,削峰填谷
1、解耦
紧耦合的缺陷:一个模块的改动将会导致其他关联模块发生变化,各个模块难以完美独立演化。
解耦:所谓解耦就是一个模块只关心自己的事情就可以,而依赖该模块的其他模块如果做得不是很重要的事,有通知即可,无需等待结果。
解耦实现:要在各个系统之间实现解耦,只要加一个中间件就行。通常可以加以一个消息中间件来完成系统之间的解耦,基于消息中间件实现的解耦,只关心通知,而非结果。
2、流量削峰
在流量来临的时候,通常通过增加机器来面对,提高系统的可用性。但是有的流量是突然来的(比如说一天的流量是波动变化的,存在波峰情况),如果机器一直加着,高峰时候可行,但是平时确实浪费机器。
- 面对这种情况,可以考虑使用消息队列先将请求持久化,然后逐步的进行处理,从而削平高峰流量,改善系统性能。
3、日志收集
不多少,经典的kafka
4、事物最终一致性
三、消息队列功能特点
消息:指应用之间 传递的数据,消息的表现形式多样,可以是简单的字符串等也可以是复杂的对象等。
队列:抽象的指消息的进和出,但是消息的进出并不一定是同步进行的,因此需要一个容器来暂存和处理消息。
Broker:消息处理中心,负责消息的接受、存储、路由、转发等
Producer:消息生产者,负责产生和发送消息到消息处理中心
Consumer:消息消费者,负责从消息处理中心获取消息,并进行处理。
但真实的生产环境,消息队列往往不止是要有基本的消息发送、接受、存储。往往还需要解决诸如消息堆积、消息持久化、消息可靠投递、消息重复、严格有序性、集群高可用等等问题
1、消息堆积
消息堆积:某一个时间段内,消费者处理速度没有跟上生产者发送消息的速度,消息在消息处理中心积压的过程。
因此需要消息队列能够处理这样的场景,比如设置一个阈值,一旦消息堆积超过这个阈值就触发消息不在存储进入消息中心直接丢弃。
2、消息持久化
消息持久化是消息中心必备的一个功能。通常作用是将消息进行暂存下来,合适的时机在进行消费。持久化的方案很多,可以存储到内存,可以存储到磁盘,通常两者都要。
3、可靠投递
可靠投递:不允许存在消息丢失的场景发生
消息丢失可发生在:
1、从生产者到消息中心过程
2.、从消息中心到消费者
3、消息中心处理持久化消息过程
4、消息重复
上面讲了消息的可靠投递,为了满足消息的可靠投递会把消息持久化,然后定时轮询持久化中的消息,有可能会造成消息的重复消费,有时候消息的重复消费比消息丢失更加影响更大。
5、严格有序
有时候要求消息是按顺序消费的,比如购买商品过程中先下订单,在支付。
6、高可用
作为中间件,高可用方案是必须的
四、简单的消息中间系统实现
消息队列至少包含的的三个角色:
Broker:消息处理中心,负责消息的接受、存储、路由、转发等
Producer:消息生产者,负责产生和发送消息到消息处理中心
Consumer:消息消费者,负责从消息处理中心获取消息,并进行处理。
其中,消息处理中心是核心,必须具备消息发送、消息接收和消息暂存的功能。下面就用socket结合多线程知识实现一个简单的消息队列应用。
1、消息处理中心
- 消息处理消息主要实现的功能有存储消息、消息接收、消息发送。并要对外暴露相应的端口和ip,用户请求。
public class Broker {
//存储的最大消息条数
private final static int max_size = 10;
//保存消息的容器
private static BlockingQueue<String> messageQueue = new ArrayBlockingQueue(max_size);
public static void produce(String message) {
if (messageQueue.offer(message)) {
System.out.println("成功向消息中心投递消息:" + message + ",当前的消息总数为:" + messageQueue.size());
} else {
System.out.println("当前消息容易已经满了,不能在进行消息投递!");
}
System.out.println("===============================");
}
public static String consume() {
String message = messageQueue.poll();
if (message != null) {
System.out.println("已经消费消息:" + message + ",当前消息总数:" + messageQueue.size());
} else {
System.out.println("当前消息容器中没有消息可以消费!");
}
System.out.println("===================================");
return message;
}
}
public class BrokerServer implements Runnable {
public static int server_port = 9999;
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
while (true) {
String str = bufferedReader.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始信息:" + str);
if (str.equalsIgnoreCase("consume")) {
//消费一条消息
String message = Broker.consume();
printWriter.println(message);
printWriter.flush();
} else {
//生产消息
Broker.produce(str);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//消息处理中心的启动
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(server_port);
while (true) {
BrokerServer brokerServer = new BrokerServer(serverSocket.accept());
new Thread(brokerServer).start();
}
}
}
2、生产者和消费者
//封装了生产和消费的方法客户端
public class MqClient {
public static void produce(String msg) throws IOException {
Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
PrintWriter out =new PrintWriter(socket.getOutputStream());
out.println(msg);
out.flush();
}
public static String consume() throws IOException {
Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
BufferedReader bufferedReader=new BufferedReader(new
InputStreamReader(socket.getInputStream()));
PrintWriter printWriter=new PrintWriter(socket.getOutputStream());
printWriter.println("consume");
printWriter.flush();
String message=bufferedReader.readLine();
return message;
}
}
//生产者
public class Produce1 {
public static void main(String[] args) throws IOException, InterruptedException {
int i=0;
while (true){
MqClient.produce("hello wolrd "+i);
i++;
Thread.sleep(3000);
}
}
}
//消费者
public class Consume1 {
public static void main(String[] args) throws IOException, InterruptedException {
while (true){
String consume = MqClient.consume();
if (consume!=null){
System.out.println("consum1-->获取消费信息为:"+consume);
}
Thread.sleep(100000);
}
}
}