開發與維運

聊聊java中NIO的2.0版本AIO

一、IO的演進


在jdk1.4之前,java中的IO類庫實在是超級原始,很多我們現在熟知的概念都還沒有出現,比如說管道、緩衝區等等。正是由於這些等等原因,C語言和C++一直都是IO方面的首選。這是原始的IO方式,也叫作BIO,它的原理很簡單,我們使用一張圖來表示一下:

v2-97d3a5c1786fd53bcaf3c0d6d8da726c_1440w.jpg也就是說BIO時代,每次有一個客戶端連接進來的時候,都會有一個新的線程去處理,缺點顯而易見,如果連接比較多的時候,我們就要建立大量的線程去一一處理。

幾年之後,2002年,jdk1.4開始被正式發佈了,做出的一個巨大的改變就是新增了NIO包。它提供了很多異步的IO操作方法,比如說緩衝區ByteBuffer、Pipe、Channel還有多路複用器Selector等等。新的NIO類庫的出現,極大地促進了java對異步非阻塞式編程的發展。NIO的原理也是很簡單。在這裡同樣使用一張圖來演示一遍:

v2-ceab78ec37e55d5cc79047e2e689297d_1440w.jpg

現在我們可以看到,所有的客戶端連接都可以只用一個線程就可以實現了。

不過時代總是在一點一點的變化,逐漸的java官方為我們提供的NIO類庫越來越不能滿足需求,比如說不支持異步文件讀寫操作、沒有統一的文件屬性等等。於是過了幾年,在2011年7月28日,官方將用了將近十年的NIO類庫做了升級,也被稱為NIO2.0。後來也叫作AIO。AIO的原理是在之前的基礎上進行的改進,意思是異步非阻塞式IO,也就是說你的客戶端在進行讀寫操作的時候,只需要給服務器發送一個請求,不用一直等待回答就可以去做其他的事了。

下面我們使用代碼敲一遍來看看如何實現AIO。

二、AIO的實現


這個案例很簡單,就是服務端和客戶端一個簡單的通信。我們先把代碼寫好,然後再去分析代碼的含義。

1、服務端


第一步:定義Server啟動類

class AioServer{
    public static void main(String[] args){
        new AioServerHandle().start();   
        try {
            Thread.sleep(10000000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 }

在這裡我們定義了一個AioServerHandle線程去處理服務器端的邏輯,在這裡我們還休眠了很長時間,這是為了避免沒有客戶端連接時,程序運行結束。現在我們最主要的就是AioServerHandle的代碼邏輯了。

第二步:AioServerHandle類實現

public class AioServerHandle extends Thread {
    AsynchronousServerSocketChannel serverSocketChannel;
    public AioServerHandle() {
        try {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8888));    
            System.out.println("服務端初始化成功");
        } catch (IOException e) {
            e.printStackTrace();
        }         
    }
    @Override
    public void run() {
        serverSocketChannel.accept(this, new AcceptCompleteHandler(serverSocketChannel));
        try {
            Thread.sleep(100000000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

我們分析一下這段代碼,首先我們定義了一個AsynchronousServerSocketChannel,他表示的就是異步的ServerSocketChannel。然後我們在構造方法中打開連接,綁定地址和端口。最後再run方法中new了一個AcceptCompleteHandler來處理接入的客戶端。現在就像踢皮球一樣,真正的處理邏輯又給了新的類AcceptCompleteHandler,我們再來看。

第三步:AcceptCompleteHandler的實現


public class AcceptCompleteHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandle> {
    //第一部分
    private AsynchronousServerSocketChannel serverSocketChannel;
    public AcceptCompleteHandler(AsynchronousServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
    }
    //第二部分
    @Override
    public void completed(final AsynchronousSocketChannel channel, AioServerHandle attachment) {
        //第二部分第一小節
        attachment.serverSocketChannel.accept(attachment, this);
        System.out.println("有客戶端鏈接進來");
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        //第二部分第二小節
        channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                attachment.flip();
                byte[] bytes = new byte[attachment.remaining()];
                attachment.get(bytes);
                System.out.println("客戶端發送來的數據是:" + new String(bytes));
                
                String sendMsg = "服務端返回的數據:java的架構師技術棧";
                ByteBuffer writeBuffer = ByteBuffer.allocate(sendMsg.getBytes().length);
                writeBuffer.put(sendMsg.getBytes());
                writeBuffer.flip();
                channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        if (attachment.hasRemaining()) {
                            channel.write(attachment, attachment, this);
                        }
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            System.out.println("服務端出現寫數據異常");
                            channel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            //第二部分第三小節
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    System.out.println("服務端讀取數據異常");
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    //第三部分
    @Override
    public void failed(Throwable exc, AioServerHandle attachment) {
        System.out.println("服務端鏈接異常");
    }
}

第一部分:


通過構造方法來接受傳遞過來的AsynchronousServerSocketChannel。

第二部分第一小節:


serverSocketChannel繼續接受傳遞過來的客戶端,為什麼呢?因為調用了AsynchronousServerSocketChannel的accept方法之後,如果有新的客戶端連接進來,系統會回調我們的CompletionHandler得completed方法。但是一個AsynchronousServerSocketChannel往往能接受成千上萬個客戶端,所以在這裡繼續調用了Accept方法。以便於接受其他客戶端的鏈接。

第二部分第二小節:


channel.read方法讀取客戶端傳遞過來的數據,而且在內部還有一個channel.write方法,表示返回給客戶端的信息。代碼邏輯是一樣的。

第二部分第三小節:


在這裡表示讀取信息失敗,內部也有一個failed方法表示的就是寫入信息失敗。

第三部分:


這也是一個failed方法,表示的是鏈接客戶端失敗。

到這裡我們會看到,AIO的代碼邏輯很複雜,在這裡只是實現一個最簡單的通信例子就這麼麻煩,稍微增加點功能代碼邏輯會讓我們發瘋。不過為了保持代碼的完整性,我們還是要給出客戶端的實現。

2、客戶端


客戶端的實現就比較簡單了。

第一步:創建客戶端入口類


class AioClient{
    public static void main(String[] args){
        new AioClientHandle().start();
        try {
            Thread.sleep(100000000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
     }
}

在這裡我們同樣使用一個AioClientHandle來處理客戶端的代碼邏輯,現在我們繼續看代碼。

第二步:AioClientHandle類實現:


public class AioClientHandle extends Thread implements CompletionHandler<Void, AioClientHandle> {
    private AsynchronousSocketChannel socketChannel;
    public AioClientHandle() {
        try {
            socketChannel = AsynchronousSocketChannel.open();
            System.out.println("客戶端初始化成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888), this, this);
    }
    @Override
    public void completed(Void result, AioClientHandle attachment) {
        System.out.println("client鏈接成功");
        String sendMsg = "我是:java的架構師技術棧,服務端你好";
        ByteBuffer writeBuffer = ByteBuffer.allocate(sendMsg.getBytes().length);
        writeBuffer.put(sendMsg.getBytes());
        writeBuffer.flip();
        socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    socketChannel.write(writeBuffer, attachment, this);
                } else {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            readBuffer.flip();
                            byte[] bytes = new byte[readBuffer.remaining()];
                            readBuffer.get(bytes);
                            System.out.println("客戶端讀取數據:" + new String(bytes));
                        }
                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                System.out.println("客戶端讀取數據失敗");
                                socketChannel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    System.out.println("客戶端寫數據失敗");
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    @Override
    public void failed(Throwable exc, AioClientHandle attachment) {
        try {
            System.out.println("客戶端出現異常");
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

這個代碼邏輯和服務端的差不多,在這裡就不說了。下面我們主要分析一下為什麼不用AIO。

三、AIO的缺點


上面BB了這麼久就是為了說明為什麼不使用他,你千萬別急,因為知己知彼才能百戰不殆。你只有理解了AIO才能知道工作中應該用什麼,

1、實現複雜


上面的代碼量你已經看到了,噁心到不能噁心。實現這麼一個簡單的功能就要寫這麼多。

2、需要額外的技能


也就是說你想要學號AIO,還需要java多線程的技術做鋪墊才可以。否則我們很難寫出質量高的代碼。

3、一個著名的Selector空輪詢bug


它會導致CPU100%,之前在我的群裡面,有人曾經遇到過這個問題,而且官方說在1.6的版本中解決,但是現在還有。遇到的時候我們雖然可以解決但是不知道的人會很痛苦。

4、可靠性差


也就是說我們的網絡狀態是複雜多樣的,會遇到各種各樣的問題,比如說網斷重連、緩存失效、半包讀寫等等。可靠性比較差。稍微出現一個問題,還需要大量的代碼去完善。

當然還有很多其他的缺點,不過就單單第一條估計就很難發展。後來出現了更加牛的網絡通信框架netty。很好的解決了上面的問題,也是目前最主流的框架。更多內容,在後續文章中推出。今天的文章先到這,感謝支持。

Leave a Reply

Your email address will not be published. Required fields are marked *