1、消息队列简介

一、系统之间的通信技术

  • 分布式系统之间并不独立存在的,各个系统之间往往需要共同完成某一个功能,这样就涉及到系统之间的通信,业界通常有两种通信方式,一种是远程过程调用(RPC),一种就是消息队列方式(MQ)
  • 消息队列通信:指有应用中的某一个系统发送信息,由关心这条信息的系统负责接收,并在接收到消息后进行各个系统之间的业务逻辑开发。其中这里的消息可以是简单的字符串,也可以是复杂的流或者是对象。
  • 消息队列的实现方式:消息在被发送后就立即返回,由消息队列来负责消息的传递,消息发布者只是管将消息发布到消息队列而不用管消息的接受,消息使用者只是管从消息队列中取出消息而不用管谁来发布的。

二、消息队列由来(使用场合)

  • 使用消息队列的典型场景是异步,同时也可以解决解耦、削峰、日志收集、事物最终一致性等问题。概括起来消息队列最大功能就是八个字:异步解耦,削峰填谷
1、解耦

紧耦合的缺陷:一个模块的改动将会导致其他关联模块发生变化,各个模块难以完美独立演化。
解耦:所谓解耦就是一个模块只关心自己的事情就可以,而依赖该模块的其他模块如果做得不是很重要的事,有通知即可,无需等待结果。
解耦实现:要在各个系统之间实现解耦,只要加一个中间件就行。通常可以加以一个消息中间件来完成系统之间的解耦,基于消息中间件实现的解耦,只关心通知,而非结果。

2、流量削峰

在流量来临的时候,通常通过增加机器来面对,提高系统的可用性。但是有的流量是突然来的(比如说一天的流量是波动变化的,存在波峰情况),如果机器一直加着,高峰时候可行,但是平时确实浪费机器。

  • 面对这种情况,可以考虑使用消息队列先将请求持久化,然后逐步的进行处理,从而削平高峰流量,改善系统性能。
3、日志收集

不多少,经典的kafka

4、事物最终一致性

三、消息队列功能特点

消息:指应用之间 传递的数据,消息的表现形式多样,可以是简单的字符串等也可以是复杂的对象等。
队列:抽象的指消息的进和出,但是消息的进出并不一定是同步进行的,因此需要一个容器来暂存和处理消息。

Broker:消息处理中心,负责消息的接受、存储、路由、转发等
Producer:消息生产者,负责产生和发送消息到消息处理中心
Consumer:消息消费者,负责从消息处理中心获取消息,并进行处理。

但真实的生产环境,消息队列往往不止是要有基本的消息发送、接受、存储。往往还需要解决诸如消息堆积、消息持久化、消息可靠投递、消息重复、严格有序性、集群高可用等等问题

1、消息堆积

消息堆积:某一个时间段内,消费者处理速度没有跟上生产者发送消息的速度,消息在消息处理中心积压的过程。

因此需要消息队列能够处理这样的场景,比如设置一个阈值,一旦消息堆积超过这个阈值就触发消息不在存储进入消息中心直接丢弃。

2、消息持久化

消息持久化是消息中心必备的一个功能。通常作用是将消息进行暂存下来,合适的时机在进行消费。持久化的方案很多,可以存储到内存,可以存储到磁盘,通常两者都要。

3、可靠投递

可靠投递:不允许存在消息丢失的场景发生
消息丢失可发生在:

1、从生产者到消息中心过程
2.、从消息中心到消费者
3、消息中心处理持久化消息过程

4、消息重复

上面讲了消息的可靠投递,为了满足消息的可靠投递会把消息持久化,然后定时轮询持久化中的消息,有可能会造成消息的重复消费,有时候消息的重复消费比消息丢失更加影响更大。

5、严格有序

有时候要求消息是按顺序消费的,比如购买商品过程中先下订单,在支付。

6、高可用

作为中间件,高可用方案是必须的

四、简单的消息中间系统实现

消息队列至少包含的的三个角色:
Broker:消息处理中心,负责消息的接受、存储、路由、转发等
Producer:消息生产者,负责产生和发送消息到消息处理中心
Consumer:消息消费者,负责从消息处理中心获取消息,并进行处理。

其中,消息处理中心是核心,必须具备消息发送、消息接收和消息暂存的功能。下面就用socket结合多线程知识实现一个简单的消息队列应用。

1、消息处理中心
  • 消息处理消息主要实现的功能有存储消息、消息接收、消息发送。并要对外暴露相应的端口和ip,用户请求。
public class Broker {
    //存储的最大消息条数
    private final static int max_size = 10;
    //保存消息的容器
    private static BlockingQueue<String> messageQueue = new ArrayBlockingQueue(max_size);

    public static void produce(String message) {
        if (messageQueue.offer(message)) {
            System.out.println("成功向消息中心投递消息:" + message + ",当前的消息总数为:" + messageQueue.size());
        } else {
            System.out.println("当前消息容易已经满了,不能在进行消息投递!");
        }
        System.out.println("===============================");
    }

    public static String consume() {
        String message = messageQueue.poll();
        if (message != null) {
            System.out.println("已经消费消息:" + message + ",当前消息总数:" + messageQueue.size());
        } else {
            System.out.println("当前消息容器中没有消息可以消费!");
        }
        System.out.println("===================================");
        return message;
    }
}

public class BrokerServer implements Runnable {

    public static int server_port = 9999;
    private final Socket socket;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
            while (true) {
                String str = bufferedReader.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("接收到原始信息:" + str);
                if (str.equalsIgnoreCase("consume")) {
                    //消费一条消息
                    String message = Broker.consume();
                    printWriter.println(message);
                    printWriter.flush();
                } else {
                    //生产消息
                    Broker.produce(str);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  //消息处理中心的启动
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(server_port);
        while (true) {
            BrokerServer brokerServer = new BrokerServer(serverSocket.accept());
            new Thread(brokerServer).start();
        }
    }
}
2、生产者和消费者
//封装了生产和消费的方法客户端
public class MqClient {
    public static void produce(String msg) throws IOException {
        Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
        PrintWriter out =new PrintWriter(socket.getOutputStream());
        out.println(msg);
        out.flush();
    }
    public static String consume() throws IOException {
        Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
        BufferedReader bufferedReader=new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
        PrintWriter printWriter=new PrintWriter(socket.getOutputStream());
        printWriter.println("consume");
        printWriter.flush();

        String message=bufferedReader.readLine();
        return message;
    }

}
//生产者
public class Produce1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        int i=0;
        while (true){
            MqClient.produce("hello wolrd  "+i);
            i++;
            Thread.sleep(3000);
        }
    }
}
//消费者
public class Consume1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        while (true){
            String consume = MqClient.consume();
            if (consume!=null){
                System.out.println("consum1-->获取消费信息为:"+consume);
            }
            Thread.sleep(100000);
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 消息队列 什么是消息队列(Message Queue,MQ)呢? 首先回忆下生活中在餐馆点餐的场景,当你点完餐之后...
    JunChow520阅读 3,031评论 0 29
  • 以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统)。 本次分享...
    文档随手记阅读 1,880评论 0 28
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 56,835评论 10 138
  • 转自:https://tech.meituan.com/2016/07/01/mq-design.html 一、何...
    小manong阅读 456评论 0 0
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,445评论 0 12