Java线程池
线程池
什么是 Executor 框架?
Executor 框架,是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。
无限制的创建线程,会引起应用程序内存溢出。所以创建一个线程池是个更好的的解决方案,因为可以限制线程的数量并且可以回收再利用这些线程。利用 Executor 框架,可以非常方便的创建一个线程池。
🦅 为什么使用 Executor 框架?
- 每次执行任务创建线程 new Thread() 比较消耗性能,创建一个线程是比较耗时、耗资源的。
- 调用 new Thread() 创建的线程缺乏管理,被称为野线程,而且可以无限制的创建,线程之间的相互竞争会导致过多占用系统资源而导致系统瘫痪,还有线程之间的频繁交替也会消耗很多系统资源。
- 接使用 new Thread() 启动的线程不利于扩展,比如定时执行、定期执行、定时定期执行、线程中断等都不便实现。
🦅 在 Java 中 Executor 和 Executors 的区别?
- Executors 是 Executor 的工具类,不同方法按照我们的需求创建了不同的线程池,来满足业务的需求。
- Executor 接口对象,能执行我们的线程任务。
- ExecutorService 接口,继承了 Executor 接口,并进行了扩展,提供了更多的方法我们能获得任务执行的状态并且可以获取任务的返回值。
- 使用 ThreadPoolExecutor ,可以创建自定义线程池。
- Future 表示异步计算的结果,他提供了检查计算是否完成的方法,以等待计算的完成,并可以使用 #get() 方法,获取计算的结果。
- ExecutorService 接口,继承了 Executor 接口,并进行了扩展,提供了更多的方法我们能获得任务执行的状态并且可以获取任务的返回值。
讲讲线程池的实现原理
《Java并发编程:线程池的使用》《【死磕 Java 并发】—– J.U.C 之线程池:线程池的基础架构》《【死磕 Java 并发】—– J.U.C 之线程池:ThreadPoolExecutor》《【死磕 Java 并发】—– J.U.C 之线程池:ScheduledThreadPoolExecutor》
创建线程池的几种方式?
Java 类库提供一个灵活的线程池以及一些有用的默认配置,我们可以通过Executors 的静态方法来创建线程池。
Executors 创建的线程池,分成普通任务线程池,和定时任务线程池。
- 普通任务线程池
- 1、#newFixedThreadPool(int nThreads) 方法,创建一个固定长度的线程池。
- 每当提交一个任务就创建一个线程,直到达到线程池的最大数量,这时线程规模将不再变化。
- 当线程发生未预期的错误而结束时,线程池会补充一个新的线程。
- 2、#newCachedThreadPool() 方法,创建一个可缓存的线程池。
- 如果线程池的规模超过了处理需求,将自动回收空闲线程。
- 当需求增加时,则可以自动添加新线程。线程池的规模不存在任何限制。
- 3、#newSingleThreadExecutor() 方法,创建一个单线程的线程池。
- 它创建单个工作线程来执行任务,如果这个线程异常结束,会创建一个新的来替代它。
- 它的特点是,能确保依照任务在队列中的顺序来串行执行。
- 1、#newFixedThreadPool(int nThreads) 方法,创建一个固定长度的线程池。
- 定时任务线程池
- 4、#newScheduledThreadPool(int corePoolSize) 方法,创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似 Timer 。
- 5、#newSingleThreadExecutor() 方法,创建了一个固定长度为 1 的线程池,而且以延迟或定时的方式来执行任务,类似 Timer 。
🦅 如何使用 ThreadPoolExecutor 创建线程池?
Executors 提供了创建线程池的常用模板,实际场景下,我们可能需要自动以更灵活的线程池,此时就需要使用 ThreadPoolExecutor 类。
1
// ThreadPoolExecutor.javapublic ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
- corePoolSize 参数,核心线程数大小,当线程数 < corePoolSize ,会创建线程执行任务。
- maximumPoolSize 参数,最大线程数, 当线程数 >= corePoolSize 的时候,会把任务放入 workQueue 队列中。
- keepAliveTime 参数,保持存活时间,当线程数大于 corePoolSize 的空闲线程能保持的最大时间。
- unit 参数,时间单位。
- workQueue 参数,保存任务的阻塞队列。
- handler 参数,超过阻塞队列的大小时,使用的拒绝策略。
- threadFactory 参数,创建线程的工厂。
🦅 ThreadPoolExecutor 有哪些拒绝策略?
ThreadPoolExecutor 默认有四个拒绝策略:
- ThreadPoolExecutor.AbortPolicy() ,直接抛出异常 RejectedExecutionException 。
- ThreadPoolExecutor.CallerRunsPolicy() ,直接调用 run 方法并且阻塞执行。
- ThreadPoolExecutor.DiscardPolicy() ,直接丢弃后来的任务。
- ThreadPoolExecutor.DiscardOldestPolicy() ,丢弃在队列中队首的任务。
如果我们有需要,可以自己实现 RejectedExecutionHandler 接口,实现自定义的拒绝逻辑。当然,绝大多数是不需要的。
线程池的关闭方式有几种?
ThreadPoolExecutor 提供了两个方法,用于线程池的关闭,分别是:
- #shutdown() 方法,不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
- #shutdownNow() 方法,立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
实际场景下,一般会结合这两个方法,一起实现线程池的优雅关闭。示例代码如下:
1
void shutdownAndAwaitTermination(ExecutorService pool) {pool.shutdown(); // Disable new tasks from being submittedtry { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); }}} catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt();}}
Java 线程池大小为何会大多被设置成 CPU 核心数 +1 ?
详细的可以看看 《如何合理地估算线程池大小?》 。如下是简单的总结和整理:
一般说来,大家认为线程池的大小经验值应该这样设置:(其中 N 为CPU的个数)
如果是 CPU 密集型应用,则线程池大小设置为 N+1
因为 CPU 密集型任务使得 CPU 使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
如果是 IO 密集型应用,则线程池大小设置为 2N+1
IO密 集型任务 CPU 使用率并不高,因此可以让 CPU 在等待 IO 的时候去处理别的任务,充分利用 CPU 时间。
如果是混合型应用,那么分别创建线程池
可以将任务分成 IO 密集型和 CPU 密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
如果一台服务器上只部署这一个应用并且只有这一个线程池,那么这种估算或许合理,具体还需自行测试验证。
但是,IO 优化中,这样的估算公式可能更适合:最佳线程数目 = ((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间 )* CPU 数目因为很显然,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:比如平均每个线程 CPU 运行时间为 0.5s ,而线程等待时间(非 CPU 运行时间,比如 IO)为 1.5s ,CPU 核心数为 8 。那么根据上面这个公式估算得到:((0.5 + 1.5) / 0.5) * 8 = 32。这个公式进一步转化为:最佳线程数目 = (线程等待时间与线程 CPU 时间之比 + 1)* CPU数目。
🦅 线程池容量的动态调整?
ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:
- setCorePoolSize:设置核心池大小。
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小。
当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。
什么是 Callable、Future、FutureTask ?
1)Callable
Callable 接口,类似于 Runnable ,从名字就可以看出来了,但是Runnable 不会返回结果,并且无法抛出返回结果的异常,而 Callable 功能更强大一些,被线程执行后,可以返回值,这个返回值可以被 Future 拿到,也就是说,Future 可以拿到异步执行任务的返回值。
简单来说,可以认为是带有回调的 Runnable 。
2)Future
Future 接口,表示异步任务,是还没有完成的任务给出的未来结果。所以说 Callable 用于产生结果,Future 用于获取结果。
3)FutureTask
在 Java 并发程序中,FutureTask 表示一个可以取消的异步运算。
- 它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成 get 方法将会阻塞。
- 一个 FutureTask 对象,可以对调用了 Callable 和 Runnable 的对象进行包装,由于 FutureTask 也是继承了 Runnable 接口,所以它可以提交给 Executor 来执行。
线程池执行任务的过程?
刚创建时,里面没有线程调用 execute() 方法,添加任务时:
- 如果正在运行的线程数量小于核心参数 corePoolSize ,继续创建线程运行这个任务
- 否则,如果正在运行的线程数量大于或等于 corePoolSize ,将任务加入到阻塞队列中。
- 否则,如果队列已满,同时正在运行的线程数量小于核心参数 maximumPoolSize ,继续创建线程运行这个任务。
- 否则,如果队列已满,同时正在运行的线程数量大于或等于 maximumPoolSize ,根据设置的拒绝策略处理。
- 完成一个任务,继续取下一个任务处理。
- 没有任务继续处理,线程被中断或者线程池被关闭时,线程退出执行,如果线程池被关闭,线程结束。
- 否则,判断线程池正在运行的线程数量是否大于核心线程数,如果是,线程结束,否则线程阻塞。因此线程池任务全部执行完成后,继续留存的线程池大小为 corePoolSize 。
🦅 线程池中 submit 和 execute 方法有什么区别?
两个方法都可以向线程池提交任务。
- #execute(…) 方法,返回类型是 void ,它定义在 Executor 接口中。
- #submit(…) 方法,可以返回持有计算结果的 Future 对象,它定义在 ExecutorService 接口中,它扩展了 Executor 接口,其它线程池类像 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都有这些方法。
🦅 如果你提交任务时,线程池队列已满,这时会发生什么?
艿艿:重点在于线程池的队列是有界还是无界的。
- 如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务。
- 如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy 。
Fork/Join 框架是什么?
艿艿:这是,可能了解的人不多,我也是。大体知道就好。
Oracle 的官方给出的定义是:Fork/Join 框架是一个实现了 ExecutorService接口 的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。
我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架。
- Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。
- 比如计算 1+2+…+10000 ,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。
感兴趣的胖友,可以看看如下文章:
- 《JDK 7 中的 Fork/Join 模式》
- 《聊聊并发(八) —— Fork/Join 框架介绍》
如何让一段程序并发的执行,并最终汇总结果?
- 1、CountDownLatch:允许一个或者多个线程等待前面的一个或多个线程完成,构造一个 CountDownLatch 时指定需要 countDown 的点的数量,每完成一点就 countDown 一下。当所有点都完成,CountDownLatch 的 #await() 就解除阻塞。
- 2、CyclicBarrier:可循环使用的 Barrier ,它的作用是让一组线程到达一个 Barrier 后阻塞,直到所有线程都到达 Barrier 后才能继续执行。
CountDownLatch 的计数值只能使用一次,CyclicBarrier 可以通过使用 reset 重置,还可以指定到达栅栏后优先执行的任务。
- 3、Fork/Join 框架,fork 把大任务分解成多个小任务,然后汇总多个小任务的结果得到最终结果。使用一个双端队列,当线程空闲时从双端队列的另一端领取任务。
