開發與維運

RabbitMQ核心思想

RabbitMQ核心思想

在這裡插入圖片描述

MQ是幹什麼用的?

應用解耦、異步、流量削鋒、數據分發、錯峰流控、日誌收集等等...

  • 當前最主流的消息中間件。
  • 高可用性,支持發送確認,投遞確認等特性
  • 高可用,支持鏡像隊列
  • 支持插件

優點:

  1. 基於 Erlang, 支持高併發
  2. 支持多種平臺,多種客戶端,文檔齊全
  3. 可靠性高
  4. 在互聯網公司有較大規模等使用,社區活躍度高

1. AMQP協議介紹

在這裡插入圖片描述

Broker :接受和分發消息等的應用,RabbitMQ就是Message

Virtual Host : 虛擬機Broker , 將多個單元隔離開

Connection : publisher / consumer 和 broker之間的TCP連接

Channel : connection內部建立的邏輯連接,通常沒個線程創建單獨的channel

Rounting Key : 路由鍵,用來只是消息的路由轉發,相當於快遞的地址

Exchange : 交換機 ,相當於快遞的分撥中心

Queue : 消息隊列,消息最終被送到這裡等待consumer 取走

Binding : exchange 和 queue之間的虛擬連接,用於message的分發依據


AMQP協議的核心概念-Exchange

  • 在AMQP協議或者是RabbitMQ實現中,最核心的組件是Exchange
  • Exchange 承擔 RabbitMQ 的核心功能 --- 路由轉發
  • Exchange 有多個種類,配置多變,需要詳細講解

RabbitMQ核心 -- Exchange解析

  1. Exchange是 AMQP 協議和RabbitMQ的 核心組件
  2. Exchange的功能是根據 綁定關係 和 路由鍵為消息提供路由,將消息轉發至相應的隊列
  3. Exchange有4種類型 :Direct / Topic / Fabout /Headers

Direct Exchange (直接路由)

  • Message中的Routing Key 如果和 Binding Key 一致, Direct Exchange 則將 message 發到對應的 queue中

在這裡插入圖片描述

Fanout Exchange (廣播路由)

  • 每個發到 Fanout Exchange 的 message 都會分發到所有綁定到queue上去

在這裡插入圖片描述

Topit Exchange (話題路由)

  • 根據 Routing Key 及通配規則,Topic Exchange 將消息分發目標 Queue中
  • 全匹配 :與Direct 類似
  • Binding Key 中的 #:匹配任意個數的word

在這裡插入圖片描述

2. Docker 安裝 RabbitMQ


    docker pull rabbitmq
    

這裡是直接安裝最新的


    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

訪問 : http://IP地址:15672

在這裡插入圖片描述
用戶名和密碼默認都是guest


3. RabbitMQ保證消息的可靠性

  • 需要使用RabbitMQ發送端確認機制,確認消息成功發送到RabbitMQ並被處理
  • 需要使用RabbitMQ消息返回機制,若沒發現目標隊列,中間件會通知發送方
  • 需要使RabbitMQ消息端確認消息,確認消息沒有發生異常
  • 需要使用RabbitMQ消費端限流機制,限制消息推送速度 ,保障接受端服務穩定
  • 大量到堆積消息會給RabbitMQ產生很大到壓力,需要使用RabbitMQ消息過期時間,防止消息大量積壓
  • 過期後會直接丟棄, 不符合業務邏輯,需要使用RabbitMQ死信隊列,收集過期消息,以供分析

4. 發送確認機制原理

消息真的發出去了嗎?

  • 消息發送後,發送端不知道RabbitMQ是否真的收到了消息,若RabbitMQ異常,消息丟失,業務異常,這個時候我們就需要使用RabbitMQ發送端確認機制,確認消息發送

三種確認機制

1. 單條同步確認
  • 配置channel,開啟確認模式:channel.confirmSelect()
  • 每發送一條消息,調用channel.waitForConfirms()方法等待確認
         //建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示發送確認處理邏輯
        }else{
            //發送失敗
        }
2. 多條同步確認
  • 配置channel,開啟確認模式:channel.confirmSelect()
  • 發送多條消息後,調用channel.waitForConfirms()方法等待確認
         //建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示發送確認處理邏輯
        }else{
            //發送失敗
        }
3. 異步確認
  • 配置channel,開啟確認模式:channel.confirmSelect()
  • 在channel上添加監聽: addConfirmListener , 發送消息後,會回調此方法,通知是否發送成功
  • 異步確認有可能是單條,也有可能是多條,取決於MQ
        //建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        ConfirmListener confirmListener = new ConfirmListener(){

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Ack  " + deliveryTag + multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack  " + deliveryTag + multiple);
            }
        };
        channel.addConfirmListener(confirmListener);

5. 消息返回機制

消息真被路由了嗎?

  • 消息發送後,發送端不知道消息是否被正確路由,若路由異常,消息會被丟棄,業務異常,需要使用RabbitMQ消息返回機制,確認消息被正確路由

消息的開啟方法:

  1. 在RabbitMQ基礎配置中又一個關鍵配置項:Mandatory
  2. Mandatory若為false,RabbitMQ將直接丟棄無法路由的消息
  3. Mandatory若為true,RabbitMQ才會處理無法路由的消息

在這裡插入圖片描述

DeliverCallback deliverCallback = ((consumerTag, message) -> {
        //拿到消息
        String messageBody = new String(message.getBody());
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        try {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Message Return:");
                    //處理失敗的業務邏輯
                }
            });
            channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    null,
                    messageBody.getBytes());

        }catch (Exception e){
            log.error(e.getMessage());
        }

    });

6. 消費端確認機制

消費端處理異常怎麼辦?

  • 默認情況下,消費端接收消息時,消息會被自動確認(ACK),發生異常時,發送端與消息中間件無法得知消息處理情況,需要使用RabbitMQ 消息端確認機制,確認消息被正確處理

消費端ACK類型

手動ACK:消費端收到消息後,不會自動簽收消息,需要我們在業務代碼中顯式簽收消息


- 單條手動ACK : multiple = false

- 多條手動ACK : multiple = true

- 推薦使用單條ACK

    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);


    channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);

自動ACK:消費端收到消息後,會自動簽收消息


7. 消費端限流機制

業務高峰期,可能出現發送端與接收端性能不一致,大量消息被同時推給接受端,造成接受端服務奔潰

在高併發端場景下,有個微服務奔潰了,本科期間隊列擠壓了大量消息,微服務上線後,收到大量併發消息。將同樣多端消息推給能力不同端副本,會導致部分副本異常

針對以上問題,RabbitMQ 開發了 Qos (服務質量保證) 功能,Qos功能保證了在一定樹木消息違背確認前,不消費新的消息

//這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);

8. RabbitMQ的過期時間(TTL)

  • RabbitMQ的過期時間稱為 TTL (time to live), 生存時間
  • RabbitMQ的過期時間分為消息TTL 和 隊列 TTL
  • 消息TTL設置了單條消息的過期時間
  • 隊列TTL設置了隊列中所有消息的過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) //deliveryMode=1代表不持久化,deliveryMode=2代表持久化
            .contentEncoding("UTF-8") // 編碼方式
            .expiration("10000") // 過期時間
            .headers(headers) //自定義屬性
            .build();


String messageBody = "發送的消息"

//發送通道
channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    properties,
                    messageBody.getBytes());

9. 死信隊列

如何轉移過期的消息?

  • 消息被設置了過期時間,過期後會直接被丟棄,直接被丟棄的消息無法對系統運行異常發出警報,需要使用RabbitMQ死信隊列,收集過期消息,以供分析

什麼是死信隊列

  • 隊列被配置了DLX屬性 (Dead-Letter-Exchange) 當一個消息變成死信(dead message)後,能重新被髮布到另一個 Exchange , 這個Exchange也是一個普通交換機,死信被死信交換機路由後,一般進入一個固定隊列

在這裡插入圖片描述

怎麼變成死信
  • 消息被拒絕 (reject / nack) 並且 requeue = false
  • 消息過期(TTL到期)
  • 隊列達到最大長度

個人博客地址:http://blog.yanxiaolong.cn/

Leave a Reply

Your email address will not be published. Required fields are marked *