讀流程
客戶端接入后,下面一步操作就是讀取客戶端傳輸過來的數據,這一節我們就來分析下服務端讀取客戶端數據流程。從前面分析來看,channel的事件輪詢、事件處理是在NioEventLoop的run方法中,從這里我們就很容易找我服務端讀流程的入口方法:processSelectedKeys()。
(資料圖片)
從processSelectedKeys()一直追蹤下去,可以看到OP_READ處理邏輯分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}可能你會比較奇怪:為什么OP_READ和OP_ACCEPT都會走這個分支?
OP_ACCEPT是NioServerSocketChannel處理的事件,而OP_READ是NioSocketChannel處理的事件,所以,雖然它們都走這個分支,但是channel類型確是不一樣的,即這里的unsafe類型也不一樣,一個是:NioMessageUnsafe,另一個是:NioSocketChannelUnsafe。NioServerSocketChannel負責監聽客戶端連接,當有客戶端連接進入時,對它來說就是有個讀入消息需要被處理。這里我們是處理client channle的OP_READ,所以,unsafe是NioSocketChannelUnsafe類型實例。
AbstractNioByteChannel.NioByteUnsafe#read方法代碼如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申請ByteBuf對象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):將數據讀取到ByteBuf中 //lastBytesRead()將讀取的字節數設置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //觸發pipeline channelRead事件,將讀入數據ByteBuf傳入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判斷是否繼續讀取 allocHandle.readComplete(); //觸發pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}這個方法刨除其它邏輯,關于客戶端數據處理邏輯主要包括3個步驟:
allocHandle.lastBytesRead(doReadBytes(byteBuf)):調用java api,從channel中讀取字節數據到ByteBuf緩存中;pipeline.fireChannelRead(byteBuf):觸發pipeline的channelRead事件,并將帶有讀入數據的ByteBuf通過參數傳入;pipeline.fireChannelReadComplete():觸發pipeline的channelReadComplete事件;事件傳播
調用pipeline的fireChannelRead()就可觸發channelRead事件在handler之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來回調用容易繞暈,下面通過圖可以更加直觀的看出調用流程,再配合代碼就很好理解了。
關鍵點就在于HandlerContext中提供了一個靜態方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一個是在哪個handler上觸發事件,第二個參數就是數據本身,通過這個方法就可以指定在哪個handler上觸發channelRead事件。由于pipeline中的handler是被包裝成HandlerContext放入的,所以,可以通過handler()方法找到真正的handler對象進行觸發。
比如pipeline的fireChannelRead()就是觸發head的channelRead事件,如果處理完成需要把事件繼續傳播給下一個handler,就需要調用ctx.fireChannelRead(msg)方法即可,該方法中通過next屬性獲取到下一個節點,然后執行static invokeChannelRead(next, msg)這個方法就可以將事件傳播到下一個節點上。
pipeline.fireChannelRead(byteBuf)運行完成后會調用pipeline.fireChannelReadComplete()方法,觸發channelReadComplete事件,執行機制和channelRead事件一樣,就不再贅述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()和ctx.pipeline().fireChannelRead()之間的區別了,避免誤用。
Pipeline線程模型
上面分析的都是常規模式,沒有給handler指定額外線程情況下channelRead和channelReadComplete傳播機制,大致如下圖:
先觸發channelRead事件,按照pipeline中順序依次觸發,當所有handler都觸發完后,再觸發channelReadComplete事件,按照pipeline中的順序依次觸發。這些所有流程采用的都是同步方式,在同一個線程中執行,這個線程就是channel注冊的NioEventLoop。
我們來看下static void invokeChannelRead()這個方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}在執行next.invokeChannelRead(m)方法前有個executor.inEventLoop()判斷,判斷當前執行線程是不是就是handler執行所需的線程。執行handler方法是不能隨便線程都可以去執行的,必須使用handler內部指定的executor線程執行器中執行才行。如下圖,也就是說紅色框框中的內容必須在executor線程執行器中執行,如果當前線程和handler執行線程不是同一個,就需要進行線程切換:則調用封裝成一個任務,提交到executor的任務隊列中讓其執行。
executor線程執行器是通過next.executor()方法獲取到的,從這個方法源碼中可以看到獲取邏輯:如果HandlerContext中executor有值則直接返回;否則返回channel注冊的NioEventLoop作為線程執行器。
在添加handler時可以指定一個EventGroup:pipeline.addLast( bizGroup, "handler2", new OtherTest02());,這樣,再把handler包裝成HandlerContext過程中會從這個EventGroup根據chooser選取策略獲得一個EventLoop賦值給executor。
所以,從上面分析,默認情況下handler都是在channel注冊的NioEventLoop線程中執行的,除非在addLast添加handloer時特別指定。
下面我們通過一個案例分析下pipeline線程模型,如下,給handler02添加一個額外的線程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}這時,channelRead和channelReadComplete事件觸發流程見下圖:
channelRead事件執行流程說明:
handler01的channelRead事件,本身當前線程和handler01是同一個線程,所以,直接調用handler#channelRead()方法;handler01#channelRead()方法執行完成后,事件繼續向下傳播,需要調用handler02#channelRead()方法,但是handler02執行線程并不是默認的channel的注冊線程,而是額外設置的biz線程,需要將調用包裝成一個任務提交到biz線程的任務隊列taskQueue中,然后直接返回;biz線程執行器內部線程會一直循環從taskQueue中獲取任務執行,這樣就完成了線程切換效果;當handler02#channelRead()方法執行完成后,需要執行handler03#channelRead(),它們又不在同一個線程中執行,這時有需要切換線程,所以會把handler03#channelRead()的調用封裝成一個任務提交到register eventLoop的taskQueue中,待其內部線程提取執行;下面再來看下channelReadComplete事件執行流程:
a1將任務提交給taskQueue任務隊列后直接返回了,而不是等其執行完成再返回;a1返回后,從源碼分析來看,會立即觸發channelReadComplete事件,涉及到線程切換,同理b1這里也是將handler02#channelReadComplete()調用封裝成任務放入到biz eventLoop的taskQueue中的,然后也直接返回了;這樣,biz eventLoop線程執行器taskQueue中就有兩個任務,會按照順序依次執行:先執行channelRead()調用,再執行channelReadComplete()調用;執行a3、b3時同理;總結
從上面可以看出,Pipeline中handler可以在不同線程間切換得到關鍵是:taskQueue。還要一點非常重要:handler線程池執行器默認使用的channel注冊的NioEventLoop這個,NioEventLoop采用的是單線程工作模式,同時還需要處理selector.select()事件輪詢,所以,handler里肯定不能有耗時、特別是IO阻塞等操作,不然卡在handler中,selector#select()執行不到,無法及時接收到客戶端傳送過來的數據。
關鍵詞:
責任編輯:Rex_29





