线程池的介绍
Executor简介
Executor家族图谱
- Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command)
- ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
- AbstractExecutorService:ExecutorService执行方法的默认实现
- ScheduledExecutorService:一个可定时调度任务的接口
- ScheduledThreadPoolExecutor:ScheduledExecutor的实现,一个可定时调度任务的线程池
- ThreadPoolExecutor:线程池,可以通过调用Executor以下静态工厂方法来创建线程池并返回一个ExecutorService对象
ThreadPoolExecutor构造函数的各个参数说明
1 | public ThreadPoolExecutor(int corePoolSize,//核心线程数,若运行的线程少于corePoolSize,会创建新的线程来执行新任务,即线程池中的其他线程都是空闲的 |
corePoolSize < 运行的线程数 < maximumPoolSize:仅当队列满时才创建新线程
corePoolSize = 运行的线程数 = maximumPoolSize:创建固定大小的线程池
当新任务提交到池中:
当前运行的线程数<corePoolSize,即有空闲线程,会创建线程来处理任务;
当前运行的线程数>corePoolSize,且<maximumPoolSize,且等待队列已满,会创建线程来处理任务。
若线程数>maximumPoolSize,新任务将会根据拒绝策略来处理
等待队列
三种通用的入队策略
- 直接传递:通过 SynchronousQueue 直接把任务传递给线程。如果当前没可用线程,尝试入队操作会失败,然后再创建一个新的线程。当处理可能具有内部依赖性的请求时,该策略会避免请求被锁定。直接传递通常需要无界的最大线程数(maximumPoolSize),避免拒绝新提交的任务。当任务持续到达的平均速度超过可处理的速度时,可能导致线程的无限增长。
- 无界队列:使用无界队列(如 LinkedBlockingQueue)作为等待队列,当所有的核心线程都在处理任务时, 新提交的任务都会进入队列等待。因此,不会有大于 corePoolSize 的线程会被创建(maximumPoolSize 也将失去作用)。这种策略适合每个任务都完全独立于其他任务的情况;例如网站服务器。这种类型的等待队列可以使瞬间爆发的高频请求变得平滑。当任务持续到达的平均速度超过可处理速度时,可能导致等待队列无限增长。
- 有界队列:当使用有限的最大线程数时,有界队列(如 ArrayBlockingQueue)可以防止资源耗尽,但是难以调整和控制。队列大小和线程池大小可以相互作用:使用大的队列和小的线程数可以减少CPU使用率、系统资源和上下文切换的开销,但是会导致吞吐量变低,如果任务频繁地阻塞(例如被I/O限制),系统就能为更多的线程调度执行时间。使用小的队列通常需要更多的线程数,这样可以最大化CPU使用率,但可能会需要更大的调度开销,从而降低吞吐量。
拒绝策略
当线程池已经关闭或达到饱和(最大线程和队列都已满)状态时,新提交的任务将会被拒绝。 ThreadPoolExecutor 定义了四种拒绝策略:
- AbortPolicy:默认策略,在需要拒绝任务时抛出RejectedExecutionException;
- CallerRunsPolicy:直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
- DiscardPolicy:直接丢弃任务;
- DiscardOldestPolicy:丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃。
我们也可以自定义拒绝策略,只需要实现 RejectedExecutionHandler; 需要注意的是,拒绝策略的运行需要指定线程池和队列的容量。
线程池的使用
Executors提供了一系列静态工厂方法用于创建各种线程池
newFixedThreadPool
创建可重用且固定线程数的线程池。若池中所有线程处于活动状态,那么新任务得在队列中等待,若池中有线程异常结束,则会创建新线程
1 | public static ExecutorService newFixedThreadPool(int nThreads){ |
例子:
1 | public class TestThreadPool{ |
newSingleThreadExecutor
创建一个单线程的Executor。若该线程因异常而结束则会新建一条线程
1 | public static ExecutorService newSingleThreadExecutor(){ |
例子:
1 | //抽到公共类中,供多个地方调用 |
newScheduledThreadPool
创建一个可延迟执行或定期执行的线程池
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){ |
例子:使用newScheduledThreadPool模拟心跳机制
1 | public class HeartBeat{ |
newCachedThreadPool
创建可缓存的线程池。若线程池中线程在60秒未被使用就被移除,在执行新任务时,当线程池中有之前创建的可用线程就重用可用线程,否则新建一条线程
1 | public static ExecutorService newCachedThreadPool{ |
例子:
1 | public class ThreadPoolTest{ |
通过使用Executor可以很轻易的实现各种调优 管理 监视 记录日志和错误报告等待。
Executors各个方法的弊端和解决方案:
弊端
newFixedThreadPool和newSingleThreadExecutor的弊端
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM
newCachedThreadPool和newScheduledThreadPool的弊端
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM
解决方案
不允许通过使用Executors去创建,而是通过ThreadPoolExecutor的方式(阿里编码规约)
通过ThreadPoolExecutor创建线程池
1 | import java.util.concurrent.ArrayBlockingQueue; |
Executor的生命周期
ExecutorService提供了管理Executor生命周期的的方法,包括了:运行、关闭、终止三种状态
ExecutorService在初始化时处于运行状态
shutdown()
等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭shutdownNow()
强制终止所有运行中的任务并不再允许提交新任务
可将一个Runnable或Callable提交给ExecutorService的submit方法执行,最终返回一个Future用来获得任务的执行结果或取消任务
例子:
1 | public class CallableAndFuture{ |
**ExecutorCompletionService:**实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果
例子:(启动10条线程,谁先执行完成就返回谁)
1 | public class CompletionServiceTest { |
通过Executor来设计应用程序可以简化开发过程,提高开发效率,并有助于实现并发,在开发中如果需要创建线程可优先考虑使用Executor
线程池源码的解读
带着问题看
①线程池的池子是哪个数据结构
②线程池构造方法的参数的含义
③FutureTask如何获取到结果,任务没完成就ft.get()是怎么阻塞的
④线程池提交runnable和callable是有什么区别和联系
⑤工作线程Worker是如何处理池子和阻塞队列的任务的
⑥ coreSize个线程数是如何保持住的
⑦ 线程池是如何进程保持在哪里的(除非你手动shutDown)
1 | public interface ExecutorService extends Executor { |
1 | public interface Executor { |