契胡是哪个族:MINA源码分析的札记

来源:百度文库 编辑:偶看新闻 时间:2024/06/03 04:58:09
2010-11-13

MINA源码分析的札记1--Write流程

    博客分类:
  • Java
Mina数据结构

从IoSession调用write的过程:

IoSession.write(object message)

真正实现这个方法的是AbstractIoSession

 1、创建writeFuture对象,用于异步操作的返回

 2、将传入的Object对象,包装成WriteRequest对象,交给IoFilterChain去处理。

 3、核心的实现就是这些代码:

Java代码  
  1. // Now, we can write the message. First, create a future  
  2. WriteFuture writeFuture = new DefaultWriteFuture(this);  
  3. WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);  
  4.   
  5. // Then, get the chain and inject the WriteRequest into it  
  6. IoFilterChain filterChain = getFilterChain();  
  7. filterChain.fireFilterWrite(writeRequest);  
  8.   
  9. //.....  
  10. return writeFuture;  

 

 

下面看IoFilterChain是怎么处理的?

首先,IoFilterChain是用一个双向列表来保存它所包含的所有filter的,大概的结构如下:

head <-> filter1 <-> filter2 <-> ..... <-> filterN <-> tail

其中head和tail是两个固有、内置、特殊的filter,主要是衔接和过渡的功能。如tail就是负责调用IoHandler的功能。

回到IoFilterChain怎么处理write的话题。

4、IoFilterChain从列表中找到tail,从tail开始查找filter,顺序调用每个filter的filterWrite()方法

    典型实现:

Java代码  
  1. public void fireFilterWrite(WriteRequest writeRequest) {  
  2.     Entry tail = this.tail;  
  3.     callPreviousFilterWrite(tail, session, writeRequest);  
  4. }  
  5.   
  6. private void callPreviousFilterWrite(Entry entry, IoSession session,  
  7.         WriteRequest writeRequest) {  
  8.     try {  
  9.         IoFilter filter = entry.getFilter();  
  10.         NextFilter nextFilter = entry.getNextFilter();  
  11.         filter.filterWrite(nextFilter, session, writeRequest);  
  12.     } catch (Throwable e) {  
  13.         writeRequest.getFuture().setException(e);  
  14.         fireExceptionCaught(e);  
  15.     }  
  16. }  

 

5、最后必然就掉到head这个filter的filterWrite,它的实现应该有些特殊,才能把消息发送给IoProcessor

     将消息放入发送队列中,然后调用IoProcessor flush出去。

 

Java代码  
  1. public void filterWrite(NextFilter nextFilter, IoSession session,  
  2.                WriteRequest writeRequest) throws Exception {  
  3.   
  4.            AbstractIoSession s = (AbstractIoSession) session;  
  5.   
  6.            s.getWriteRequestQueue().offer(s, writeRequest);  
  7.            if (!s.isWriteSuspended()) {  
  8.                s.getProcessor().flush(s);  
  9.            }  
  10.        }  

WriteRequestQueue的默认实现就是java.util.concurrent.ConcurrentLinkedQueue,舍去传入的session对象。 

 

 

 在看processor怎么flush出去之前,先备忘下面的东西。

NOTE1:在这个filter传递过程中,IoSession一直被传递下去:

Java代码  
  1. filterWrite(NextFilter nextFilter, IoSession session,  
  2.                 WriteRequest writeRequest)  

 NOTE2:其实head这个filter就只实现了两个方法,也就是它只关心这两个操作:

Java代码  
  1. private class HeadFilter extends IoFilterAdapter {  
  2.      @SuppressWarnings("unchecked")  
  3.      @Override  
  4.      public void filterWrite(NextFilter nextFilter, IoSession session,  
  5.              WriteRequest writeRequest) throws Exception {        }  
  6.   
  7.      @SuppressWarnings("unchecked")  
  8.      @Override  
  9.      public void filterClose(NextFilter nextFilter, IoSession session)  
  10.              throws Exception {        }  
  11.  }  

 

6、IoProcessor是怎么处理的?

IoProcessor的flush方法在AbstractPollingIoProcessor类中实现。

它将传入的session对象加入到flushingSessions列表,

然后调用processor内在的selector.wakeUp (java.nio.channels.Selector),

而在AbstractPollingIoProcessor类中独立运行的Processor线程,就回从select()等待中被唤醒。

 

7、AbstractPollingIoProcessor.Processor线程

被唤醒之后,从flushingSessions中取出session对象,再从session中取出WriteRequest对象

如果WriteRequest对象是IoBuffer类型的,则:

Java代码  
  1. if (buf.remaining() <= length) {  
  2.             return session.getChannel().write(buf.buf());  
  3.         }  

如果WriteRequest对象是File对象,则:

Java代码  
  1. region.getFileChannel().transferTo(  
  2.                     region.getPosition(), length, session.getChannel());  

  region是FileRegion对象。就是将文件内容传输到session的Channel上去。

【这部分省略了很多细节,具体了解细节的话,主要看AbstractPollingIoProcessor类和NioProcessor类】

 

8、自此,才将数据真正发送到网络上去。