RabbitMQ核心思想
MQ是幹什麼用的?
應用解耦、異步、流量削鋒、數據分發、錯峰流控、日誌收集等等...
- 當前最主流的消息中間件。
- 高可用性,支持發送確認,投遞確認等特性
- 高可用,支持鏡像隊列
- 支持插件
優點:
- 基於 Erlang, 支持高併發
- 支持多種平臺,多種客戶端,文檔齊全
- 可靠性高
- 在互聯網公司有較大規模等使用,社區活躍度高
1. AMQP協議介紹
Broker :接受和分發消息等的應用,RabbitMQ就是Message
Virtual Host : 虛擬機Broker , 將多個單元隔離開
Connection : publisher / consumer 和 broker之間的TCP連接
Channel : connection內部建立的邏輯連接,通常沒個線程創建單獨的channel
Rounting Key : 路由鍵,用來只是消息的路由轉發,相當於快遞的地址
Exchange : 交換機 ,相當於快遞的分撥中心
Queue : 消息隊列,消息最終被送到這裡等待consumer 取走
Binding : exchange 和 queue之間的虛擬連接,用於message的分發依據
AMQP協議的核心概念-Exchange
- 在AMQP協議或者是RabbitMQ實現中,最核心的組件是Exchange
- Exchange 承擔 RabbitMQ 的核心功能 --- 路由轉發
- Exchange 有多個種類,配置多變,需要詳細講解
RabbitMQ核心 -- Exchange解析
- Exchange是 AMQP 協議和RabbitMQ的 核心組件
- Exchange的功能是根據 綁定關係 和 路由鍵為消息提供路由,將消息轉發至相應的隊列
- Exchange有4種類型 :Direct / Topic / Fabout /Headers
Direct Exchange (直接路由)
- Message中的Routing Key 如果和 Binding Key 一致, Direct Exchange 則將 message 發到對應的 queue中
Fanout Exchange (廣播路由)
- 每個發到 Fanout Exchange 的 message 都會分發到所有綁定到queue上去
Topit Exchange (話題路由)
- 根據 Routing Key 及通配規則,Topic Exchange 將消息分發目標 Queue中
- 全匹配 :與Direct 類似
- Binding Key 中的 #:匹配任意個數的word
2. Docker 安裝 RabbitMQ
docker pull rabbitmq
這裡是直接安裝最新的
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
訪問 : http://IP地址:15672
用戶名和密碼默認都是guest
3. RabbitMQ保證消息的可靠性
- 需要使用RabbitMQ發送端確認機制,確認消息成功發送到RabbitMQ並被處理
- 需要使用RabbitMQ消息返回機制,若沒發現目標隊列,中間件會通知發送方
- 需要使RabbitMQ消息端確認消息,確認消息沒有發生異常
- 需要使用RabbitMQ消費端限流機制,限制消息推送速度 ,保障接受端服務穩定
- 大量到堆積消息會給RabbitMQ產生很大到壓力,需要使用RabbitMQ消息過期時間,防止消息大量積壓
- 過期後會直接丟棄, 不符合業務邏輯,需要使用RabbitMQ死信隊列,收集過期消息,以供分析
4. 發送確認機制原理
消息真的發出去了嗎?
- 消息發送後,發送端不知道RabbitMQ是否真的收到了消息,若RabbitMQ異常,消息丟失,業務異常,這個時候我們就需要使用RabbitMQ發送端確認機制,確認消息發送
三種確認機制
1. 單條同步確認
- 配置channel,開啟確認模式:channel.confirmSelect()
- 每發送一條消息,調用channel.waitForConfirms()方法等待確認
//建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
if(channel.waitForConfirms()){
//表示發送確認處理邏輯
}else{
//發送失敗
}
2. 多條同步確認
- 配置channel,開啟確認模式:channel.confirmSelect()
- 發送多條消息後,調用channel.waitForConfirms()方法等待確認
//建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
if(channel.waitForConfirms()){
//表示發送確認處理邏輯
}else{
//發送失敗
}
3. 異步確認
- 配置channel,開啟確認模式:channel.confirmSelect()
- 在channel上添加監聽: addConfirmListener , 發送消息後,會回調此方法,通知是否發送成功
- 異步確認有可能是單條,也有可能是多條,取決於MQ
//建立連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
ConfirmListener confirmListener = new ConfirmListener(){
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack " + deliveryTag + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack " + deliveryTag + multiple);
}
};
channel.addConfirmListener(confirmListener);
5. 消息返回機制
消息真被路由了嗎?
- 消息發送後,發送端不知道消息是否被正確路由,若路由異常,消息會被丟棄,業務異常,需要使用RabbitMQ消息返回機制,確認消息被正確路由
消息的開啟方法:
- 在RabbitMQ基礎配置中又一個關鍵配置項:Mandatory
- Mandatory若為false,RabbitMQ將直接丟棄無法路由的消息
- Mandatory若為true,RabbitMQ才會處理無法路由的消息
DeliverCallback deliverCallback = ((consumerTag, message) -> {
//拿到消息
String messageBody = new String(message.getBody());
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Message Return:");
//處理失敗的業務邏輯
}
});
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
true,
null,
messageBody.getBytes());
}catch (Exception e){
log.error(e.getMessage());
}
});
6. 消費端確認機制
消費端處理異常怎麼辦?
- 默認情況下,消費端接收消息時,消息會被自動確認(ACK),發生異常時,發送端與消息中間件無法得知消息處理情況,需要使用RabbitMQ 消息端確認機制,確認消息被正確處理
消費端ACK類型
手動ACK:消費端收到消息後,不會自動簽收消息,需要我們在業務代碼中顯式簽收消息
- 單條手動ACK : multiple = false
- 多條手動ACK : multiple = true
- 推薦使用單條ACK
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
自動ACK:消費端收到消息後,會自動簽收消息
7. 消費端限流機制
業務高峰期,可能出現發送端與接收端性能不一致,大量消息被同時推給接受端,造成接受端服務奔潰
在高併發端場景下,有個微服務奔潰了,本科期間隊列擠壓了大量消息,微服務上線後,收到大量併發消息。將同樣多端消息推給能力不同端副本,會導致部分副本異常
針對以上問題,RabbitMQ 開發了 Qos (服務質量保證) 功能,Qos功能保證了在一定樹木消息違背確認前,不消費新的消息
//這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);
8. RabbitMQ的過期時間(TTL)
- RabbitMQ的過期時間稱為 TTL (time to live), 生存時間
- RabbitMQ的過期時間分為消息TTL 和 隊列 TTL
- 消息TTL設置了單條消息的過期時間
- 隊列TTL設置了隊列中所有消息的過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //deliveryMode=1代表不持久化,deliveryMode=2代表持久化
.contentEncoding("UTF-8") // 編碼方式
.expiration("10000") // 過期時間
.headers(headers) //自定義屬性
.build();
String messageBody = "發送的消息"
//發送通道
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
true,
properties,
messageBody.getBytes());
9. 死信隊列
如何轉移過期的消息?
- 消息被設置了過期時間,過期後會直接被丟棄,直接被丟棄的消息無法對系統運行異常發出警報,需要使用RabbitMQ死信隊列,收集過期消息,以供分析
什麼是死信隊列
- 隊列被配置了DLX屬性 (Dead-Letter-Exchange) 當一個消息變成死信(dead message)後,能重新被髮布到另一個 Exchange , 這個Exchange也是一個普通交換機,死信被死信交換機路由後,一般進入一個固定隊列
怎麼變成死信
- 消息被拒絕 (reject / nack) 並且 requeue = false
- 消息過期(TTL到期)
- 隊列達到最大長度
個人博客地址:http://blog.yanxiaolong.cn/