前言
消息隊列服務相信大家一定都不陌生了,在很多應用系統中,都有一些場景會使用到消息隊列服務,簡單來說,我們可以把消息隊列比作是一個存放消息的容器,上游發送端將消息發送到消息隊列,下游消費端從消息隊列裡消費消息。消息隊列是分佈式系統中重要的組件,核心作用可以幫助我們實現異步、解耦以及削峰,從而提高系統性能和穩定性。
在大部分場景下業務系統如果只需要實現異步解耦、削峰填谷等能力,常規的普通消息就可以滿足此類需求。除此之外,在某些特殊的業務場景中,普通消息類型存在無法滿足需求的情況。這就需要消息隊列服務本身支持一些特殊的消息類型,或者開發者通過開發一些定製化的代碼實現目的。這裡我們列舉在使用消息隊列過程中幾種特殊場景的例子:
順序消費場景
生產者按照一定的先後順序發佈消息,消費者按照既定的先後順序消費消息,即先發布的消息一定會先被客戶端消費。
分佈式事務場景
分佈式架構下,隨著系統的演進,數據庫也進行了垂直拆分,如果選擇使用消息隊列進行上下游解耦的話,生產者和消費者需要保證數據一致性。
延時消費場景
生產者將消息發送到消息隊列後,並不期望立馬投遞這條消息,而是推遲到某個時間點之後將消息投遞給消費者進行消費。
對於順序消息和事務消息,這裡就不進行詳細介紹了,大家有興趣可以自行研究,本文後續內容會和大家一起詳細討論下延時消息更多的細節及應用場景。
延時消息介紹
延時(定時)消息的特點就是發送者成功發送一條消息後,這條消息並不會馬上被消費者消費,而是在某個特定的時間或者延遲一段時間後,消息才被消費者可見並進行後續的消費,延時消息整個生命週期可以用如下示意圖來表示:
- 消息發佈者將一條延時消息發送到消息隊列服務端;
- 在預計投遞時間未到之前,消息對消費者不可見,消費者此時無法立刻消費;
- 投遞時間到達後,消息才對消費者可見,消費者此時可以消費;
- 消費者獲取此條消息並進行消費;
- 消費者成功消費後,進行確認,此條消息將不再被消費。
延時消息應用場景
交易場景
在生產者和消費者有時間窗口的要求下,我們可以考慮使用延時消息。如在電商交易場景下,交易中超時未支付的訂單需要被關閉的場景,在訂單創建時會發送一條延時消息。這條消息將會在30分鐘以後投遞給消費者,消費者收到此消息後,需要判斷對應的訂單是否已完成支付;如支付未完成,則關閉訂單。
遊戲場景
再比如在遊戲社區裡,遊戲運營方經常會發起一些活動,玩家在活動期間內按照規則完成一系列任務,活動時間截止後,遊戲後臺根據玩家完成任務的情況進行判定,發送系統通知或者進行rank排名並派發獎勵等。
此種場景也可以採用延時消息來實現,上游系統發佈活動公告後,同時發送一條延時消息,延時時間設置為活動週期的時間。當活動截止後,下游系統可以隨即消費消息並進行相應的邏輯處理。
其他場景
同時延時消息也可以廣泛應用於信息提醒等比較通用的場景。
如何實現延時消息
介紹完延時消息的一些概念及應用場景後,我們接下來分析一下目前比較主流的幾款開源消息中間件對延時消息的支持情況以及實現方式。
Kafka
原生Kafka默認是不支持延時消息的,需要開發者自己實現一層代理服務,比如發送端將消息發送到延時Topic,代理服務消費延時Topic的消息然後轉存起來,代理服務通過一定的算法,計算延時消息所附帶的延時時間是否到達,然後將延時消息取出來併發送到實際的Topic裡面,消費端從實際的Topic裡面進行消費。
RabbitMQ
RabbitMQ實現延時消息有兩種方案,第一種是採用rabbitmq-delayed-message-exchange 插件實現,第二種則是利用DLX(Dead Letter Exchanges)+ TTL(消息存活時間)來間接實現。大致的實現思路如下:
- 創建一個普通隊列delay_queue,為此隊列設置死信交換機 (通過x-dead-letter-exchange參數) 和 RoutingKey (通過x-dead-letter-routing-key參數),生產者將向delay_queue發送延時消息。
- 創建步驟1中設置的死信交換機,同時創建一個目標隊列 target_queue,並使用步驟1中設置的RoutingKey將兩者綁定起來。消費者將從target_queue裡面消費延時消息。
- 設置消息的存活時間TTL,可以在步驟1中設置到隊列級別delay_queue的消息存活時間,或者在發送消息時動態設置消息級別的存活時間。
RocketMQ
開源RocketMQ支持延遲消息,但是不支持秒級精度。默認支持18個level的延遲消息,這是通過broker端的messageDelayLevel配置項確定的messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消息隊列服務在啟動時,會創建一個內部topic:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,創建對應數量的隊列。生產者發送消息時可以設置延時等級,示例代碼:
Message msg=new Message();
msg.setTopic("TopicA");
msg.setBody("this is a delay message".getBytes());
//設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
發送的消息會暫存在Broker對應的內部topic中,再通過定時任務從內部topic中拉取數據,如果延遲時間到了,就會把消息轉發到目標topic下,消費者從目標topic消費消息。
阿里雲消息隊列RocketMQ版
通過上一章節的討論,我們可以看出目前幾款主流的開源消息隊列服務,在支持延時消息的場景下或多或少有些不完美的地方。主要體現在以下幾點:
- Kafka不支持延時消息,需要完全開發代理服務來實現,工作量大。
- RabbitMQ需要額外的插件,或者利用DLX+TTL的方式進行中轉,實現不是非常直觀。
- RocketMQ支持延時消息,但是無法支持秒級延時。
那麼有沒有一款消息隊列服務,能夠完美的支持延時(定時)消息。本節我們將介紹阿里雲消息隊列RocketMQ版。
阿里雲消息隊列RocketMQ版基於Apache RocketMQ構建的低延遲、高併發、高可用、高可靠的分佈式消息中間件。消息隊列RocketMQ版既可為分佈式應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆積、高吞吐、可靠重試等特性。同時支持豐富的消息類型包括普通消息、順序消息、事務消息以及我們本文討論的延時消息。接下來我們看下阿里雲RocketMQ為延時消息提供的能力及優勢:
- 支持秒級的延時(定時)消息,同時延時時間可以最大設置為40天,基本滿足所有場景。
- 延時(定時)消息的投遞精度可控制在1~2秒之內。
- 延時(定時)消息在某段時間內是對消費者不可見的,從另一個維度看也屬於積壓的消息,阿里雲消息隊列RocketMQ版的不同實例規格可以支持百萬級到千萬級的消息積壓。
- 提供了多語言支持,包括Java、.NET、CC++、GO、Python、PHP、Node.js等
使用阿里雲消息隊列RocketMQ版收發延時(定時)消息,只需要在控制檯創建Topic的時候選擇定時/延時消息類型,既可以使用TCP或者http協議進行消息收發。
控制檯創建定時/延時Topic
Java語言示例代碼(TCP協議)
- 發送定時消息
// 定時消息,單位毫秒(ms),在指定時間戳(當前時間之後)進行投遞,例如2020-03-07 16:21:00投遞。如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 發送消息,只要不拋異常就是成功。
SendResult sendResult = producer.send(msg);
- 發送延時消息
// 延時消息,單位毫秒(ms),在指定延遲時間(當前時間之後)進行投遞,例如消息在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設置消息需要被投遞的時間。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
同時訂閱延時消息的邏輯無需任何改造,完全可以按照訂閱普通消息的方式,沒有任何的代碼侵入性。
結束語
到此我們討論了延時消息的特性、應用場景,對比了各類消息隊列對延時消息的支持情況,同時也向大家介紹了阿里雲消息隊列RocketMQ版。我們在對消息中間件進行選型時,也會考慮到多方面的因素。除了消息中間件本身所能提供的能力外,也包括服務性能、穩定性、可擴展能力,以及需要結合開發團隊自身的技術棧等情況。最後如果大家想了解更多阿里雲消息隊列RocketMQ版。可以參考下面的鏈接:
https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.2add192aWINJ9c
《阿里雲原生產品手冊》上線,本電子書聚焦雲原生12款核心產品,覆蓋容器產品、微服務產品、消息中間件產品、Serverless產品等,內容包括每款產品的核心亮點、解決問題、客戶案例、常見問題等,展示最全面的雲原生產品與行業應用,為企業雲原生上雲和容器化改造提供思路和指引。
下載鏈接:https://developer.aliyun.com/topic/download?id=1000