一个虽复杂但可直接套用的线程池实例
继“多线程程序模型研究”文章发布后,最近又继续研究,推出一个比较复杂但功能比较完善,而且可以直接套用的线程池的实例,希望对使用多线程设计的读者有所帮助。
该实例来源于Apache项目源代码,源程序有800余行,功能比较全面,而且是非常完善的,并且运行于诸多服务器如tomcat上,就是分析起来有点繁琐。如果开发人员直接把这段程序拿来修改后使用到自己的开发项目中,不失为拿来主义的上策。
本文对该源程序进行了修改和简化,对其中核心部分进行分析,然后创建测试类进行测试。读者学习之后可以直接模仿和套用,而不需要花费大量时间自己亲自再去写线程池程序了。
关键的程序有2个:线程池类(ThreadPool)和线程池接口(ThreadPoolRunnable)。
首先看较复杂的线程池类程序,文件名为ThreadPool.java,有200余行,需要读者有耐心,内容如下:
—————————————————————————————
import java.util.Vector; public class ThreadPool { public static final int MAX_THREADS = 100; public static final int MAX_SPARE_THREADS = 50; public static final int MIN_SPARE_THREADS = 10; public static final int WORK_WAIT_TIMEOUT = 60 * 1000; protected Vector pool; protected MonitorRunnable monitor; protected int maxThreads; protected int minSpareThreads; protected int maxSpareThreads; protected int currentThreadCount; protected int currentThreadsBusy; protected boolean stopThePool; public ThreadPool() { maxThreads = MAX_THREADS; maxSpareThreads = MAX_SPARE_THREADS; minSpareThreads = MIN_SPARE_THREADS; currentThreadCount = 0; currentThreadsBusy = 0; stopThePool = false; } public synchronized void start() { adjustLimits(); openThreads(minSpareThreads); monitor = new MonitorRunnable(this); } public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } public int getMaxThreads() { return maxThreads; } public void setMinSpareThreads(int minSpareThreads) { this.minSpareThreads = minSpareThreads; } public int getMinSpareThreads() { return minSpareThreads; } public void setMaxSpareThreads(int maxSpareThreads) { this.maxSpareThreads = maxSpareThreads; } public int getMaxSpareThreads() { return maxSpareThreads; } public void runIt(ThreadPoolRunnable r) { if (null == r) { throw new NullPointerException(); } if (0 == currentThreadCount || stopThePool) { throw new IllegalStateException(); } ControlRunnable c = null; synchronized (this) { if (currentThreadsBusy == currentThreadCount) { if (currentThreadCount < maxThreads) { int toOpen = currentThreadCount + minSpareThreads; openThreads(toOpen); } else { while (currentThreadsBusy == currentThreadCount) { try { this.wait(); }catch (InterruptedException e) { } if (0 == currentThreadCount || stopThePool) { throw new IllegalStateException(); } } } } c = (ControlRunnable) pool.lastElement(); pool.removeElement(c); currentThreadsBusy++; } c.runIt(r); } public synchronized void shutdown() { if (!stopThePool) { stopThePool = true; monitor.terminate(); monitor = null; for (int i = 0; i < (currentThreadCount - currentThreadsBusy); i++) { try { ((ControlRunnable) (pool.elementAt(i))).terminate(); } catch (Throwable t) { } } currentThreadsBusy = currentThreadCount = 0; pool = null; notifyAll(); } } protected synchronized void checkSpareControllers() { if (stopThePool) { return; } if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { int toFree = currentThreadCount - currentThreadsBusy - maxSpareThreads; for (int i = 0; i < toFree; i++) { ControlRunnable c = (ControlRunnable) pool.firstElement(); pool.removeElement(c); c.terminate(); currentThreadCount--; } } } protected synchronized void returnController(ControlRunnable c) { if (0 == currentThreadCount || stopThePool) { c.terminate(); return; } currentThreadsBusy--; pool.addElement(c); notify(); } protected synchronized void notifyThreadEnd() { currentThreadsBusy--; currentThreadCount--; notify(); openThreads(minSpareThreads); } protected void adjustLimits() { if (maxThreads <= 0) { maxThreads = MAX_THREADS; } if (maxSpareThreads >= maxThreads) { maxSpareThreads = maxThreads; } if (maxSpareThreads <= 0) { if (1 == maxThreads) { maxSpareThreads = 1; } else { maxSpareThreads = maxThreads / 2; } } if (minSpareThreads > maxSpareThreads) { minSpareThreads = maxSpareThreads; } if (minSpareThreads <= 0) { if (1 == maxSpareThreads) { minSpareThreads = 1; } else { minSpareThreads = maxSpareThreads / 2; } } } protected void openThreads(int toOpen) { if (toOpen > maxThreads) { toOpen = maxThreads; } if (0 == currentThreadCount) { pool = new Vector(toOpen); } for (int i = currentThreadCount; i < toOpen; i++) { pool.addElement(new ControlRunnable(this)); } currentThreadCount = toOpen; } class MonitorRunnable implements Runnable { ThreadPool p; Thread t; boolean shouldTerminate; MonitorRunnable(ThreadPool p) { shouldTerminate = false; this.p = p; t = new Thread(this); t.start(); } public void run() { while (true) { try { synchronized (this) { this.wait(WORK_WAIT_TIMEOUT); } if (shouldTerminate) { break; } p.checkSpareControllers(); } catch (Throwable t) { t.printStackTrace(); } } } public synchronized void terminate() { shouldTerminate = true; this.notify(); } } class ControlRunnable implements Runnable { ThreadPool p; Thread t; ThreadPoolRunnable toRun; boolean shouldTerminate; boolean shouldRun; boolean noThData; Object thData[] = null; ControlRunnable(ThreadPool p) { toRun = null; shouldTerminate = false; shouldRun = false; this.p = p; t = new Thread(this); t.start(); noThData = true; thData = null; } public void run() { while (true) { try { synchronized (this) { if (!shouldRun && !shouldTerminate) { this.wait(); } } if (shouldTerminate) { break; } try { if (noThData) { thData = toRun.getInitData(); noThData = false; } if (shouldRun) { toRun.runIt(thData); } } catch (Throwable t) { System.err.println("ControlRunnable Throwable: "); t.printStackTrace(); shouldTerminate = true; shouldRun = false; p.notifyThreadEnd(); } finally { if (shouldRun) { shouldRun = false; p.returnController(this); } } if (shouldTerminate) { break; } } catch (InterruptedException ie) { } } } public synchronized void runIt(ThreadPoolRunnable toRun) { if (toRun == null) { throw new NullPointerException("No Runnable"); } this.toRun = toRun; shouldRun = true; this.notify(); } public synchronized void terminate() { shouldTerminate = true; this.notify(); } } }
—————————————————————————————
以上程序中,关键的是openThreads方法、runIt方法以及2个内部类:MonitorRunnable和ControlRunnable。
刚开始运行的时候,线程池会往Vector对象里装入minSpareThreads个元素,每个元素都是ControlRunnable线程类,ControlRunnable类在其构造方法中启动线程。如果shouldRun和shouldTerminate都是false的话,线程就等待。如果shouldRun为true,就调用ThreadPoolRunnable的runIt(Object[])方法,该接口的方法就是我们需要在自己的任务类中覆盖的方法。
如果minSpareThreads个线程都处于Busy后,线程池会再创建出minSpareThreads个线程。MonitorRunnable是用来监视线程池运行情况的,其线程间隔60秒(WORK_WAIT_TIMEOUT)调用一次线程池类的checkSpareControllers方法,如果发现(currentThreadCount – currentThreadsBusy) > maxSpareThreads,就会调用ControlRunnable类的terminate方法删除空闲线程,准备删除的线程是否空闲是通过shouldTerminate参数来判断的。
线程池接口ThreadPoolRunnable有2个空方法getInitData和runIt,我们一般自己创建一个任务类实现这个线程池接口就可以了,把具体的任务内容放在任务类的runIt方法中。如果不想用getInitData,就让它返回null值。
线程池接口程序很简单,文件名为ThreadPoolRunnable.java,就几行,内容如下:
———————————————————————————————
public interface ThreadPoolRunnable { public Object[] getInitData(); public void runIt(Object thData[]); }
———————————————————————————————
线程池类和线程池接口都已经说完,下面就举个例子说说怎么使用它们了。
我们的任务还是扫描端口(请参考我的“多线程程序模型研究”),文件名为TestThreadPool.java,内容如下:
———————————————————————————————
import java.net.InetAddress; import java.net.Socket; public class TestThreadPool { public static void main(String[] args) { String host = null; //第一个参数,目标主机。 int beginport = 1; //第二个参数,开始端口。 int endport = 65535; //第三个参数,结束端口。 try{ host = args[0]; beginport = Integer.parseInt(args[1]); endport = Integer.parseInt(args[2]); if(beginport <= 0 || endport >= 65536 || beginport > endport){ throw new Exception("Port is illegal"); } }catch(Exception e){ System.out.println("Usage: java PortScannerSingleThread host beginport endport"); System.exit(0); } ThreadPool tp = new ThreadPool(); tp.setMaxThreads(100); tp.setMaxSpareThreads(50); tp.setMinSpareThreads(10); tp.start(); for(int i = beginport; i <= endport; i++){ Task task = new Task(host,i); tp.runIt(task); } } } class Task implements ThreadPoolRunnable{ String host; int port; Task(String host, int port){ this.host = host; this.port = port; } public Object[] getInitData(){ return null; } public void runIt(Object thData[]){ Socket s = null; try{ s = new Socket(InetAddress.getByName(host),port); System.out.println("The port " + port + " at " + host + " is open."); }catch(Exception e){ }finally{ try{ if(s != null) s.close(); }catch(Exception e){ } } } }
———————————————————————————————–
在TestThreadPool类的main方法中定义了三个参数,分别是目标主机IP地址,开始端口和结束端口。然后通过new ThreadPool()创建线程池类,并通过setMaxThreads、setMaxSpareThreads和setMinSpareThreads设置线程池的maxThreads、maxSpareThreads和minSpareThreads参数。Task类通过实现ThreadPoolRunnable接口,在runIt中定义了具体内容(创建Socket对象达到扫描端口的目的)。
以上程序都在JDK1.4.2的环境下编译并运行通过,输入 java TestThreadPool 10.1.1.182 1 10000 运行得出如下结果:
The port 25 at 10.1.1.182 is open.
The port 110 at 10.1.1.182 is open.
The port 135 at 10.1.1.182 is open.
…
通过以上的线程池类、线程池接口的分析和介绍,读者可以在理解的基础上,直接把这个线程池类和接口拿来,应用到自己的开发项目中。
转载请注明来源:一个虽复杂但可直接套用的线程池实例