1. 前言
Apache RocketMQ作為廣為人知的開源消息中間件,誕生於阿里巴巴,於2016年捐贈給了Apache。從RocketMQ 4.0到如今最新的v4.7.1,不論是在阿里巴巴內部還是外部社區,都贏得了廣泛的關注和好評。
出於興趣和工作的需要,近期本人對RocketMQ 4.7.1的部分代碼進行了研讀,其間產生了很多困惑,也收穫了更多的啟發。
本文將站在發送方視角,通過閱讀RocketMQ Producer源碼,來分析在事務消息發送中RocketMQ是如何工作的。需要說明的是,本文所貼代碼,均來自4.7.1版本的RocketMQ源碼。本文中所討論的發送,僅指從Producer發送到Broker的過程,並不包含Broker將消息投遞到Consumer的過程。
2. 宏觀概覽
RocketMQ事務消息發送流程:
圖1
結合源碼來看,RocketMQ的事務消息TransactionMQProducer的sendMessageInTransaction方法,實際調用了DefaultMQProducerImpl的sendMessageInTransaction方法。我們進入sendMessageInTransaction方法,整個事務消息的發送流程清晰可見:
首先,做發送前檢查,並填入必要參數,包括設prepare事務消息。
源碼清單-1
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
進入發送處理流程:
源碼清單-2
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
根據broker返回的處理結果決策本地事務是否執行,半消息發送成功則開始本地事務執行:
源碼清單-3
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE: // 當備broker狀態不可用時,半消息要回滾,不執行本地事務
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
本地事務執行結束,根據本地事務狀態進行二階段處理:
源碼清單-4
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// 組裝發送結果
// ...
return transactionSendResult;
}
接下來,我們深入每個階段代碼分析。
3. 深扒內幕
3.1 一階段發送
重點分析send方法。進入send方法後,我們發現,RocketMQ的事務消息的一階段,使用了SYNC同步模式:
源碼清單-5
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
這一點很容易理解,畢竟事務消息是要根據一階段發送結果來決定要不要執行本地事務的,所以一定要阻塞等待broker的ack。
我們進入DefaultMQProducerImpl.java中去看sendDefaultImpl方法的實現,通過讀這個方法的代碼,來嘗試瞭解在事務消息的一階段發送過程中producer的行為。 值得注意的是,這個方法並非為事務消息定製,甚至不是為SYNC同步模式定製的,因此讀懂了這段代碼,基本可以對RocketMQ的消息發送機制有了一個較為全面的認識。
這段代碼邏輯非常通暢,不忍切片。為了節省篇幅,將代碼中較為繁雜但信息量不大的部分以註釋代替,儘可能保留流程的完整性。個人認為較為重要或是容易被忽略的部分,以註釋標出,後文還有部分細節的詳細解讀。
源碼清單-6
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
// 一、消息有效性校驗。見後文
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 獲取當前topic的發送路由信息,主要是要broker,如果沒找到則從namesrv獲取
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 二、發送重試機制。見後文
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// 第一次發送是mq == null, 之後都是有broker信息的
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 三、rocketmq發送消息時如何選擇隊列?——broker異常規避機制
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 發送核心代碼
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// rocketmq 選擇 broker 時的規避機制,開啟 sendLatencyFaultEnable == true 才生效
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
// 四、RocketMQ的三種CommunicationMode。見後文
case ASYNC: // 異步模式
return null;
case ONEWAY: // 單向模式
return null;
case SYNC: // 同步模式
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
// ...
// 自動重試
} catch (MQClientException e) {
// ...
// 自動重試
} catch (MQBrokerException e) {
// ...
// 僅返回碼==NOT_IN_CURRENT_UNIT==205 時自動重試
// 其他情況不重試,拋異常
} catch (InterruptedException e) {
// ...
// 不重試,拋異常
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
// 組裝返回的info信息,最後以MQClientException拋出
// ... ...
// 超時場景拋RemotingTooMuchRequestException
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
// 填充MQClientException異常信息
// ...
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
3.1.1 消息有效性校驗
源碼清單-7
Validators.checkMessage(msg, this.defaultMQProducer);
在此方法中校驗消息的有效性,包括對topic和消息體的校驗。topic的命名必須符合規範,且避免使用內置的系統消息TOPIC。消息體長度 > 0 && 消息體長度 <= 102410244 = 4M 。
源碼清單-8
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
3.1.2 發送重試機制
Producer在消息發送不成功時,會自動重試,最多發送次數 = retryTimesWhenSendFailed + 1 = 3次 。
值得注意的是,並非所有異常情況都會重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會自動重試:
1)發生RemotingException,MQClientException兩種異常之一時。
2)發生MQBrokerException異常,且ResponseCode是NOT_IN_CURRENT_UNIT = 205時。
3)SYNC模式下,未發生異常且發送結果狀態非 SEND_OK。
在每次發送消息之前,會先檢查是否在前面這兩步就已經耗時超長(超時時長默認3000ms),若是,則不再繼續發送並且直接返回超時,不再重試。這裡說明了2個問題:
1)producer內部自動重試對業務應用而言是無感知的,應用看到的發送耗時是包含所有重試的耗時在內的;
2)一旦超時意味著本次消息發送已經以失敗告終,原因是超時。這個信息最後會以RemotingTooMuchRequestException的形式拋出。
這裡需要指出的是,在RocketMQ官方文檔中指出,發送超時時長是10s,即10000ms,網上許多人對rocketMQ的超時時間解讀也認為是10s。然而代碼中卻明明白白寫著3000ms,最終我debug之後確認,默認超時時間確實是3000ms。這裡也建議RocketMQ團隊對文檔進行確認,如確有誤,還是早日更正為好。
圖2
3.1.3 broker的異常規避機制
源碼清單-8
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
這行代碼是發送前選擇queue的過程。
這裡涉及RocketMQ消息發送高可用的的一個核心機制,latencyFaultTolerance。這個機制是Producer負載均衡的一部分,通過sendLatencyFaultEnable的值來控制,默認是false關閉狀態,不啟動broker故障延遲機制,值為true時啟用broker故障延遲機制,可由Producer主動打開。
選擇隊列時,開啟異常規避機制,則根據broker的工作狀態避免選擇當前狀態不佳的broker代理,不健康的broker會在一段時間內被規避,不開啟異常規避機制時,則按順序選取下一個隊列,但在重試場景下會盡量選擇不同於上次發送broker的queue。每次消息發送都會通過updateFaultItem方法來維護broker的狀態信息。
源碼清單-9
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 計算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個比30s小的延遲值,再按下標判斷規避的週期,若30s,則是10min規避;
// 否則,按上一次發送耗時來決定規避時長;
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
深入到selectOneMessageQueue方法內部一探究竟:
源碼清單-10
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
// 開啟異常規避
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
// 按順序取下一個message queue作為發送的queue
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 當前queue所在的broker可用,且與上一個queue的broker相同,
// 或者第一次發送,則使用這個queue
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 不開啟異常規避,則隨機自增選擇Queue
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
3.1.4 RocketMQ的三種CommunicationMode
源碼清單-11
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
以上三種模式指的都是消息從發送方到達broker的階段,不包含broker將消息投遞給訂閱方的過程。
三種模式的發送方式的差異:
- 單向模式:ONEWAY。消息發送方只管發送,並不關心broker處理的結果如何。這種模式下,由於處理流程少,發送耗時非常小,吞吐量大,但不能保證消息可靠不丟,常用於流量巨大但不重要的消息場景,例如心跳發送等。
- 異步模式:ASYNC。消息發送方發送消息到broker後,無需等待broker處理,拿到的是null的返回值,而由一個異步的線程來做消息處理,處理完成後以回調的形式告訴發送方發送結果。異步處理時如有異常,返回發送方失敗結果之前,會經過內部重試(默認3次,發送方不感知)。這種模式下,發送方等待時長較小,吞吐量較大,消息可靠,用於流量大但重要的消息場景。
- 同步模式:SYNC。消息發送方需等待broker處理完成並明確返回成功或失敗,在消息發送方拿到消息發送失敗的結果之前,也會經歷過內部重試(默認3次,發送方不感知)。這種模式下,發送方會阻塞等待消息處理結果,等待時長較長,消息可靠,用於流量不大但重要的消息場景。需要強調的是,事務消息的一階段半事務消息的處理是同步模式。
在sendKernelImpl方法中也可以看到具體的實現差異。ONEWAY模式最為簡單,不做任何處理。負責發送的sendMessage方法參數中,相比同步模式,異步模式多了回調方法、包含topic發送路由元信息的topicPublishInfo、包含發送broker信息的instance、包含發送隊列信息的producer、重試次數。另外,異步模式下,會對有壓縮的消息先做copy。
源碼清單-12
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細過程:
圖3
3.2 二階段發送
源碼清單-3體現了本地事務的執行,localTransactionState將本地事務執行結果與事務消息二階段的發送關聯起來。
值得注意的是,如果一階段的發送結果是SLAVE_NOT_AVAILABLE,即備broker不可用時,也會將localTransactionState置為Rollback,此時將不會執行本地事務。之後由endTransaction方法負責二階段提交,見源碼清單-4。具體到endTransaction的實現:
源碼清單-13
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 採用oneway的方式發送二階段消息
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
在二階段發送時,之所以用oneway的方式發送,個人理解這正是因為事務消息有一個特殊的可靠機制——回查。
3.3 消息回覆
當Broker經過了一個特定的時間,發現依然沒有得到事務消息的二階段是否要提交或者回滾的確切信息,Broker不知道Producer發生了什麼情況(可能producer掛了,也可能producer發了commit但網絡抖動丟了,也可能...),於是主動發起回查。
事務消息的回查機制,更多的是在broker端的體現。RocketMQ的broker以Half消息、Op消息、真實消息三個不同的topic來將不同發送階段的事務消息進行了隔離,使得Consumer只能看到最終確認commit需要投遞出去的消息。其中詳細的實現邏輯在本文中暫不多贅述,後續可另開一篇專門來從Broker視角來解讀。
回到Producer的視角,當收到了Broker的回查請求,Producer將根據消息檢查本地事務狀態,根據結果決定提交或回滾,這就要求Producer必須指定回查實現,以備不時之需。
當然,正常情況下,並不推薦主動發送UNKNOW狀態,這個狀態毫無疑問會給broker帶來額外回查開銷,只在出現不可預知的異常情況時才啟動回查機制,是一種比較合理的選擇。
另外,4.7.1版本的事務回查並非無限回查,而是最多回查15次:
源碼清單-14
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 15;
附錄
官方給出Producer的默認參數如下(其中超時時長的參數,在前文中也已經提到,debug的結果是默認3000ms,並非10000ms):
圖4
RocketMQ作為一款優秀的開源消息中間件,有很多開發者基於它做了二次開發,例如螞蟻集團商業化產品SOFAStack MQ消息隊列,就是基於RocketMQ內核進行的再次開發的金融級消息中間件,在消息管控、透明運維等方面做了大量優秀的工作。
願RocketMQ在社區廣大開發者的共創共建之下,能夠不斷髮展壯大,迸發更強的生命力。
我們是阿里雲智能全球技術服務-SRE團隊,我們致力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用雲、基於雲構建更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上雲、用好雲,讓客戶雲上業務運行更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿里雲SRE技術學院釘釘圈子,和更多雲上人交流關於雲平臺的那些事。