Java基础(十一)—— 线程池

本文最后更新于:2022年7月19日 下午

概览:Java线程池

execute与submit的区别

execute只能提交runnable类型任务,无返回值

submit可以提交runnable以及callable类型任务,有返回值Future<>

返回 使用
Futere<?> submit(runnable);
Future<T> submit(runnable,T);//T可以是任意对象,eg”任务完成”
Future<T> submit(callable);

callable任务的阻塞获取与定时获取

对于返回的Futute<>

  • get():阻塞式的,直到等到结果
  • get(long,TimeUnit):定时获取,超过时间就抛出异常。

任务取消 - cancle

任务

1
2
3
4
5
6
public class ResultTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1+1;
}
}

1.取消未执行的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MyThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ResultTask task1 = new ResultTask();
ResultTask task2 = new ResultTask();

Future<Integer> future1 = threadPool.submit(task1);
Future<Integer> future2 = threadPool.submit(task2);

// 向只有一个线程的下称池提交两个任务,第二个任务就会存放任务队列中暂不执行
System.out.println("任务是否完成"+ future2.isDone());
// 取消任务
boolean isCancle = future2.cancel(false);
System.out.println("任务是否取消成功" + isCancle);
System.out.println("任务是否取消成功" + future2.isCancelled());

// 获取任务结果
try {
Integer integer = future2.get();
System.out.println("任务2结果" + integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

结果:任务取消成功,future获取不到结果出现异常!

1
2
3
4
5
6
7
任务是否完成false
任务是否取消成功true
任务是否取消成功true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at threads.MyThreadPoolTest.main(MyThreadPoolTest.java:29)

2.取消已经执行完成的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ResultTask task1 = new ResultTask();

Future<Integer> future1 = threadPool.submit(task1);
// 获取任务结果
try {
Integer integer = future1.get();
System.out.println("任务1结果" + integer);
System.out.println("任务是否完成"+ future1.isDone());
// 取消任务
boolean isCancle = future1.cancel(false);
System.out.println("任务是否取消成功" + isCancle);
System.out.println("任务是否取消成功" + future1.isCancelled());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

结果:已经完成的任务是无法取消的。

1
2
3
4
任务1结果2
任务是否完成true
任务是否取消成功false
任务是否取消成功false

3.取消正在执行的任务

  • cancle(boolean):函数参数,true表示尝试中断正在运行的线程:调用interrupt()

任务改动:

1
2
3
4
5
6
7
8
9
10
11
12
public class ResultTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int i = 0;
// 线程没被中断时递增
while(!Thread.interrupted()){
i++;
}
System.out.println(i);
return 1+1;
}
}

false的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ResultTask task1 = new ResultTask();

Future<Integer> future1 = threadPool.submit(task1);
System.out.println("任务是否完成"+ future1.isDone());
// 取消任务
boolean isCancle = future1.cancel(false);
System.out.println("任务是否取消成功" + isCancle);
System.out.println("任务是否取消成功" + future1.isCancelled());
// 获取任务结果
try {
Integer integer = future1.get();
System.out.println("任务1结果" + integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

结果:false时,任务显示取消成功,调用get()会触发异常,但是实际上任务还是会执行完。

1
2
3
4
5
6
7
8
任务是否完成false
任务是否取消成功true
任务是否取消成功true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at threads.MyThreadPoolTest.main(MyThreadPoolTest.java:24)
程序没有结束运行

将传参改为true

执行结果:true时,任务显示取消成功,调用get()会触发异常,但是实际上任务是否继续执行取决于是否响应中断。

1
2
3
4
5
6
7
8
9
任务是否完成false
任务是否取消成功true
249
任务是否取消成功true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at threads.MyThreadPoolTest.main(MyThreadPoolTest.java:24)
程序自动结束运行

线程池执行任务流程

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果核心线程数没有满的时候,添加核心线程并执行任务
if (workerCountOf(c) < corePoolSize) {
// 成功时则返回
if (addWorker(command, true))
return;
c = ctl.get();
}

// 线程池还在运行并且能添加进任务队列时
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次判断,如果不再运行就从队列中移除任务,拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池中没有可用线程时,必须有线程来执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程池已满,尝试添加非核心线程执行任务
else if (!addWorker(command, false))
reject(command);
}

增加线程 addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 标注1
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 标注2
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

标注1:当线程池是SHUTDOWN以上的状态时不能增加线程;当线程池是SHUTDOWN状态时,不能因为新增task而增加线程,但可以因为queue不为空而增加线程。

标注2:rs < SHUTDOWN就是RUNNING状态,这里表示只能在RUNNING状态以及SHUTDOWN且无新增任务时增加线程数。

工作线程如何工作

工作线程是死循环,自动getTask()来获取任务并执行的,执行过程中不断判断中断状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 抛异常:completedAbruptly=true
// getTask()获取不到任务 completedAbruptly=false
// 线程销毁一节查看
processWorkerExit(w, completedAbruptly);
}
}

获取任务:getTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// allowCoreThreadTimeOut - 允许核心线程销毁 或者线程数 > 核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


// getTask 的阻塞唤醒机制是依赖于blockQueue来实现的
// 判断是否需要销毁,需要销毁返回Null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 带超时的get/阻塞式的get
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

线程销毁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 抛异常:completedAbruptly=true
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 线程集合移除
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
// 状态为running 或者 shutdown
if (runStateLessThan(c, STOP)) {
// getTask()获取不到任务 completedAbruptly=false
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 补充一个工作线程,防止因为task异常导致线程减少
addWorker(null, false);
}
}

空闲线程自动销毁:

当getTask()中从queue获取超时就会返回null,然后进入processWorkerExit()来进行空闲线程的销毁。

线程池疑问

1.核心线程是一开始就创建出来的吗?还是逐步创建出来的?逐步创建出来的。

2.核心线程与非核心线程的区别? - 个人感觉是没有区别的,被销毁的话哪个都有可能。

3.创建非核心线程的时机?当核心线程满了,并且任务队列也满的时候才会添加非核心线程。

callable与runnable如何转换的?

1
2
3
4
5
6
7
8
9
10
11
12
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
// execute只接受runnable对象
execute(ftask);
return ftask;
}

// 通过Future将callable对象转为runnable对象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

线程池与CPU核心数的关系

经验上:N为CPU核心数

  • 如果是CPU密集型应用,线程池大小应该设置为N+1
  • 如果是IO密集型应用,线程池大小设置为2N+1,eg:数据库数据交互、文件上传下载

+1的目的:即使当线程偶尔由于缺失故障或者其他原因而暂停的时候,这个额外的线程也能够确保CPU的时钟周期不会被浪费

参考链接:https://blog.csdn.net/u011436427/article/details/103744149

参考链接:

深入理解Java线程池:ThreadPoolExecutor - liuzhihu - 博客园 (cnblogs.com)

https://mp.weixin.qq.com/s/1F_omFouxE9UneSDIrdREg


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!