安卓-线程池及源码解读

线程池的介绍

Executor简介

Executor家族图谱

Executor家族图谱

  • Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command)
  • ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
  • AbstractExecutorService:ExecutorService执行方法的默认实现
  • ScheduledExecutorService:一个可定时调度任务的接口
  • ScheduledThreadPoolExecutor:ScheduledExecutor的实现,一个可定时调度任务的线程池
  • ThreadPoolExecutor:线程池,可以通过调用Executor以下静态工厂方法来创建线程池并返回一个ExecutorService对象

ThreadPoolExecutor构造函数的各个参数说明

1
2
3
4
5
6
7
8
9
10
11
public ThreadPoolExecutor(int corePoolSize,//核心线程数,若运行的线程少于corePoolSize,会创建新的线程来执行新任务,即线程池中的其他线程都是空闲的
int maximumPoolSize,//最大线程数,可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小
long keepAliveTime,//若线程数多于corePoolSize,则这些等待工作的线程的空闲时间超过keepAliveTime时将被终止
TimeUnit unit,//keepAliveTime参数的时间单位
BlockingQueue<Runnable> workQueue,//保存任务的阻塞队列,与线程池的大小有关:
//当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
//当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
//当队列满时,在有新任务时就创建新线程
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)

corePoolSize < 运行的线程数 < maximumPoolSize:仅当队列满时才创建新线程
corePoolSize = 运行的线程数 = maximumPoolSize:创建固定大小的线程池

当新任务提交到池中:

当前运行的线程数<corePoolSize,即有空闲线程,会创建线程来处理任务;

当前运行的线程数>corePoolSize,且<maximumPoolSize,且等待队列已满,会创建线程来处理任务。

若线程数>maximumPoolSize,新任务将会根据拒绝策略来处理

等待队列

三种通用的入队策略

  1. 直接传递:通过 SynchronousQueue 直接把任务传递给线程。如果当前没可用线程,尝试入队操作会失败,然后再创建一个新的线程。当处理可能具有内部依赖性的请求时,该策略会避免请求被锁定。直接传递通常需要无界的最大线程数(maximumPoolSize),避免拒绝新提交的任务。当任务持续到达的平均速度超过可处理的速度时,可能导致线程的无限增长。
  2. 无界队列:使用无界队列(如 LinkedBlockingQueue)作为等待队列,当所有的核心线程都在处理任务时, 新提交的任务都会进入队列等待。因此,不会有大于 corePoolSize 的线程会被创建(maximumPoolSize 也将失去作用)。这种策略适合每个任务都完全独立于其他任务的情况;例如网站服务器。这种类型的等待队列可以使瞬间爆发的高频请求变得平滑。当任务持续到达的平均速度超过可处理速度时,可能导致等待队列无限增长。
  3. 有界队列:当使用有限的最大线程数时,有界队列(如 ArrayBlockingQueue)可以防止资源耗尽,但是难以调整和控制。队列大小和线程池大小可以相互作用:使用大的队列和小的线程数可以减少CPU使用率、系统资源和上下文切换的开销,但是会导致吞吐量变低,如果任务频繁地阻塞(例如被I/O限制),系统就能为更多的线程调度执行时间。使用小的队列通常需要更多的线程数,这样可以最大化CPU使用率,但可能会需要更大的调度开销,从而降低吞吐量。

拒绝策略

当线程池已经关闭或达到饱和(最大线程和队列都已满)状态时,新提交的任务将会被拒绝。 ThreadPoolExecutor 定义了四种拒绝策略:

  1. AbortPolicy:默认策略,在需要拒绝任务时抛出RejectedExecutionException;
  2. CallerRunsPolicy:直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
  3. DiscardPolicy:直接丢弃任务;
  4. DiscardOldestPolicy:丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃。

我们也可以自定义拒绝策略,只需要实现 RejectedExecutionHandler; 需要注意的是,拒绝策略的运行需要指定线程池和队列的容量

线程池的使用

Executors提供了一系列静态工厂方法用于创建各种线程池

newFixedThreadPool

创建可重用固定线程数的线程池。若池中所有线程处于活动状态,那么新任务得在队列中等待,若池中有线程异常结束,则会创建新线程

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序得阻塞队列,在所有corePoolSize线程都忙得时候新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TestThreadPool{
public static void main(String[] args) throws InterruptedException, ExecutionException{
//创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);

List<Future<Integer>> list = new ArrayList<Future<Integer>>();

for(int i=0; i<10; i++){
Future<Integer> future = pool.submit(new Callable<Integer>(){
public Integer call() throws Exception{
int sum = 0;
for(int i=0; i<=100; i++){
sum+=1;
}
return sum;
}
});
list.add(future);
}

pool.shutdown();
for(Future<Integer> future: list){
System.out.println(future.get());
}
}


/*for(int i=0; i<10; i++){
pool.submit(new Runnable(){
public void run(){
...
}
});
}
pool.shutdown();
*/
}

newSingleThreadExecutor

创建一个单线程的Executor。若该线程因异常而结束则会新建一条线程

1
2
3
4
5
public static ExecutorService newSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService
//corePoolSize和maximumPoolSize都等于1,表示固定线程池大小为1
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISENDS, new LinkedBlockingQueue<Runnable>()));
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//抽到公共类中,供多个地方调用
public class Concurrents {
public static ExecutorService newSingleThreadExecutor(final String name) {
return Executors.newSingleThreadExecutor(new DefaultThreadFactory(name));
}
}

//使用
private ExecutorService scaleExecutor;
private void ensureMessageExecutor() {
if (scaleExecutor == null) {
scaleExecutor = Concurrents
.newSingleThreadExecutor("scaleExecutor");
}

scaleExecutor.submit(new Runnable() {
@Override
public void run(){
...
}
});

//关闭线程池
scaleExecutor.shutdown();
}

newScheduledThreadPool

创建一个可延迟执行或定期执行的线程池

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
return new ScheduledThreadPoolExecutor(corePoolSize);
}

例子:使用newScheduledThreadPool模拟心跳机制

1
2
3
4
5
6
7
8
9
10
11
public class HeartBeat{
public static void main(String[] args){
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Runnable task = new Runnable(){
public void run(){
System.out.println("HeartBeat.....")
}
};
executor.scheduleAtFixedRate(task, 5, 3, TimeUnit.SECONDS);//5秒后第一次执行,之后每隔3秒执行一次
}
}

newCachedThreadPool

创建可缓存的线程池。若线程池中线程在60秒未被使用就被移除,在执行新任务时,当线程池中有之前创建的可用线程就重用可用线程,否则新建一条线程

1
2
3
4
5
public static ExecutorService newCachedThreadPool{
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
//使用同步队列,将任务直接提交给线程
new SynchronousQueue<Runnable>());
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadPoolTest{
public static void main(String[] args){
ExecutorService threadPool = Executors.newCachedThreadPool();//线程池里面的线程数会动态变化,并可在线程线被移除前使用
for(int i=1; i<=3; i++){
final int task = i;//10个任务
//TimeUnit.SECONDS.sleep(1L);
threadPool.execute(new Runnable(){//接受一个Runnable实例
public void run(){
System.out.println("线程名字: " + Thread.currentThread().getName() + " 任务名为: " + task);
}
});
}
}
}


输出:(为每个任务新建一条线程,共创建了3条线程)
线程名字: pool-1-thread-1 任务名为: 1
线程名字: pool-1-thread-2 任务名为: 2
线程名字: pool-1-thread-3 任务名为: 3

去掉第6行的注释其输出如下:(始终重复利用一条线程,因为newCachedThreadPool能重用可用线程)
线程名字: pool-1-thread-1 任务名为: 1
线程名字: pool-1-thread-1 任务名为: 2
线程名字: pool-1-thread-1 任务名为: 3

通过使用Executor可以很轻易的实现各种调优 管理 监视 记录日志和错误报告等待。

Executors各个方法的弊端和解决方案:

弊端

newFixedThreadPool和newSingleThreadExecutor的弊端

主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM

newCachedThreadPool和newScheduledThreadPool的弊端

主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM

解决方案

不允许通过使用Executors去创建,而是通过ThreadPoolExecutor的方式(阿里编码规约)

通过ThreadPoolExecutor创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 测试ThreadPoolExecutor对线程的执行顺序
*/
public class ThreadPoolSerialTest{
public static void main(String[] args){
//核心线程数
int corePoolSize = 3;
//最大线程数
int maximumPoolSize = 6;
//超过 corePoolSize 线程数量的线程最大空闲时间
long keepAliveTime = 2;
//以秒为时间单位
TimeUnit unit = TimeUnit.SECONDS;
//创建工作队列,用于存放提交的等待执行任务
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try{
//创建线程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循环提交任务
for(int i = 0; i < 8; i++){
//提交任务的索引
final int index = (i + 1);
threadPoolExecutor.submit(()->{
//线程打印输出
System.out.println("大家好,我是线程:" + index);
try{
//模拟线程执行时间 10s
Thread.sleep(10000L);
} catch (InterruptedException e){
e.printStackTrace();
}
});
//每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
Thread.sleep(500L);
}
} catch (InterruptedException e){
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}

Executor的生命周期

ExecutorService提供了管理Executor生命周期的的方法,包括了:运行、关闭、终止三种状态

  • ExecutorService在初始化时处于运行状态

  • shutdown()等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭

  • shutdownNow()强制终止所有运行中的任务并不再允许提交新任务

可将一个Runnable或Callable提交给ExecutorService的submit方法执行,最终返回一个Future用来获得任务的执行结果或取消任务

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CallableAndFuture{
public static void main(String[] args) throws ExecutionException, InterruptedException{
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>{//接受一个callable实例
return "MOBIN";
});
System.out.println("任务的执行结果:" + future.get());
}
}


输出:
任务的执行结果:MOBIN

**ExecutorCompletionService:**实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果

例子:(启动10条线程,谁先执行完成就返回谁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CompletionServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(10); //创建含10.条线程的线程池
CompletionService completionService = new ExecutorCompletionService(executor);
for (int i =1; i <=10; i ++) {
final int result = i;
completionService.submit(new Callable() {
public Object call() throws Exception {
Thread.sleep(new Random().nextInt(5000)); //让当前线程随机休眠一段时间
return result;
}
});
}
System.out.println(completionService.take().get()); //获取执行结果
}
}



输出结果可能每次都不同(在110之间)
3

通过Executor来设计应用程序可以简化开发过程,提高开发效率,并有助于实现并发,在开发中如果需要创建线程可优先考虑使用Executor

线程池源码的解读

带着问题看

①线程池的池子是哪个数据结构
②线程池构造方法的参数的含义
③FutureTask如何获取到结果,任务没完成就ft.get()是怎么阻塞的
④线程池提交runnable和callable是有什么区别和联系
⑤工作线程Worker是如何处理池子和阻塞队列的任务的
⑥ coreSize个线程数是如何保持住的
⑦ 线程池是如何进程保持在哪里的(除非你手动shutDown)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface ExecutorService extends Executor {
//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorService
void shutdown();
//阻止等待任务启动并试图停止当前正在执行的任务,停止接收新的任务,返回处于等待的任务列表
List<Runnable> shutdownNow();
//判断线程池是否已经关闭
boolean isShutdown();
//如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
boolean isTerminated();
//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false
boolean awaitTermination(long timeout, TimeUnit unit)
//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
<T> Future<T> submit(Callable<T> task);
//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
<T> Future<T> submit(Runnable task, T result);
//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null
Future<?> submit(Runnable task);
//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
//执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
1
2
3
4
public interface Executor {
//执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法
void execute(Runnable command);
}