大數據

如何解決 Netty Channel.isWritable 返回 false

在 Netty 裡,有4個方法用來查詢 Channel 的狀態:isOpen,isRegistered,isActive,isWritable,其中,isWritable 在併發量很高時會返回很多 false。

isWritable 是什麼含義?

isWritable:Returns true if and only if the I/O thread will perform the requested write operation immediately. Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.

為什麼 isWritable 會返回 false?而且這個問題在 stackoverflow,netty 社區也被很多人問到。

i.e.

if (channel.isWritable()) {
    channel.writeAndFlush(data);
}

 

首先,說下 netty 處理 writeAndFlush 的原理:

1、業務線程調用 writeAndFlush 發送消息,會生成 WriteAndFlushTask,交由 IO 線程處理,write 操作將消息寫入 ChannelOutboundBuffer(不會寫到 socket),flush 操作將 ChannelOutboundBuffer 寫 入socket 的發送緩衝區;(這裡注意,writeAndFlush 它只是一個語法糖,意味著這不是原子操作,因此在此方法執行的中間,可能在多個線程之間進行了上下文切換。)

2、ChannelOutboundBuffer 它配置一個高水位線和低水位線,當 buffer 的大小超過高水位線的時候對應 channel 的 isWritable 就會變成 false,當 buffer 的大小低於低水位線的時候,isWritable 就會變成 true。

其中,高水位線和低水位線是字節數,默認高水位是64K,低水位是32K,通過以下方式進行設置:

.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)

 

如何解決這個問題?

ChannelOutboundBuffer 的容量過高或過低時都會觸發 fireChannelWritabilityChanged 方法,因此可通過重寫 channelWritabilityChanged 方法調整消息產生速度。

在常用的中間件裡,我們看看它們是如何處理的:

1、Notify 在給訂閱組投遞消息時,先檢查此訂閱組的 Channel 是否超過最高位,如果是,則此次不投遞,如果不是,繼續投遞。

2、Flink 核心發送方法中如果 Channel 不可寫,則會跳過發送,當 Channel 再次可寫後,Netty 會調用該 Handle 的 ChannelWritabilityChanged 方法,從而重新觸發發送函數。

3、Duboo 發送請求時,判斷是否已經關閉的 Channel,如果是,不再放入連接池,重新申請連接。

總之,在使用 Channel 寫數據之前,建議使用 isWritable 方法來判斷一下當前 ChannelOutboundBuffer 裡的寫緩存水位,防止 OOM 發生。

Leave a Reply

Your email address will not be published. Required fields are marked *