`

Java多线程基础总结九:Mina窥探

阅读更多

   一直以来的多线程的基础总结都是脱离应用的,但是要说多线程的应用就不能不说Mina。Apache Mina作为一个高性能的Java异步并发网 络通讯框架,其内部的多线程的设计和实现可谓是学习多线程的良药。手上的Mina源码是svn剪下来的最新的代码,mvn转化成eclipse项目 后导入mina-core的源码看看多线程的应用吧。
   首先简单的介绍在org.apache.mina.core.service包里的核心接口之一:IoService。这个接口是对于服务器端接收连接和客户端发起连 接这两种服务的顶层抽象,所以就有了IoAcceptor和IoConnector两个子接口的继承与隔离。很拍马屁的说,从这个小细节可以看到mina架 构的精细。这种程度的接口的隔离最重要的就是对接口内抽象行为的准确划分,错误的划分接口的职责将使得后面得实现显得不合理甚至是 错误。只是不知道负责抽象的设计人员是否是一次性的抽象成功,如果是只能说牛x。至于交互会话IoSession和实际的I/O操作处理器 IoProcessor 以及底层处理I/O事件的IoHandle这些接口就不废话了,今天看的是与IoServiceListener有关的多线程应用。 IoServiceListener主要是用来监听IoService相关的事件,而今日主角--IoServiceListenerSupport则是用来把IoService和对应的 IoServiceListener包装在一起进行管理的辅助类。先看看其源码:

public class IoServiceListenerSupport {
   /** The {@link IoService} that this instance manages. */
   private final IoService service;

   /** A list of {@link IoServiceListener}s. */
   private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener> ();

   /** Tracks managed sessions. */
   private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long,  IoSession>();

   /** Read only version of {@link #managedSessions}. */
   private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap (managedSessions);

   private final AtomicBoolean activated = new AtomicBoolean();

   /** Time this listenerSupport has been activated */
   private volatile long activationTime;

   /** A counter used to store the maximum sessions we managed since the listenerSupport has been  activated */
   private volatile int largestManagedSessionCount = 0;

   /** A global counter to count the number of sessions managed since the start */
   private volatile long cumulativeManagedSessionCount = 0;

   /**
    * Adds a new listener.
    *
    * @param listener The added listener
    */
   public void add(IoServiceListener listener) {
     if (listener != null) {
       listeners.add(listener);
     }
   }

   /**
    * @return true if the instance is active
    */
   public boolean isActive() {
     return activated.get();
   }

   /**
    * Calls {@link IoServiceListener#serviceActivated(IoService)}
    * for all registered listeners.
    */
   public void fireServiceActivated() {
     if (!activated.compareAndSet(false, true)) {
       // The instance is already active
       return;
     }

     activationTime = System.currentTimeMillis();

     // Activate all the listeners now
     for (IoServiceListener listener : listeners) {
       try {
         listener.serviceActivated(service);
       } catch (Throwable e) {
         ExceptionMonitor.getInstance().exceptionCaught(e);
       }
     }
   }

   /**
    * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
    *
    * @param session The session which has been created
    */
   public void fireSessionCreated(IoSession session) {
     boolean firstSession = false;

     if (session.getService() instanceof IoConnector) {
       synchronized (managedSessions) {
         firstSession = managedSessions.isEmpty();
       }
     }
   ...

     cumulativeManagedSessionCount ++;
   ...
   }
}


   这里为了说明多线程的应用,我仅仅节选相关的方法和方法片段。从这个类中我们还是能看到丰富的多线程的应用的。下面就开始这盘 菜吧!
   首先从全局变量开始看看:CopyOnWriteArrayList,ConcurrentMap,AtomicBoolean,volatile堪称多线程小动员了。后两者之前都有 介绍暂时省略,后面的方法分析时会看到使用情景。前两者都是java.util.concurrent包下的并发集合框架成员。ConcurrentMap是高效的 并发Map实现,主要是采取分段加锁的机制,默认是16段锁,所以多线程的竞争大大的降低。只有key的hash 分布在同一段位上线程之间存 在竞争。
   而CopyOnWriteArrayList是ArrayList的一个线程安全的变体,变态之处在于对其的所有的可变操作都是对底层的数组的进行的一次新的 复制。看看其实现原来是使用ReentrantLock来保证可变时的线程安全,又多了一个多线程的成员啊。当然使用这个并发集合实现是需要特 殊的情况的(要知道它的每次的可变都是全盘复制,这意味着很大的性能成本):如果遍历集合的操作次数大大的超过了可变操作时,这时 候它的性能优势就体现出来了。因为内部的数组使用的是volatile的,所以遍历查找时都不用同步而能保证可见性,这是锁同步无法比拟的 。要说mina重视性能从这儿可见一斑,毕竟每个全局变量都是为了性能去选择相应的同步机制,而不是synchronized通吃天下。
   IoServiceListenerSupport的 fireServiceActivated(),fireServiceDeactivated(),fireSessionCreated(), fireSessionDestroyed() 方法均有遍历listeners的操作,如果这些方法在线程之间频繁的使用的话,那么无疑使用CopyOnWriteArrayList 是个很好的解决方案。managedSessions主要是管理IoSession的,需要使用并发Map的数据结构,那么ConcurrentMap无疑已被证明是相对出 色的并发Map。activated类似一个开关的设计,看看为什么使用无锁得AtomicBoolean?

fireServiceActivated() {
     if (!activated.compareAndSet(false, true)) {
       // The instance is already active
       return;
     }
     ...
}

  这里可以看到activated的状态改变是要依赖其原来的值得,也就是如果使用volatile的话,要判断之前的是否为false,不满足则置为 true。这样的操作对于volatile只能使用锁同步实现。而AtomicBoolean的“CAS”轻松的使用无锁同步原语解决了这个问题。  compareAndSet(...)不管有多少个线程执行,只有取得activated最新值得线程才能返回true,其余的都会是false。这就看到mina的 committer还是很纠结性能的。
  然后看看那几个volatile的变量吧。首先是activationTime,这个变量除了有自身的get()可以随时取得最新的值之外,就在 fireServiceActivated()内出现了:activationTime = System.currentTimeMillis();可以看到仅仅是一次性的赋值而且不依赖其自身的值 。所以完全的满足线程安全的条件。 largestManagedSessionCount的使用和其类似。
  再来看看cumulativeManagedSessionCount,它是一个全局的计数器,负责记录启动后所管理得会话数量。除了同样有get()方法之外就 是出现在fireSessionCreated(...)方法中:cumulativeManagedSessionCount ++;这引起了我的注意,因为这样的写法是不能保证线程同步 的!因为volatile的变量根本无法完成“++”的原子操作。“++”是需要依赖其自身的值而进行更改的操作。为此我简单的写了个验证的例 子证明了这个铁律。
package thread;

public class VolatileTest1 {

	public static void main(String[] args) {
		final VolatileSample1 sample = new VolatileSample1();

		Runnable runnable = new Runnable() {
			public void run() {
				for (int i = 0; i < 500; i++) {
					System.out.println(sample.incrementAndGet());
				}
			}
		};

		for (int i = 0; i < 50; i++) {
			new Thread(runnable).start();
		}
	}
}

class VolatileSample1 {
	private volatile long counter = 0;

	public long incrementAndGet() {
		counter++;
		return get();
	}

	public long get() {
		return counter;
	}

}

 
  使用50个线程每个线程执行500次对counter进行叠加。正确的答案应该是25000,但是很不幸的是仅仅测试几次就出现了24999和其他的 小于25000的最大值。所以很遗憾的发现mina的这个变量的方案的选择是有问题的!
  这里应该是使用的是AtomicLong,而对应的fireServiceActivated(...)内的代码如果不使用API提供的话应该是:

for (;;) {
      long current = cumulativeManagedSessionCount.get();
      long next = current + 1;
      if (compareAndSet(current, next))
        break;
   }

虽然是for循环+CAS的atomic杀手锏,但是不要怕,一般for循环cpu都是一次搞定,极少情况是多次,性能不会有什么明显影响。这个 bug可能在数量较少的线程的情况下很难显现,或者是mina的开发者考虑使用情景故意的宽松线程机制?从严谨性来看是我个人认为是不对 的。
最后就是看看一个有趣的synchronized的代码片段:
/**
    * Close all the sessions
    * TODO disconnectSessions.
    *
    */
   private void disconnectSessions() {
     Object lock = new Object();
     ...
     try {
       synchronized (lock) {
         while (!managedSessions.isEmpty()) {
           lock.wait(500);
         }
       }
     } catch (InterruptedException ie) {
       // Ignored
     }
   }


   这个方法是关闭所有的会话,但是为了减少线程锁的竞争使用了一个很可爱的方式。这里的lock是个局部变量,所以每个进入这个方法 的线程都会拥有一个lock对象,而      synchronized(lock)是迷惑的重点。这个synchronized不会使得进入方法的线程们产生任何的竞争,因为 每个线程都能获得属于自己的lock。synchronized的作用就在于lock.wait(500)的调用,就因为wait方法必须要 synchronized的配合,所 以就出现了这个可爱的代码。这段代码的主要意图就是不管几个线程去关闭所有会话,每个线程都是间隔500ms去检查 managedSessions是 否为空。这里没有对managedSessions使用synchronized的原因就是为了减少线程锁对 managedSessions的资源独占,而改用while循环的机 制宁愿等待也不为了检查managedSessions而影响其他线程的工作。很精巧的一个实现!
   写到这里,算是这个小菜的结尾了,总感觉意犹未尽。mina对高性能并发的目标还是从代码上得到了一些的体现。当然它的高性能主要 是对NIO的封装使用,不过这一切都是建立在多线程的并发基础上的。至于之前无意发现的bug(个人认为)如果mina的committer也有相同的 认知的话,我想以后会有相应的修改的。


ref:http://www.bianceng.cn/Programming/Java/201206/34157_3.htm
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics