几种java线程池的实现算法分析


 

1.       前言

本文发表ä¸?/span>infoq,因版权属于个人顾再æ­?/span>转载ã€?/span>

在阅读研究线程池的源码以前,只知道如何使用,不了解其内部实现的具体细节,一直感觉是非常高深的技术,研究后才发现,线程池的实现是如此精巧。本文从技术角度分析了线程池的本质原理和组成,同时分析äº?/span>JDKã€?/span>Jetty6ã€?/span>Jetty8ã€?/span>Tomcat的源码实现,对于想了解线程池本质、更好的使用线程池或è€?/span>定制实现自己的线程池的业务场景具有一定指导意义ã€?/span>

 

2.       使用线程池的意义

l  复用:类ä¼?/span>WEB服务器等系统,长期来看内部需要使用大量的线程处理请求,而单次请求响应时间通常比较短,此时Java基于操作系统的本地调用方式大量的创建和销毁线程本身会成为系统的一个性能瓶颈和资源浪费。若使用线程池技æœ?/span>可以实现工作线程的复用,即一个工作线程创建和销毁的生命周期期间内可以执行处理多个任务,从而总体上降低线程创建和销毁的频率和时间,提升了系统性能ã€?/span>

l  流控:服务器资源有限,超过服务器性能的过高并发设置反而成为系统的负担,造成CPU大量耗费于上下文切换、内存溢出等后果。通过线程池技æœ?/span>可以控制系统最大并发数和最大处理任务量,从而很好的实现流控,保证系统不至于崩溃ã€?/span>

l  功能ï¼?/span>JDK的线ç¨?span class=GramE>池实çŽ?/span>的非常灵活,并提供了å¾?/span>多功能,一些场景基于功能的角度会选择使用线程池ã€?/span>

 

3.       线程池技æœ?/span>要点ï¼?/span>

从内部实现上看,线程池技æœ?/span>可主要划分为如下6个要点实现:

1线程池技æœ?/span>要点

l  工作者线ç¨?/span>worker:即线程池中可以重复利用起来执行任务的线程,一ä¸?/span>worker的生命周期内会不停的处理多个业务job。线程池“复用”的本质就是复用一ä¸?/span>worker去处理多ä¸?/span>jobï¼?span class=GramE>â€?/span>流控“的本质就是通过å¯?/span>worker数量的控制实现并发数的控制。通过设置不同的参数来控制worker的数量可以实现线程池的容量伸缩从而实现复杂的业务需æ±?/span>

l  待处理工ä½?/span>job的存储队列:工作者线ç¨?/span>workers的数量是有限的,同一时间最多只能处理最å¤?/span>workers数量ä¸?/span>job。对于来不及处理çš?/span>job需要保存到等待队列里,空闲的工作è€?/span>work会不停的读取空闲队列里的job进行处理。基于不同的队列实现,可以扩展出多种功能的线程池,如定制队列出队顺序实现带处ç?/span>优先级的线程池、定制队列为阻塞有界队列实现可阻塞能力的线程池等ã€?span class=GramE>流控一方面通过控制worker数控制并发数和处理能力,一方面可基于队列控制线ç¨?span class=GramE>池处ç?/span>能力的上限ã€?/span>

l  线程池初始化:即线程池参数的设定和多个工作è€?/span>workers的初始化。通常有一开始就初始化指定数量的workers或者有请求时逐步初始化工作者两种方式。前者线ç¨?span class=GramE>池启åŠ?/span>初期响应会比较快但造成了空载时的少量性能浪费,后者是基于请求量灵活扩容但牺牲了线ç¨?span class=GramE>池启åŠ?/span>初期性能达不到最优ã€?/span>

l  处理业务job算法:业务给线程池添加任åŠ?/span>job时线程池的处理算法。有的线程池基于算法识别直接处理job还是增加工作è€?span class=GramE>数处ç?/span>job或者放入待处理队列,也有的线程池会直接å°?/span>job放入待处理队列,等待工作è€?/span>worker去取出执行ã€?/span>

l  workers的增减算法:业务线程æ•?/span>不是持久不变的,有高低峰期。线程池要有自己的算法根据业务请求频率高低调节自身工作è€?/span>workers的数量来调节线程池大小,从而实现业务高峰期增加工作者数量提高响应速度,而业务低峰期减少工作者数来节省服务器资源。增加算法通常基于几个维度进行:待处理工作job数、线程池定义的最å¤?span class=GramE>最小工作者数、工作者闲置时间ã€?/span>

l  线程池终止逻辑:应用停止时线程池要有自身的停止逻辑,保证所æœ?/span>job都得到执行或者抛弃ã€?/span>

 

4.       几种线程池的实现细节

结合上面的技术点,列举几种线ç¨?span class=GramE>池实çŽ?/span>方式ã€?/span>

l  工作è€?/span>workers与待处理工作队列实现方式举例ï¼?/span>

实现

工作è€?/span>workers结构与并发保æŠ?/span>

待处理工作队列结æž?/span>

JDK

使用äº?/span>HashSet来存储工作è€?/span>workers,通过可重入锁ReentrantLock对其进行并发保护。每ä¸?/span>worker都是一ä¸?/span>Runnable接口ã€?/span>

使用了实现接å?/span>BlockingQueue的阻塞队列来存储待处理工ä½?/span>job,并把队列作为构造函数参数,从而实现业务可以灵活的扩展定制线程池的队列。业务也可使ç”?/span>JDK自身的同步阻塞队åˆ?/span>SynchronousQueue、有界队åˆ?/span>ArrayBlockingQueue、无界队åˆ?/span>LinkedBlockingQueue、优先级队列PriorityBlockingQueueã€?/span>

Jetty6

同样使用äº?/span>HashSet存储工作è€?/span>workers,通过synchronized一个对象进è¡?/span>HashSet的并发保护。每个工作者实际上是一ä¸?/span>Thread的扩展ã€?/span>

使用了数组存储待处理çš?/span>job对象Runnable。数组初始化容量ä¸?/span>_maxThreadsä¸?/span>,使用变é‡?/span>_queued计算保存当前内部待处ç?/span>job的个数即数组length。超过数组最大值时,扩å¤?/span>_maxThreadsä¸?/span>容量,因此数组永远够用够大,容量无界。同样是ç”?/span>synchronized一个对象的方式实现同步ã€?/span>

Jetty8

使用äº?/span>ConcurrentLinkedQueue存储工作è€?/span>workers,利ç”?/span>JDK基于CAS算法的实现提高了并发效率,同时也降低了线程池并发保护的复杂程度。针对队åˆ?/span>ConcurrentLinkedQueue无法保证size()实时性问题引入原子变é‡?/span>AtomicInteger统计工作者数量ã€?/span>

ä¸?/span>JDK相同实现,使用了基于接口BlockingQueue的阻塞队列来存储待处理工ä½?/span>job,也支持在线程池构造函数的参数中传入队列类型。同时,Jetty8内部默认未设置队列类型场景可自动设置使用2种队列:有界无法扩容çš?/span>ArrayBlockingQueueå?/span>Jetty自身定制扩展实现的可扩容队列BlockingArrayQueueã€?/span>

Tomcat

基于JDKçš?/span>ThreadPoolExecutors实现,复ç”?/span>JDK业务

复用JDK业务

l  线程池初始化与处理业åŠ?/span>job算法举例ï¼?/span>

实现

线程池构造与工作者初始化

处理业务job的算æ³?/span>

JDK

1.       基于多个构造参数实现灵活初始化,几个核心参数如下:

corePoolSize:核心工作者数

maximumPoolSize:最大工作者数

keepAliveTime:超过核心工作者数时闲置工作者的存活时间ã€?/span>

 workQueue:待处理job队列,即前面提到çš?/span>BlockingQueue接口ã€?/span>

2.       默认初始化后不启动工作者,等待有请求时才启动。可以通过调用线程池接口提前启动核心工作数个工作者线程,也可以启动业务期望的多个工作者线程ã€?/span>

1.       工作è€?/span>workers数量低于核心工作者数corePoolSize时会优先创建一个工作è€?/span>worker处理job,处理成功则返回ã€?/span>

2.       工作è€?/span>workers数量高于核心工作者数时会优先æŠ?/span>job放入到待处理队列,放入队列成功时处理结束ã€?/span>

3.       步骤2中入队失败会识别工作者数是否还小于最大工作者数maximumPoolsize,小于的话也会新创建一个工作è€?/span>worker处理jobã€?/span>

4.       拒绝处理

Jetty6

1.       同样支持设置多个参数ï¼?/span>

 _spawnOrShrinkAt:扩å®?/span>/缩容阀å€?/span>

 _minThreads:最小工作者数

 _maxThreads:最大工作者数

 _maxIdleTimeMs:闲置工作者最大闲置超时时é—?/span>

2.       初始化后直接启动_minThreadsä¸?/span>工作者线ç¨?/span>

1.       查找闲置的工作è€?/span>worker,找到则派发jobã€?/span>

2.       没有闲置的工作者,å°?/span>job存入待处理数组ã€?/span>

3.       当识别到数组中待处理job超过扩容阀值参数时,扩容增加工作者处ç?/span>job

4.       否则不处ç?/span>

Jetty8

1.       配置参数类似Jetty6,去除了_spawnOrShrinkAt阀值参数ã€?/span>

2.       初始化后直接启动_minThreadsä¸?/span>工作者线ç¨?/span>

非常简单,直接将待处理job入队ã€?/span>

Tomcat

1.       基于JDK线程池的构造方æ³?/span>

2.       来请求时启动工作è€?/span>

处理方法复用JDK的,但是在开始提交前扩展äº?/span>JDK的功能,实现了可以统计提交数submittedCount的能åŠ?/span>

l  线程池工作è€?/span>worker的增减机制举例:

实现

工作者增加算æ³?/span>

工作者减少算æ³?/span>

JDK

1.       待处ç?/span>job来时,工作è€?/span>workers数量低于核心工作者数corePoolSize时ã€?/span>

2.       待处ç?/span>job来时ï¼?/span>workers数超过核心数小于最大工作者数且入待处ç?/span>队列失败场景ã€?/span>

3.       业务调用线程池的更新核心工作者数接口时,若发现扩容,会增加工作者数ã€?/span>

1.       待处理任务队列里没有job并且工作è€?/span>workers数量超过了核心工作者数corePoolSizeã€?/span>

2.       待处理任务队列里没有job并且允许工作者数量小于核心工作者参数为true,此场景会至少保留一个工作者线程ã€?/span>

Jetty6

1.       启动线程池时会启åŠ?/span>_minThreadsä¸?/span>工作者线ç¨?/span>

2.       待处理的job数量高于了阀值参数且工作者数没有达到最大值时会增加工作者ã€?/span>

3.       调用线程池接å?/span>setMinThreads更新最小工作者数时会根据需要增加工作者ã€?/span>

如下三个条件同时满足时会减少工作者:

1.       待处理任务数组中没有待处ç?/span>job

2.       工作è€?/span>workers数量超过äº?span class=GramE>最小工作者数_minThreads

3.       闲置工作è€?span class=GramE>线程数高于了阀值参æ•?/span>

Jetty8

1.       启动线程池时启动最小工作者参æ•?span class=GramE>ä¸?/span>工作者线ç¨?/span>

2.       已经没有闲置工作者或者闲置工作者的数量已经小于待处理的job的总数

3.       调用线程池接å?/span>setMinThreads更新最小工作者数æ—?/span>

如下三个条件同时满足时会减少工作者:

1.       待处理任务队列里没有待处理的job

2.       工作è€?/span>workers总数超过äº?span class=GramE>最小工作者参数配ç½?/span>_minThreads

3.       工作者线程的闲置时间超时

Tomcat

å?/span>JDK增加工作者算æ³?/span>

复用JDK减少算法,同时定制扩展延迟参数,超过参数时,直接抛出异常到外面来终止线程池工作者ã€?/span>

 

5.       小结

对比几种线程池实现,JDK的实现是最为灵活、功能最强且扩展性最好的ï¼?/span>Tomcat即基äº?/span>JDK线程池功èƒ?/span>扩展实现,复用原有业务的同时扩充了自己的业务ã€?/span>Jetty6是完全自己定制的线程池业务,耦合线程池众å¤?/span>复杂的业务逻辑åˆ?span class=GramE>线程池类里面,逻辑相对最为复杂,扩展性也非常差ã€?/span>Jetty8相对Jetty6的实现简化了很多,其中利用了JDK中的同步容器和原子变量,同时实现方式也越来越接近JDKã€?/span>

 

6.       参考源ç ?/span>

l  JDK源码类:java.util.concurrent.ThreadPoolExecutor

l  Jetty6源码类:org.mortbay.thread.QueuedThreadPool

l  Jetty8源码类:org.eclipse.jetty.util.thread.QueuedThreadPool

l  Tomcat源码类:org.apache.tomcat.util.threads.ThreadPoolExecutor