開發與維運

netty系列之:netty中的Channel詳解

簡介

Channel是連接ByteBuf和Event的橋樑,netty中的Channel提供了統一的API,通過這種統一的API,netty可以輕鬆的對接多種傳輸類型,如OIO,NIO等。今天本文將會介紹Channel的使用和Channel相關的一些概念。

Channel詳解

Channel是什麼? Channel是一個連接網絡輸入和IO處理的橋樑。你可以通過Channel來判斷當前的狀態,是open還是connected,還可以判斷當前Channel支持的IO操作,還可以使用ChannelPipeline對Channel中的消息進行處理。

先看下Channel的定義:

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

可以看到Channel是一個接口,它繼承了AttributeMap, ChannelOutboundInvoker, Comparable三個類。Comparable表示這個類可以用來做比較。AttributeMap用來存儲Channel的各種屬性。ChannelOutboundInvoker主要負責Channel和外部 SocketAddress 進行連接和對寫。

再看下channel中定義的方法:

可以看出channel中定義的方法是多種多樣的,這些方法都有些什麼特點呢?接下來一一為您講解。

異步IO和ChannelFuture

netty中所有的IO都是異步IO,也就是說所有的IO都是立即返回的,返回的時候,IO可能還沒有結束,所以需要返回一個ChannelFuture,當IO有結果之後,會去通知ChannelFuture,這樣就可以取出結果了。

ChannelFuture是java.util.concurrent.Future的子類,它除了可以拿到線程的執行結果之外,還對其進行了擴展,加入了當前任務狀態判斷、等待任務執行和添加listener的功能。

其他的功能都很好理解,它的突破在於可以對ChannelFuture添加listener,我們列出一個添加listener的方法:

Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

添加的Listener會在future執行結束之後,被通知。不需要自己再去調用get等待future結束。這裡實際上就是異步IO概念的實現,不需要主動去調用,當你完成之後來通知我就行。非常的美好!

ChannelFuture 有兩個狀態:uncompleted或者completed,分別代表任務的執行狀態。

當一個IO剛開始的時候,返回一個ChannelFuture對象,這個對象的初始狀態是uncompleted。注意,這個狀態下的IO是還未開始工作的狀態。當IO完成之後,不管是succeeded, failed 或者 cancelled狀態,ChannelFuture的狀態都會轉換成為completed。

下圖展示的是ChannelFuture狀態和IO狀態的對應圖:

+---------------------------+
                                    | Completed successfully    |
                                    +---------------------------+
                               +---->      isDone() = true      |

+————————–+ | | isSuccess() = true |

| Uncompleted | | +=+
+————————–+ | | Completed with failure |
| isDone() = false | | +—————————+
| isSuccess() = false |—-+—-> isDone() = true |
| isCancelled() = false | | | cause() = non-null |
| cause() = null | | +
=+

+————————–+ | | Completed by cancellation |

| +—————————+

+—-> isDone() = true |

| isCancelled() = true |

+—————————+

如果要監控IO的狀態,可以使用上面我們提到的 addListener 方法,為ChannelFuture添加一個ChannelFutureListener。

如果要等待IO執行完畢,還有一個await()方法,但是這個方法會去等待IO執行完畢,是一個同步的方法,所以並不推薦。

相比而言,addListener(GenericFutureListener)是一個非阻塞的異步方法,將會把一個ChannelFutureListener添加到ChannelFuture中,當IO結束之後會自動通知ChannelFutureListener,非常好用。

對於處理IO操作的ChannelHandler來說,為了避免IO的阻塞,一定不要在ChannelHandler的IO方法中調用await(),這樣有可能會導致ChannelHandler因為IO阻塞導致性能下降。

下面舉兩個例子,一個是錯誤的操作,一個是正確的操作:

// 錯誤操作
    @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ChannelFuture future = ctx.channel().close();
       future.awaitUninterruptibly();
       // 調用其他邏輯
   }
   // 正確操作
    @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ChannelFuture future = ctx.channel().close();
       future.addListener(new ChannelFutureListener() {
           public void operationComplete(ChannelFuture future) {
               // 調用其他邏輯
           }
       });
   }

大家可以對比下上面兩種寫法的區別。

另外要注意的是ChannelFuture中的這些await方法比如:await(long), await(long, TimeUnit), awaitUninterruptibly(long), 或者 awaitUninterruptibly(long, TimeUnit)可以帶一個過期時間,大家要注意的是這個過期時間是等待IO執行的時間,並不是IO的timeout時間,也就是說當await超時之後,IO還有可能沒有執行完成,這就導致了下面的代碼有可能報錯:

Bootstrap b = ...;
   ChannelFuture f = b.connect(...);
   f.awaitUninterruptibly(10, TimeUnit.SECONDS);
   if (f.isCancelled()) {
       // 用戶取消了Channel
   } else if (!f.isSuccess()) {
       // 這裡可能會報異常,因為底層的IO可能還沒有執行完成
       f.cause().printStackTrace();
   } else {
       // 成功建立連接
   }

上面的代碼可以改成下面的例子:

Bootstrap b = ...;
   // 配置連接timeout的時間
   b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
   ChannelFuture f = b.connect(...);
   f.awaitUninterruptibly();
   // 等待直到底層IO執行完畢
   assert f.isDone();
   if (f.isCancelled()) {
       // 用戶手動取消Channel
   } else if (!f.isSuccess()) {
       f.cause().printStackTrace();
   } else {
       // 成功建立連接
   }

Channel的層級結構

netty中的Channel是有層級結構的,通過parent屬性可獲取這種層級結構。parent獲取的對象和Channel的創建方式有關。比如如果是一個被ServerSocketChannel accepted的SocketChannel,那麼它的parent就是ServerSocketChannel。

釋放資源

和所有的IO一樣,Channel在用完之後也需要被釋放,需要調用close()或者close(ChannelPromise) 方法。

事件處理

channel負責建立連接,建立好的連接就可以用來處理事件ChannelEvent了,實際上ChannelEvent是由定義的一個個Channelhandler來處理的。而ChannelPipeline就是連接channel和channelhandler的橋樑。

我們將會下下一章詳細講解ChannelEvent、Channelhandler和ChannelPipeline的關聯關係,敬請期待。

總結

Channel在netty中是做為一個關鍵的通道而存在的,後面的Event和Handler是以channel為基礎運行的,所以說Channel就是netty的基礎,好了,今天的介紹到這裡就結束了,敬請期待後續的文章。

本文已收錄於 http://www.flydean.com/04-netty-channel/

最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!

Leave a Reply

Your email address will not be published. Required fields are marked *