本文最后更新于:2022年7月19日 下午
概览 :Java线程池
execute与submit的区别 execute只能提交runnable类型任务,无返回值
submit可以提交runnable以及callable类型任务,有返回值Future<>
返回
使用
Futere<?>
submit(runnable);
Future<T>
submit(runnable,T);//T可以是任意对象,eg”任务完成”
Future<T>
submit(callable);
callable任务的阻塞获取与定时获取 对于返回的Futute<>
get():阻塞式的,直到等到结果
get(long,TimeUnit):定时获取,超过时间就抛出异常。
任务取消 - cancle 任务
1 2 3 4 5 6 public class ResultTask implements Callable <Integer > { @Override public Integer call () throws Exception { return 1 +1 ; } }
1.取消未执行的任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class MyThreadPoolTest { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); ResultTask task1 = new ResultTask(); ResultTask task2 = new ResultTask(); Future<Integer> future1 = threadPool.submit(task1); Future<Integer> future2 = threadPool.submit(task2); System.out.println("任务是否完成" + future2.isDone()); boolean isCancle = future2.cancel(false ); System.out.println("任务是否取消成功" + isCancle); System.out.println("任务是否取消成功" + future2.isCancelled()); try { Integer integer = future2.get(); System.out.println("任务2结果" + integer); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
结果:任务取消成功,future获取不到结果出现异常!
1 2 3 4 5 6 7 任务是否完成false 任务是否取消成功true 任务是否取消成功true Exception in thread "main " java .util .concurrent .CancellationException at java .util .concurrent .FutureTask .report (FutureTask .java :121) at java .util .concurrent .FutureTask .get (FutureTask .java :192) at threads .MyThreadPoolTest .main (MyThreadPoolTest .java :29)
2.取消已经执行完成的任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class MyThreadPoolTest { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); ResultTask task1 = new ResultTask(); Future<Integer> future1 = threadPool.submit(task1); try { Integer integer = future1.get(); System.out.println("任务1结果" + integer); System.out.println("任务是否完成" + future1.isDone()); boolean isCancle = future1.cancel(false ); System.out.println("任务是否取消成功" + isCancle); System.out.println("任务是否取消成功" + future1.isCancelled()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
结果:已经完成的任务是无法取消的。
1 2 3 4 任务1 结果2 任务是否完成true 任务是否取消成功false 任务是否取消成功false
3.取消正在执行的任务
cancle(boolean):函数参数,true表示尝试中断正在运行的线程:调用interrupt()
任务改动:
1 2 3 4 5 6 7 8 9 10 11 12 public class ResultTask implements Callable <Integer > { @Override public Integer call () throws Exception { int i = 0 ; while (!Thread.interrupted()){ i++; } System.out.println(i); return 1 +1 ; } }
false的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class MyThreadPoolTest { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); ResultTask task1 = new ResultTask(); Future<Integer> future1 = threadPool.submit(task1); System.out.println("任务是否完成" + future1.isDone()); boolean isCancle = future1.cancel(false ); System.out.println("任务是否取消成功" + isCancle); System.out.println("任务是否取消成功" + future1.isCancelled()); try { Integer integer = future1.get(); System.out.println("任务1结果" + integer); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
结果:false时,任务显示取消成功,调用get()会触发异常,但是实际上任务还是会执行完。
1 2 3 4 5 6 7 8 任务是否完成false 任务是否取消成功true 任务是否取消成功true Exception in thread "main " java .util .concurrent .CancellationException at java .util .concurrent .FutureTask .report (FutureTask .java :121) at java .util .concurrent .FutureTask .get (FutureTask .java :192) at threads .MyThreadPoolTest .main (MyThreadPoolTest .java :24) 程序没有结束运行
将传参改为true
执行结果:true时,任务显示取消成功,调用get()会触发异常,但是实际上任务是否继续执行取决于是否响应中断。
1 2 3 4 5 6 7 8 9 任务是否完成false 任务是否取消成功true 249 任务是否取消成功true Exception in thread "main " java .util .concurrent .CancellationException at java .util .concurrent .FutureTask .report (FutureTask .java :121) at java .util .concurrent .FutureTask .get (FutureTask .java :192) at threads .MyThreadPoolTest .main (MyThreadPoolTest .java :24) 程序自动结束运行
线程池执行任务流程
源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
增加线程 addWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
标注1:当线程池是SHUTDOWN以上的状态时不能增加线程;当线程池是SHUTDOWN状态时,不能因为新增task而增加线程,但可以因为queue不为空而增加线程。
标注2:rs < SHUTDOWN
就是RUNNING状态,这里表示只能在RUNNING状态 以及SHUTDOWN且无新增任务 时增加线程数。
工作线程如何工作 工作线程是死循环,自动getTask()来获取任务并执行的,执行过程中不断判断中断状态 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
获取任务:getTask()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
线程销毁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
空闲线程自动销毁:
当getTask()中从queue获取超时就会返回null,然后进入processWorkerExit()来进行空闲线程的销毁。
线程池疑问 1.核心线程是一开始就创建出来的吗?还是逐步创建出来的?逐步创建出来的。
2.核心线程与非核心线程的区别? - 个人感觉是没有区别的,被销毁的话哪个都有可能。
3.创建非核心线程的时机?当核心线程满了,并且任务队列也满的时候才会添加非核心线程。
callable与runnable如何转换的? 1 2 3 4 5 6 7 8 9 10 11 12 public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask<T>(callable); }
线程池与CPU核心数的关系 经验上:N为CPU核心数
如果是CPU密集型应用,线程池大小应该设置为N+1
如果是IO密集型应用,线程池大小设置为2N+1,eg:数据库数据交互、文件上传下载
+1的目的:即使当线程偶尔由于缺失故障或者其他原因而暂停的时候,这个额外的线程也能够确保CPU的时钟周期不会被浪费 。
参考链接:https://blog.csdn.net/u011436427/article/details/103744149
参考链接:
深入理解Java线程池:ThreadPoolExecutor - liuzhihu - 博客园 (cnblogs.com)
https://mp.weixin.qq.com/s/1F_omFouxE9UneSDIrdREg