Java并发常见知识点总结(下)
一、ThreadLocal
ThreadLocal 有什么用?
ThreadLocal
是 JDK 提供的一个类,用于在多线程环境中为每个线程提供独立的、私有的变量副本。每个线程都可以有自己独立的 ThreadLocal
变量副本,避免了多个线程访问共享变量时出现的线程安全问题。其主要用途是确保每个线程都能拥有自己的数据副本,而无需担心其他线程的干扰。
工作原理
- 每个线程都有自己的副本:当你使用
ThreadLocal
时,每个线程会从ThreadLocal
中获取自己的副本。即使是同一个ThreadLocal
实例,多个线程访问时会得到各自独立的副本,互不干扰。 - 访问和修改本地副本:线程可以通过
get()
方法获取当前线程的本地副本,通过set()
方法修改该副本的值。
示例代码
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
Runnable task = () -> {
int value = threadLocal.get();
value += 1;
threadLocal.set(value);
System.out.println(Thread.currentThread().getName() + " Value: " + threadLocal.get());
};
Thread thread1 = new Thread(task, "Thread-1");
Thread thread2 = new Thread(task, "Thread-2");
thread1.start(); // 输出: Thread-1 Value: 1
thread2.start(); // 输出: Thread-2 Value: 1
}
}
使用场景
- 数据库连接:在每个线程中存储数据库连接对象,确保每个线程都有独立的连接,避免线程间共享数据库连接造成的冲突。
- 用户会话管理:每个线程可能对应一个用户请求,在
ThreadLocal
中存储当前用户的会话信息,避免多个请求之间的数据冲突。 - 线程池中任务共享状态:在多线程环境下为每个线程创建一个本地副本来保存一些临时数据,确保线程安全。
优点
- 避免线程安全问题:因为每个线程都有独立的副本,所以无需担心线程之间的竞争和同步问题。
- 性能优越:比起使用显式同步机制,
ThreadLocal
在处理线程私有数据时性能更高。
注意事项
- 内存泄漏:如果使用
ThreadLocal
时未清理线程的本地副本,可能会导致内存泄漏。尤其是在使用线程池时,线程对象会被复用,线程池中的线程持有的ThreadLocal
对象可能不会被及时清理。
ThreadLocal 原理
ThreadLocal
是 JDK 提供的一个类,用于在多线程环境中为每个线程提供独立的变量副本。每个线程拥有自己独立的 ThreadLocal
变量,而不会与其他线程共享,从而避免了多线程访问共享数据时的线程安全问题。具体来说,ThreadLocal
通过每个线程独立存储变量的方式,确保不同线程之间的值互不干扰。
核心原理
每个线程一个独立存储:
每个Thread
对象都有一个与之关联的ThreadLocalMap
,用于存储该线程局部变量。ThreadLocalMap
实际上是一个哈希表,ThreadLocal
对象作为 key,存储的值作为 value。这样每个线程就拥有了自己独立的变量副本。线程本地存储的获取与设置:
当线程调用ThreadLocal
的set(T value)
方法时,实际上是把数据存储到该线程的ThreadLocalMap
中,而ThreadLocal
作为 key 存储该值。当线程调用get()
方法时,会从该线程的ThreadLocalMap
中获取相应的值。线程关联的数据存储:
ThreadLocal
通过每个线程的ThreadLocalMap
来存储线程本地变量。每个线程在其生命周期内,ThreadLocalMap
是独立的且不会与其他线程共享。这样可以避免多线程竞争的情况。
内部实现
在每个 Thread
对象中,有两个关键字段:
threadLocals
:指向当前线程的ThreadLocalMap
。inheritableThreadLocals
:如果某个ThreadLocal
对象标记为可继承,那么它将通过该字段传递给子线程。
public class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
关键方法
set(T value)
:- 当线程调用
set()
方法时,会获取当前线程Thread.currentThread()
,然后通过getMap()
方法访问该线程的ThreadLocalMap
。如果该线程还没有ThreadLocalMap
,会调用createMap()
创建一个新的ThreadLocalMap
。
- 当线程调用
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
ThreadLocalMap
:ThreadLocalMap
是一个内部类,用于存储线程本地变量。它将ThreadLocal
对象作为 key,存储的值作为 value。在每个线程内部,只存在一个ThreadLocalMap
,因此该线程中的所有ThreadLocal
变量都共享这个ThreadLocalMap
。
static class ThreadLocalMap {
private Entry[] table;
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> key, Object value) {
super(key);
this.value = value;
}
}
}
- 内存管理:
ThreadLocalMap
中的每个Entry
是ThreadLocal
对象的弱引用,这意味着如果ThreadLocal
对象没有强引用,垃圾回收器可以回收它。当ThreadLocal
对象被回收时,ThreadLocalMap
会清除对应的 entry,避免内存泄漏。
线程本地存储的访问模式
- 每个线程都可以独立访问
ThreadLocal
对象,且每个线程访问的副本互不干扰。 - 当调用
set()
和get()
方法时,ThreadLocal
会通过Thread.currentThread()
获取当前线程,并且访问该线程的ThreadLocalMap
。
应用场景
避免线程安全问题:
ThreadLocal
在需要避免线程间共享数据的场景中非常有用,例如在处理用户会话、数据库连接等时,每个线程维护一个独立的资源副本。减少锁的使用:
在多线程环境中,使用ThreadLocal
可以避免使用同步锁来控制并发访问,因为每个线程有自己独立的数据副本。线程池中:
在线程池中,每个线程复用,但每个线程的ThreadLocal
变量不会被共享,因此非常适合用来存储线程级别的状态。
总结
ThreadLocal
为每个线程提供了独立的存储空间,避免了线程间的共享问题。ThreadLocalMap
存储了ThreadLocal
键值对,每个线程有自己独立的ThreadLocalMap
。- 它通过弱引用机制避免内存泄漏,确保在不再使用的
ThreadLocal
被垃圾回收后清理相关数据。
ThreadLocal 内存泄露问题
ThreadLocal
内存泄漏的根本问题源于其内部数据结构的设计和线程的生命周期管理。理解这一问题需要了解 ThreadLocalMap
的工作机制,尤其是其如何管理 ThreadLocal
实例和它们关联的值。
内存泄漏原因
ThreadLocalMap
的结构:
每个线程都有一个ThreadLocalMap
,它用来存储该线程的局部变量。ThreadLocalMap
内部存储的是ThreadLocal
对象和它对应的值。具体而言,ThreadLocalMap
的 key 是ThreadLocal
实例本身,而 value 是线程局部变量的值。ThreadLocal
的弱引用:ThreadLocalMap
中的ThreadLocal
键是一个弱引用 (WeakReference<ThreadLocal<?>>
)。这意味着当没有其他强引用指向ThreadLocal
实例时,它会被垃圾回收器回收。value
的强引用:ThreadLocalMap
中存储的值是强引用。当ThreadLocal
被垃圾回收时,其对应的value
仍然存在于ThreadLocalMap
中,并没有被自动清理。static class Entry extends WeakReference<ThreadLocal<?>> { Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
这意味着如果
ThreadLocal
实例失去强引用(例如,局部变量超出作用域或被显式置为null
),但线程仍然存活(例如,线程池中的线程),那么ThreadLocalMap
中相应的条目将不会被自动清理。结果,ThreadLocalMap
中的值仍然被强引用,导致内存泄漏。
如何避免内存泄漏
显式调用
remove()
:
为了避免ThreadLocal
引起的内存泄漏,最推荐的做法是在使用完ThreadLocal
后显式地调用remove()
方法。这会从当前线程的ThreadLocalMap
中移除对应的条目,确保即使ThreadLocal
被回收,其相关的值也能被清理。threadLocal.remove();
在一些长时间存活的线程中(如线程池中的线程),即使
ThreadLocal
不再被显式引用,也需要显式调用remove()
方法来避免内存泄漏。使用
try-finally
结构:
在线程池等线程复用的环境中,为了确保remove()
方法能被调用,推荐在使用ThreadLocal
时配合try-finally
块。这可以保证即使发生异常,ThreadLocal
对应的值也能在操作结束后被清理。try { // 使用 threadLocal 的逻辑 } finally { threadLocal.remove(); }
通过这种方式,确保线程池中的线程不会因未清理的
ThreadLocal
条目而导致内存泄漏。避免
ThreadLocal
作为static
变量时的忘记清理:
尽管ThreadLocal
可以作为static
变量使用,但如果没有在每次操作完后显式调用remove()
,就可能造成静态变量持有的ThreadLocal
实例不被回收,从而造成内存泄漏。
总结
ThreadLocal
内存泄漏的主要原因在于其 ThreadLocalMap
存储的 key 是弱引用,而 value 是强引用。线程在存活期间会继续持有这些 value
,即使 ThreadLocal
对象已经被回收。为了避免内存泄漏,在使用完 ThreadLocal
后应当显式调用 remove()
,尤其是在线程池等长时间存活的线程场景中。
跨线程传递 ThreadLocal
的值
在多线程应用中,ThreadLocal
用于保证每个线程有自己的独立副本,避免线程间的数据竞争。然而,ThreadLocal
的局限之一是它不支持在父线程与子线程之间共享值。这在异步任务或者线程池环境中尤为突出,因为新创建的线程无法直接访问父线程的 ThreadLocal
值。为了解决这个问题,提供了几种不同的方案。
1. InheritableThreadLocal
InheritableThreadLocal
是 JDK 1.2 引入的工具类,它继承自 ThreadLocal
,用于在父线程创建子线程时,将父线程的 ThreadLocal
值传递到子线程中。InheritableThreadLocal
解决了线程池场景中常见的问题,即如何传递线程的上下文信息。
原理:
InheritableThreadLocal
通过在Thread
类中新增一个inheritableThreadLocals
变量来实现。这个变量存储了需要传递给子线程的ThreadLocal
值。每当新线程创建时,inheritableThreadLocals
会被复制给子线程。
class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
- 当创建一个新线程时,
Thread
类的构造函数会将父线程的inheritableThreadLocals
传递给子线程,从而实现ThreadLocal
值的传递。
private void init() {
Thread parent = currentThread();
if (parent.inheritableThreadLocals != null) {
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
}
局限性:
InheritableThreadLocal
只适用于直接创建的子线程,对于线程池中的线程,它并不能自动传递父线程的ThreadLocal
值。- 不能完全满足复杂的异步执行场景需求。
2. TransmittableThreadLocal
(TTL)
由于 JDK 中的 InheritableThreadLocal
无法有效支持线程池场景,阿里巴巴开源了 TransmittableThreadLocal
(TTL),它基于装饰器模式增强了 InheritableThreadLocal
的功能,提供了线程池中 ThreadLocal
值的传递。
原理:
TTL 通过以下两种方式增强了 ThreadLocal
的传递:
- 自定义线程:TTL 在内部创建了自定义的线程类,在
run()
方法内手动将父线程的ThreadLocal
值传递给子线程。 - 线程池装饰器:TTL 还提供了装饰器,用于替换线程池中的默认线程类,确保线程池中的所有任务都能够传递
ThreadLocal
的值。
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.0</version>
</dependency>
应用场景:
- 压测流量标记:用于在压测环境中传递流量标记,确保在多线程情况下流量标记能正确地传递到每个任务。
- 分布式上下文传递:在分布式系统中,可以传递链路追踪信息(如 Trace ID)或用户上下文信息。
3. 应用场景
- 异步任务和线程池:当线程池中的任务需要传递某些上下文数据(如用户认证信息、请求 ID 或日志标识符)时,
TransmittableThreadLocal
允许在不同线程之间共享这些信息,而不需要显式地通过参数传递。 - 日志追踪:比如在日志中打印当前请求的
Trace ID
,如果没有合适的机制,可能会丢失关联的上下文信息,而ThreadLocal
或TransmittableThreadLocal
可以帮助在线程池中的任务之间传递该信息。
总结
InheritableThreadLocal
适用于简单的父子线程场景,可以传递父线程的ThreadLocal
值给子线程,但不适用于线程池。TransmittableThreadLocal
通过装饰器模式增强了InheritableThreadLocal
,支持在线程池等复杂场景中传递ThreadLocal
值。
为什么要使用线程池?
线程池是池化技术的应用之一,它主要通过预先创建并重用一定数量的线程来执行任务,从而解决频繁创建和销毁线程带来的性能问题。线程池的好处体现在以下几个方面:
1. 降低资源消耗
创建和销毁线程是资源密集型的操作,尤其在高并发场景下,频繁的线程创建和销毁会显著影响系统性能。使用线程池可以减少这种开销:
- 线程池会在系统启动时创建一定数量的线程,并将这些线程保持在池中,任务执行完毕后线程不会销毁,而是继续等待下一次任务。
- 这样,线程池中的线程可以被多个任务复用,减少了每次创建线程时所需的时间和内存开销。
2. 提高响应速度
当一个新的任务到来时,线程池可以直接从池中获取空闲线程来执行任务,而不需要等待线程创建的过程。这样可以显著提升任务的响应速度,尤其在任务频繁到达的场景下:
- 线程池会预先创建一定数量的线程,因此当任务到达时,可以直接分配空闲的线程,减少了等待时间。
- 如果线程池中的线程都在工作,且没有线程空闲,线程池会根据预设的策略(如拒绝策略)来处理新的任务。
3. 提高线程的可管理性
线程是系统中重要的资源,管理线程池可以有效地控制线程的生命周期,避免线程过多导致资源枯竭和系统不稳定:
- 统一管理:线程池统一管理线程的生命周期,可以更方便地进行调优和监控。
- 任务队列:线程池通常使用任务队列来缓存等待执行的任务,避免瞬间大量任务堆积造成系统崩溃。
- 灵活配置:线程池可以设置核心线程数、最大线程数、任务队列的大小等参数,提供更高的灵活性和可配置性。
4. 避免线程过多造成系统资源枯竭
线程数量的增长直接影响系统的稳定性和性能,过多的线程会导致系统资源(如 CPU 和内存)的竞争,造成资源枯竭和上下文切换的开销。线程池提供了有效的线程控制机制:
- 核心线程数:线程池可以设置一个核心线程数,即使在空闲时也会保持这些线程,避免了线程的频繁创建和销毁。
- 最大线程数:线程池还可以设置最大线程数,确保不会创建过多线程,从而避免系统资源耗尽。
- 任务队列:当线程池中的线程都在工作时,任务会被排队等待执行,而不会因为没有空闲线程而导致任务丢失。
5. 支持任务调度与超时控制
许多线程池实现提供了对任务的调度、执行时间限制和超时控制等功能:
- 调度任务:线程池支持定时任务的调度,可以定期执行某些任务,避免了重复创建线程进行定时任务的麻烦。
- 超时控制:线程池支持对任务的执行时间进行限制,任务超时后可以被取消或者采取其他措施,避免长时间阻塞影响系统其他任务的执行。
总结
使用线程池的主要好处包括:
- 降低资源消耗:复用线程,减少线程创建和销毁的开销。
- 提高响应速度:任务可以直接利用空闲线程,缩短响应时间。
- 提高线程的可管理性:集中管理线程,避免线程泄漏和过多线程导致系统不稳定。
- 避免线程过多造成资源枯竭:通过配置核心线程数和最大线程数来限制线程数量,避免系统资源被过度消耗。
- 支持任务调度与超时控制:线程池可以进行定时任务调度,管理任务的执行时间。
因此,线程池是一个在多线程编程中非常重要且不可或缺的工具,它能够有效提高系统性能、稳定性和可管理性。
如何创建线程池?
Java 提供了两种主要的方式来创建线程池:通过 ThreadPoolExecutor
构造函数创建线程池和通过 Executors
工具类创建线程池。每种方式适用于不同的场景,下面分别介绍这两种方式。
1. 通过 ThreadPoolExecutor
构造函数来创建(推荐)
ThreadPoolExecutor
是 java.util.concurrent
包中最灵活的线程池实现,可以通过构造函数来精确控制线程池的各种参数,如核心线程数、最大线程数、线程空闲时间等。它适用于需要高度定制的线程池场景。
ThreadPoolExecutor
构造函数
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
corePoolSize
:线程池维护的最小线程数,即使线程空闲,也会保留在池中等待任务。maximumPoolSize
:线程池允许的最大线程数,当任务数量过多时,线程池最多会创建这么多线程。keepAliveTime
:当线程池中的线程数超过corePoolSize
时,空闲线程存活的最大时间。当超过该时间后,空闲线程会被终止。unit
:keepAliveTime
参数的时间单位。workQueue
:用于存储等待执行任务的阻塞队列。常用的队列有LinkedBlockingQueue
(无界队列)和ArrayBlockingQueue
(有界队列)。
示例代码:
import java.util.concurrent.*;
public class ThreadPoolExecutorExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // core pool size
4, // maximum pool size
60, // keep alive time
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10) // work queue with capacity
);
for (int i = 0; i < 10; i++) {
executor.execute(new RunnableTask(i));
}
executor.shutdown();
}
static class RunnableTask implements Runnable {
private final int taskId;
public RunnableTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("Task " + taskId + " is executed by " + Thread.currentThread().getName());
}
}
}
ThreadPoolExecutor
提供了最大的灵活性,可以精细地控制线程池的行为,但也需要开发者自己管理线程池的生命周期(例如,调用 shutdown()
来关闭线程池)。
2. 通过 Executors
工具类来创建
Executors
是 Java 提供的一个工具类,提供了多种简单的方式来创建常用类型的线程池。虽然它不如 ThreadPoolExecutor
灵活,但它适用于大多数常见的线程池需求。
常用的线程池类型
FixedThreadPool
:固定大小的线程池,线程池的线程数量始终不变。当任务到达时,如果有空闲线程,则立即执行;如果没有,则任务会被排队等待,直到有线程空闲。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
SingleThreadExecutor
:只有一个线程的线程池,用于保证任务按顺序执行。如果有多个任务提交,它们会被排队,按顺序执行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
CachedThreadPool
:根据需要创建新线程的线程池,如果线程池中有空闲线程,就复用它们;如果没有,就创建新的线程。适用于短时间任务量大、并发线程数不确定的场景。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ScheduledThreadPool
:可以延迟执行或者定期执行任务的线程池,适用于需要定时任务的场景。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
示例代码:
import java.util.concurrent.*;
public class ExecutorsExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
executor.execute(new RunnableTask(i));
}
executor.shutdown();
}
static class RunnableTask implements Runnable {
private final int taskId;
public RunnableTask(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("Task " + taskId + " is executed by " + Thread.currentThread().getName());
}
}
}
总结
ThreadPoolExecutor
提供了更高的灵活性和配置选项,适用于需要精细控制线程池行为的场景。Executors
工具类 提供了简单易用的 API,可以快速创建一些常见类型的线程池,适用于大多数常见需求。
推荐在需要高度定制线程池的情况下使用 ThreadPoolExecutor
,而在简单的任务执行需求下,使用 Executors
工具类可以更方便快捷。
为什么不推荐使用内置线程池?
在 Java 中,通过 Executors
工具类创建的内置线程池是非常常见的,但它们并不总是最好的选择。阿里巴巴 Java 开发手册明确指出,线程资源应该通过自定义的 ThreadPoolExecutor
来提供,而避免使用 Executors
创建的线程池。主要原因如下:
1. Executors
线程池存在潜在的资源问题
Executors
工具类提供了几种常用的线程池实现,但是这些实现隐藏了很多潜在的资源风险,具体如下:
FixedThreadPool
和SingleThreadExecutor
:- 默认使用的队列是
LinkedBlockingQueue
,它是一个无界队列,任务队列的最大长度是Integer.MAX_VALUE
。如果任务提交过多,且线程池的处理速度跟不上任务的提交速度,会导致任务队列堆积,这可能会导致系统内存溢出(OutOfMemoryError, OOM)。
- 默认使用的队列是
CachedThreadPool
:- 使用的是
SynchronousQueue
,这是一个没有容量的队列,每提交一个任务就会创建一个新的线程。如果任务提交速度过快,且线程执行速度慢,会导致大量线程的创建,最终也会引发 OOM。
- 使用的是
ScheduledThreadPool
:- 使用的是无界的延迟队列
DelayedWorkQueue
,与其他线程池类似,若任务堆积,会导致大量任务进入队列,从而可能引起内存泄漏或者 OOM。
- 使用的是无界的延迟队列
这些线程池的行为可能导致系统资源消耗异常,尤其是在高并发环境下。由于 Executors
工具类无法精确控制线程池的队列类型、线程数量等参数,因此使用这些内置的线程池实现往往会造成不可预见的风险。
2. 隐藏的配置风险
使用 Executors
创建线程池时,它将配置的细节隐藏了起来。举个例子,newFixedThreadPool
和 newSingleThreadExecutor
都使用了 LinkedBlockingQueue
队列,并且该队列的长度是无限的(Integer.MAX_VALUE
),这使得我们无法控制队列的容量。而实际生产环境中,有时需要控制线程池的资源消耗,避免任务堆积导致的内存溢出。
推荐的做法是使用 ThreadPoolExecutor
来创建线程池,这样可以更加清晰地定义线程池的核心线程数、最大线程数、任务队列的类型及大小,从而避免潜在的风险。
3. 资源耗尽和“过度切换”的问题
不使用线程池或者使用不当的线程池时,可能会发生以下两种情况:
- 过度创建线程:每次提交任务时都创建新的线程,会导致线程数快速增加,消耗大量系统资源,最终可能会导致 OOM 错误。
- 线程频繁切换:如果没有使用合适的线程池来限制线程数量,线程的频繁创建与销毁会导致频繁的上下文切换,这不仅增加了 CPU 的负担,还可能导致性能下降。
线程池通过重用线程和合理的队列管理,避免了上述问题。它不仅能够减少线程创建与销毁的开销,还能避免系统资源过度消耗和线程切换的性能损失。
4. 为什么推荐 ThreadPoolExecutor
而非 Executors
?
ThreadPoolExecutor
提供了更多的配置选项和灵活性,可以精确地控制线程池的行为,避免 Executors
线程池中的潜在问题。通过 ThreadPoolExecutor
,开发人员可以:
- 控制线程池的 核心线程数、最大线程数 和 空闲线程存活时间。
- 选择 合适的任务队列 类型,避免无界队列带来的风险。
- 在任务队列满时,能够设置 拒绝策略(例如抛出异常、调用者运行等)。
总结
尽管 Executors
提供了快速创建线程池的方法,但它隐藏了线程池的实现细节,可能导致资源管理不当、内存泄漏或 OOM 错误。为了避免这些潜在问题,应该尽量避免使用 Executors
工具类来创建线程池,而是通过 ThreadPoolExecutor
来创建线程池,这样能够让开发人员更明确地管理线程池的配置和资源限制。
⭐️ 线程池常见参数及解释
核心参数:
corePoolSize:线程池的核心线程数,即使这些线程处于空闲状态,线程池也会保持它们的存活。
maximumPoolSize:线程池的最大线程数。当任务队列已满且核心线程数不足时,线程池会创建新线程,直到达到最大线程数。
workQueue:任务队列,用于存放等待执行的任务。当线程池的当前线程数达到核心线程数时,新的任务会被放入此队列。
其他参数:
keepAliveTime:非核心线程空闲时,存活的最长时间。超过此时间,非核心线程会被回收。
unit:
keepAliveTime
的时间单位(如秒、毫秒等)。threadFactory:线程工厂,用于创建新线程。默认情况下,线程池会使用
Executors.defaultThreadFactory()
。handler:拒绝策略。当线程池中的任务超出容量时,执行此策略。常见的策略有:
AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
、DiscardOldestPolicy
。
线程池工作流程:
- 任务到达线程池后,首先检查当前线程池的核心线程数。如果正在运行的线程数小于
corePoolSize
,则直接创建新线程处理任务。 - 如果线程数达到
corePoolSize
,且队列没有满,新任务将被加入队列。 - 当队列满且线程数小于
maximumPoolSize
时,线程池会创建新的非核心线程来处理任务。 - 如果线程数达到
maximumPoolSize
,则拒绝任务并根据handler
执行拒绝策略。
线程池的核心线程会被回收吗?
默认情况下,ThreadPoolExecutor
不会回收核心线程,即使它们处于空闲状态。这是为了避免频繁创建和销毁线程所带来的性能开销,因为核心线程通常设计为长期保持活跃。
但是,如果线程池用于周期性任务,且周期之间有较长空闲时间,可以通过设置 allowCoreThreadTimeOut(true)
来启用核心线程的回收机制。这时,核心线程也会遵循 keepAliveTime
指定的空闲时间回收。
public void allowCoreThreadTimeOut(boolean value) {
// 核心线程的 keepAliveTime 必须大于 0 才能启用超时机制
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 设置 allowCoreThreadTimeOut 的值
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
// 如果启用了超时机制,清理所有空闲的线程,包括核心线程
if (value) {
interruptIdleWorkers();
}
}
}
核心线程空闲时处于什么状态?
设置了核心线程的存活时间:
核心线程空闲时,会处于WAITING
状态,等待任务。如果阻塞时间超过了核心线程的存活时间(keepAliveTime
),该线程会退出并被移除,状态变为TERMINATED
。没有设置核心线程的存活时间:
核心线程空闲时,会一直处于WAITING
状态,等待任务,直到线程池被关闭。
当任务队列中有任务时,线程会被唤醒,状态从 WAITING
转为 RUNNABLE
,然后执行任务。
源码分析
线程池内部使用 Worker
来处理任务。Worker
会不断从任务队列(BlockingQueue
)中获取任务。获取任务的行为由 timed
标记决定:
timed == true
:使用poll()
方法来获取任务。若超时,则线程退出,状态变为TERMINATED
,并从线程池中移除。timed == false
:使用take()
方法阻塞当前线程,直到任务可用。
相关源码
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
timed == true
:如果核心线程设置了存活时间,或者线程数超过了核心线程数,线程使用poll()
获取任务。timed == false
:线程使用take()
,如果队列为空,则线程会一直等待。
这样,getTask()
方法根据配置的线程池参数来灵活管理线程的空闲与回收状态。
⭐️ 线程池的拒绝策略
当线程池中的线程数量已达到最大线程数,并且任务队列也已满时,ThreadPoolExecutor
提供了几种拒绝策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的执行。ThreadPoolExecutor.CallerRunsPolicy
:让调用execute
方法的线程直接运行被拒绝的任务。这种策略会影响性能,因为它可能导致调用线程的阻塞,适用于能够接受延迟的场景。ThreadPoolExecutor.DiscardPolicy
:丢弃新任务,不做任何处理,也不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃任务队列中最早的未处理任务,并尝试提交当前任务。
例子:CallerRunsPolicy
当使用 CallerRunsPolicy
时,任务不会被丢弃或抛出异常,而是交由调用线程(而非线程池中的线程)来执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接由调用者线程执行被拒绝的任务
r.run();
}
}
}
这种策略会减少任务的丢失,但会对调用线程的执行产生影响,因此适用于要求任务必须执行的场景。
如果不允许丢弃任务,应该选择哪个拒绝策略?
应该选择 CallerRunsPolicy
。这个策略会确保任务不会被丢弃,也不会抛出异常。被拒绝的任务将被回退到调用 execute
方法的线程中执行。
CallerRunsPolicy
源码分析:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 只要当前程序没有关闭,就用执行 execute 方法的线程执行该任务
if (!e.isShutdown()) {
r.run();
}
}
}
从源码中可以看出,CallerRunsPolicy
通过调用 r.run()
,将任务交给调用线程来执行,前提是线程池没有关闭。这确保了任务不会丢失,但可能会影响调用线程的性能,因此适用于要求任务必须执行且可以承受延迟的场景。
CallerRunsPolicy
拒绝策略的风险
CallerRunsPolicy
拒绝策略在任务队列已满且线程池已达到最大线程数时,会让调用 execute
方法的线程(通常是主线程)执行被拒绝的任务。这种策略确保任务不丢失,但也带来了风险:
- 主线程阻塞:如果被拒绝的任务是耗时的任务,主线程将会被阻塞,无法继续执行其他操作,可能导致程序性能大幅下降,甚至引发系统的**“阻塞”**问题。
- 后续任务无法执行:如果主线程被阻塞,后续提交的任务需要等待主线程执行完毕后才能继续提交,严重时可能会导致任务积压甚至OOM(内存溢出)。
如何解决?
增加阻塞队列大小和线程池最大线程数:
- 通过增加阻塞队列的大小,可以容纳更多的任务,减少任务被拒绝的可能性。
- 调整线程池的
maximumPoolSize
参数,提高最大线程数,增加并发任务的处理能力,从而减少主线程的阻塞。
这种做法能够有效利用服务器资源,但当资源达到极限时,仍然可能出现瓶颈。
任务持久化:
- 当线程池无法处理新任务时,可以将任务持久化到外部存储(如数据库、Redis、消息队列等)。
- 设计自定义的
RejectedExecutionHandler
,在拒绝任务时,将任务存入数据库或消息队列,并在有空闲线程时,从存储中获取任务继续处理。
示例方案:
- 使用 MySQL 数据库存储任务,将任务暂时保存在数据库中,等线程池有空闲资源时再从数据库中取出任务。
- 可以继承
BlockingQueue
,实现一个混合式阻塞队列,优先从数据库中获取最早的任务。
这种方式能够保证任务不丢失,并且避免了主线程被阻塞的风险。
临时线程处理:
- 类似于 Netty 的做法,可以在拒绝任务时创建临时线程来处理这些任务。
- 这种做法能够确保任务不丢失并尽快得到处理,但需要良好的硬件设备支持,并且临时创建的线程不易管理和监控。
示例:
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // 创建一个临时线程来处理任务 final Thread t = new Thread(r, "Temporary task executor"); t.start(); } catch (Throwable e) { throw new RejectedExecutionException("Failed to start a new thread", e); } } }
限时阻塞等待:
- 类似于 ActiveMQ 的做法,可以在拒绝任务时尝试阻塞等待,尽最大可能将任务入队。
- 这种做法适用于高并发场景,确保任务尽量被执行,但可能会增加系统的延迟。
示例:
new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { // 限时阻塞等待任务入队 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RejectedExecutionException("Interrupted waiting for task"); } } }
总结
CallerRunsPolicy
在保证任务不丢失的同时,会导致主线程被阻塞,影响后续任务的执行。为了解决这一问题,可以通过增加阻塞队列的大小、提升线程池最大线程数、持久化任务或使用临时线程等方式来确保任务的高效执行。
线程池常用的阻塞队列
不同类型的线程池使用不同的阻塞队列来管理任务的排队和处理。常见的阻塞队列包括:
1. LinkedBlockingQueue
(有界阻塞队列)
- 应用场景:
FixedThreadPool
和SingleThreadExecutor
- 特点:
- 容量为
Integer.MAX_VALUE
,理论上队列可以无限增长,只有当内存用尽时才会抛出OutOfMemoryError
。 - 是一个基于链表实现的队列,每次操作(入队、出队)都需要锁定头部或尾部,线程安全。
FixedThreadPool
和SingleThreadExecutor
中的线程数最多为核心线程数,任务队列会用来存放任务。- 适用于任务数量较多,但任务处理较为均衡的场景。
- 容量为
2. SynchronousQueue
(同步队列)
- 应用场景:
CachedThreadPool
- 特点:
- 没有容量,每个插入操作必须等待一个移除操作,反之亦然。即每个提交的任务要么立即由空闲线程执行,要么新建一个线程来执行。
- 不存储任务,适用于任务提交的速度较快,但任务执行时间较短,或者线程池希望动态增加线程数来快速处理任务的场景。
- 由于没有队列容量,
CachedThreadPool
的最大线程数是Integer.MAX_VALUE
,这意味着可能会创建大量线程,从而导致OutOfMemoryError
。
3. DelayedWorkQueue
(延迟队列)
- 应用场景:
ScheduledThreadPool
和SingleThreadScheduledExecutor
- 特点:
- 内部任务会根据延迟时间进行排序,使用“堆”结构来保证每次出队的任务都是当前最早需要执行的任务。
- 队列的扩容是自动的,当队列满时会扩容 50%,最大容量为
Integer.MAX_VALUE
。 - 用于定时任务调度,能够保证任务按照延迟时间顺序执行。
ScheduledThreadPool
和SingleThreadScheduledExecutor
使用该队列来管理定时任务。
4. ArrayBlockingQueue
(有界阻塞队列)
- 应用场景:常用于一般的生产者-消费者场景
- 特点:
- 底层由数组实现,队列的容量在创建时指定,且无法更改。
- 在容量有限的情况下,适用于任务需要被严格控制,防止队列过大导致内存问题。若队列已满,新任务将会被阻塞。
- 适用于控制任务提交的速率或者资源有限时的场景。
总结
FixedThreadPool
和SingleThreadExecutor
使用LinkedBlockingQueue
,适合长期运行的线程池,并能有效避免过多线程的创建。CachedThreadPool
使用SynchronousQueue
,适合处理大量短时间的任务,但可能会导致线程数过多。ScheduledThreadPool
使用DelayedWorkQueue
,适合定时任务,能够按延迟时间顺序执行任务。ArrayBlockingQueue
适用于资源有限且需要控制任务积压的场景,阻塞队列大小固定。
线程池处理任务的流程
线程池在处理任务时,遵循以下流程:
当前运行的线程数小于核心线程数
- 如果当前线程池中正在运行的线程数小于核心线程数,线程池会直接创建一个新线程来执行任务,不会被阻塞。
当前运行的线程数大于等于核心线程数,但小于最大线程数
- 如果线程池的当前运行线程数等于或大于核心线程数,但小于最大线程数,则会把任务提交到任务队列中(如
BlockingQueue
)。任务会在队列中排队等待执行。
- 如果线程池的当前运行线程数等于或大于核心线程数,但小于最大线程数,则会把任务提交到任务队列中(如
任务队列已满,且当前线程数小于最大线程数
- 如果任务队列已满,但当前线程数仍小于最大线程数,则会创建一个新的线程来执行任务。
当前线程数等于最大线程数
- 如果线程池中的线程数已经达到最大线程数,则无法再创建新线程,此时会根据线程池的拒绝策略来处理新任务,调用
RejectedExecutionHandler.rejectedExecution()
方法。拒绝策略可以是:- 抛出异常(
AbortPolicy
)。 - 将任务交给调用者执行(
CallerRunsPolicy
)。 - 丢弃最旧的任务(
DiscardOldestPolicy
)。 - 丢弃当前任务(
DiscardPolicy
)。
- 抛出异常(
- 如果线程池中的线程数已经达到最大线程数,则无法再创建新线程,此时会根据线程池的拒绝策略来处理新任务,调用
线程池的预热
在某些场景中,我们希望在任务提交之前,提前创建好线程池中的核心线程,这样可以避免任务提交时出现等待的情况。ThreadPoolExecutor
提供了以下两个方法来实现线程池的预热:
prestartCoreThread()
:- 该方法会启动一个核心线程,并使其等待任务。如果核心线程已经达到最大值,则该方法返回
false
,否则返回true
。
- 该方法会启动一个核心线程,并使其等待任务。如果核心线程已经达到最大值,则该方法返回
prestartAllCoreThreads()
:- 该方法会启动所有核心线程,并返回启动成功的核心线程数。如果核心线程数已达到最大值,则不会启动更多线程。
这两个方法可以帮助我们在应用启动时提前初始化线程池,减少因首次任务提交而导致的线程创建延迟,提升系统的响应能力。
线程池中线程异常后,销毁还是复用?
在 Java 线程池中,任务执行时出现异常的行为,取决于你是使用 execute()
还是 submit()
提交任务:
使用
execute()
提交任务:- 当任务通过
execute()
提交到线程池,且在执行过程中抛出异常时:- 如果异常没有在任务内部被捕获,异常会导致当前线程终止。
- 线程池会检测到该线程终止,并创建一个新的线程来替代它,从而保持线程池的线程数不变。
- 异常会被打印到控制台或日志中,但不会影响后续任务的执行。
- 当任务通过
使用
submit()
提交任务:- 如果任务是通过
submit()
提交,并且执行过程中抛出异常,异常不会直接打印出来。 - 该异常会被封装在
Future
对象中,当调用Future.get()
方法时,异常会被包装成一个ExecutionException
抛出。 - 在这种情况下,线程池中的线程不会因为异常而终止,而是继续复用,准备执行后续的任务。
- 如果任务是通过
总结
execute()
: 任务抛出未捕获异常时,导致当前线程终止,并且线程池会创建新线程来替代。submit()
: 任务抛出未捕获异常时,异常被封装在Future
中,线程池中的线程继续复用。
这两种方式的设计让线程池在不同场景下有不同的行为:
execute()
适合那些不关心任务执行结果、也不需要处理异常的场景。submit()
提供了更灵活的异常处理机制,允许调用者决定如何处理任务中的异常。
如何给线程池命名?
线程池中的线程命名对于排查问题和日志记录非常有帮助。默认情况下,线程池中的线程名称类似于 pool-1-thread-n
,这类名称没有业务含义,不利于我们定位问题。因此,通常在创建线程池时,会显式地为线程池中的线程命名。
有两种常用的命名方式:
1. 利用 Guava 的 ThreadFactoryBuilder
Guava 提供了一个便捷的 ThreadFactoryBuilder
,可以方便地为线程池中的线程命名,示例如下:
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true)
.build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
在上述代码中,setNameFormat
用来设置线程的名称格式,其中 %d
表示线程池中线程的编号。通过这种方式,你可以轻松地为线程池中的线程添加有意义的命名,方便问题定位。
2. 自定义 ThreadFactory
如果你不想使用 Guava,或者有更复杂的需求,可以自定义一个 ThreadFactory
来为线程池中的线程命名:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final String name;
public NamingThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
在这个实现中,每次创建新线程时,都会根据传入的 name
为线程命名,并且为每个线程分配一个唯一的编号。你可以通过这种方式为线程池中的线程设置任何符合业务需求的名称。
通过这两种方式,你可以为线程池中的线程命名,从而更方便地追踪线程的行为并诊断问题。
如何设定线程池的大小?
线程池的大小是性能调优中的关键因素,设置合适的线程数对于避免资源浪费、提高任务处理效率至关重要。过大或过小的线程池都可能导致问题。下面分析如何设定线程池的大小,以及如何针对不同类型的任务进行调整。
1. 线程池大小的影响
- 线程数过小:会导致任务队列堆积,造成大量任务等待,甚至可能导致内存溢出(OOM)。如果任务队列被填满,且没有更多的线程去执行,系统会出现瓶颈,CPU 资源得不到充分利用。
- 线程数过大:线程数量过多会导致频繁的上下文切换。虽然线程池允许多个线程并行工作,但每次切换线程时都需要保存和加载线程的状态,这会增加 CPU 的负担,从而降低执行效率。
2. 基于任务类型设定线程池大小
线程池大小的合理设定需要依据任务的性质来决定。常见的任务类型可以分为 CPU 密集型任务 和 I/O 密集型任务。
CPU 密集型任务:这类任务主要依赖 CPU 的计算能力,线程的数量应该与 CPU 核心数相匹配,或者稍微多一个线程来避免缺页中断等问题。
- 推荐公式:
线程池大小 = N + 1
,其中N
是 CPU 核心数。 - 示例:如果你的服务器有 4 个 CPU 核心,那么可以配置线程池大小为 4 + 1 = 5。
- 推荐公式:
I/O 密集型任务:这类任务主要涉及 I/O 操作(如网络请求、文件读写等),线程大部分时间是等待 I/O 操作完成,而不是在执行计算任务。在这种情况下,CPU 并不会频繁被占用,线程池的大小可以配置为两倍于 CPU 核心数,以便更好地利用 CPU。
- 推荐公式:
线程池大小 = 2 * N
,其中N
是 CPU 核心数。 - 示例:对于 4 核 CPU 的服务器,可以配置线程池大小为 8。
- 推荐公式:
3. 更精确的线程数计算
一个更精确的线程数计算公式是基于任务的等待时间和计算时间的比值来动态调整线程数:
最佳线程数 = N * (1 + WT / ST)
其中:
N
是 CPU 核心数WT
是线程的等待时间ST
是线程的计算时间
对于 CPU 密集型任务,WT/ST
比例接近于 0,线程池的大小大致为 N
,即 CPU 核心数。
对于 I/O 密集型任务,WT/ST
比例较大,线程池的大小可以设置为 2 * N
。
4. 使用 JDK 工具监控线程池性能
为了更精确地判断任务类型和计算线程池的大小,可以使用 JDK 自带的 VisualVM 工具,它可以帮助你查看线程的等待时间和计算时间的比值 (WT/ST
)。
5. 动态调整线程池大小
对于大多数生产环境,线程池的大小需要根据实际的负载和资源使用情况进行动态调整。有些框架或应用会根据实时监控数据自动调整线程池的大小,以确保资源的最大化利用和系统稳定性。
例如,美团的线程池实现就支持线程池参数的动态配置,可以在运行时调整线程池的核心大小、最大大小、队列长度等参数。这种方式能有效应对负载的波动,保证任务的及时处理。
总结
- CPU 密集型任务:线程池大小设置为 CPU 核心数 + 1。
- I/O 密集型任务:线程池大小设置为 CPU 核心数 * 2。
- 动态调整线程池大小是确保资源利用最大化和避免系统瓶颈的关键。
合理设置线程池大小可以避免不必要的性能损耗,并提高系统的响应效率。
如何动态修改线程池的参数?
在 Java 中,ThreadPoolExecutor
提供了丰富的 API 来修改线程池的核心参数。要实现动态调整线程池参数(如核心线程数、最大线程数和队列长度),需要根据实际需求选择合适的方法和策略。
核心线程数 (corePoolSize
) 和最大线程数 (maximumPoolSize
)
ThreadPoolExecutor
提供了 setCorePoolSize(int corePoolSize)
和 setMaximumPoolSize(int maximumPoolSize)
方法来动态修改这两个参数:
setCorePoolSize(int corePoolSize)
:设置线程池的核心线程数。对于核心线程数,线程池会维持至少这个数量的工作线程,即使在空闲状态下也会存在。修改时,如果当前线程数大于corePoolSize
,多余的线程会被回收。setMaximumPoolSize(int maximumPoolSize)
:设置线程池的最大线程数。在任务队列已满且工作线程数未达到最大线程数时,线程池可以扩展线程数。需要注意,最大线程数的调整是有限制的,超出最大线程数后,任务会被拒绝。
空闲线程的存活时间 (keepAliveTime
)
setKeepAliveTime(long time, TimeUnit unit)
:设置非核心线程的最大空闲时间。当线程池的线程数大于corePoolSize
时,非核心线程会在空闲超过指定时间后被销毁。这个时间设置可以影响线程池的动态扩展和回收效率。
队列 (workQueue
)
ThreadPoolExecutor
默认使用的 BlockingQueue
是不可变的,因此无法直接调整其容量。如果需要动态调整队列容量,可以考虑以下两种方法:
- 自定义队列:实现一个继承自
BlockingQueue
的队列,并提供动态调整容量的功能。通过自定义队列的resize
方法来支持在运行时调整队列的大小。 - 替代队列实现:使用现成的开源队列实现,如美团的
ResizableCapacityLinkedBlockingQueue
,它通过去掉LinkedBlockingQueue
的final
修饰符来实现动态调整队列大小。
示例代码
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(100) // workQueue
);
// 动态修改核心线程数
executor.setCorePoolSize(8);
// 动态修改最大线程数
executor.setMaximumPoolSize(15);
// 动态修改队列容量(通过自定义队列)
ResizableCapacityLinkedBlockingQueue queue = new ResizableCapacityLinkedBlockingQueue(100);
executor.setQueue(queue);
queue.setCapacity(200); // 修改队列容量
结论
要实现动态修改线程池的核心参数,关键是利用 ThreadPoolExecutor
提供的 API 来调整 corePoolSize
、maximumPoolSize
和 keepAliveTime
,以及通过自定义队列来动态管理 workQueue
的容量。对于需要高度灵活配置的场景,可以考虑使用开源工具如 Hippo4j
或 Dynamic TP
,这些工具能够在无需修改代码的情况下,通过配置中心实现线程池的动态调整。
如何设计一个能够根据任务的优先级来执行的线程池?
为了根据任务的优先级来执行任务,可以设计一个线程池,使用支持优先级的阻塞队列,如 PriorityBlockingQueue
。这种队列可以根据任务的优先级来控制任务的执行顺序,通常用于需要处理高优先级任务的场景。
设计思路
选择优先级队列
使用PriorityBlockingQueue
作为任务队列。PriorityBlockingQueue
是无界的阻塞队列,支持按优先级顺序执行任务。底层采用小顶堆(最小堆)实现,优先级最高的任务(值最小的任务)会最先出队。任务优先级定义
提交到线程池的任务必须能够比较优先级,可以通过以下两种方式定义任务的优先级:- 实现
Comparable
接口,重写compareTo
方法,定义任务的优先级。 - 提交任务时,使用
Comparator
对象定义任务之间的排序规则。
- 实现
线程池设计
使用ThreadPoolExecutor
构建线程池,并将PriorityBlockingQueue
传入workQueue
参数。
关键点
线程池核心参数
corePoolSize
:核心线程数maximumPoolSize
:最大线程数workQueue
:使用PriorityBlockingQueue
,确保任务按照优先级执行。
无界队列的风险
PriorityBlockingQueue
是无界的,可能会导致任务堆积,甚至引发内存溢出(OOM)。为了解决这一问题,可以对队列的大小进行限制,通过继承PriorityBlockingQueue
并重写offer
方法来限制入队的元素数量。避免饥饿问题
在无界队列中,低优先级的任务可能长期得不到执行,这种现象被称为“饥饿”。可以通过设置任务的最大等待时间或使用其他策略(如动态提升优先级)来减少饥饿问题。性能问题
使用PriorityBlockingQueue
会导致一定的性能开销,因为需要对队列中的任务进行排序操作。因此,在任务数较多时,性能影响可能比较明显,但在多数业务场景下,这点性能损耗是可以接受的。
示例代码
import java.util.concurrent.*;
import java.util.Comparator;
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority; // 任务优先级
private final String taskName;
public PriorityTask(int priority, String taskName) {
this.priority = priority;
this.taskName = taskName;
}
public int getPriority() {
return priority;
}
@Override
public void run() {
System.out.println("Executing task: " + taskName);
}
@Override
public int compareTo(PriorityTask o) {
// 优先级越低的任务越先执行
return Integer.compare(this.priority, o.priority);
}
@Override
public String toString() {
return taskName + " with priority " + priority;
}
}
public class PriorityThreadPoolExample {
public static void main(String[] args) {
// 定义一个自定义的 Comparator,用来比较任务优先级
Comparator<PriorityTask> comparator = Comparator.comparingInt(PriorityTask::getPriority);
// 使用 PriorityBlockingQueue
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>(10, comparator);
// 创建线程池,使用 PriorityBlockingQueue 作为任务队列
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
queue // 使用 PriorityBlockingQueue
);
// 提交优先级不同的任务
executor.submit(new PriorityTask(3, "Task 1"));
executor.submit(new PriorityTask(1, "Task 2"));
executor.submit(new PriorityTask(5, "Task 3"));
executor.submit(new PriorityTask(2, "Task 4"));
// 等待所有任务完成
executor.shutdown();
}
}
解决方案总结
优先级队列
使用PriorityBlockingQueue
能有效地根据任务的优先级顺序来执行任务。任务排序
任务的优先级可以通过实现Comparable
接口或使用Comparator
进行排序。内存和性能优化
需要防止任务堆积导致的内存溢出,可以限制队列大小或设置最大任务等待时间。性能开销可能不可避免,但大部分场景下是可接受的。饥饿问题解决
可通过任务重新入队并提升优先级等方式来避免饥饿现象。
通过这种设计,可以实现一个根据任务优先级调度任务的线程池,适用于需要严格区分任务优先级的场景。
二、Future
重点是要掌握 CompletableFuture
的使用以及常见知识点。
除了下面的面试题之外,还推荐你看看我写的这篇文章: CompletableFuture 详解。
Future
类有什么用?
Future
类是 Java 中实现异步任务执行的关键工具。它主要用于管理并控制异步执行的任务,允许你在任务执行的同时进行其他操作,并最终获取任务的结果。通过使用 Future
,可以避免程序阻塞在耗时任务上,从而提高程序的执行效率。
核心功能
Future
接口定义了 5 个方法,帮助你管理任务的执行过程:
cancel(boolean mayInterruptIfRunning)
取消任务的执行。如果任务尚未开始执行,或者已经完成,该方法返回false
;如果任务已经开始且可以中断,则返回true
。isCancelled()
检查任务是否已经被取消。isDone()
检查任务是否已经完成,无论是正常完成、被取消还是由于异常结束。get()
获取任务的执行结果。如果任务尚未完成,调用此方法会阻塞直到任务完成并返回结果。get(long timeout, TimeUnit unit)
获取任务的执行结果,但如果在指定的时间内没有完成,会抛出TimeoutException
。
典型应用场景
异步任务执行:
当你有一个耗时的任务(如计算、网络请求等),可以将其交给一个独立的线程去执行,而主线程可以继续做其他事情。通过Future
,你可以在稍后获取任务的结果。任务取消:
可以在任务执行过程中检查其状态,甚至在必要时取消任务。等待任务完成:
你可以让主线程在需要时阻塞,直到任务完成,并返回结果。
示例代码
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交一个任务并返回 Future
Future<Integer> future = executor.submit(() -> {
try {
Thread.sleep(2000); // 模拟耗时任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 123; // 任务返回的结果
});
// 主线程可以继续做其他事情
System.out.println("Doing other work...");
try {
// 获取任务执行结果,若任务还未完成则会阻塞
Integer result = future.get();
System.out.println("Task result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
总结
Future
类是 Java 中的异步任务管理工具,它能帮助你在多线程环境中提交任务、获取结果、取消任务、检查任务状态等。- 它使得主线程可以在执行耗时任务时并行处理其他工作,最终在需要时通过
get()
获取任务的结果。 Future
类适用于大多数异步处理场景,尤其是在需要等待任务执行结果或判断任务是否完成时。
Callable
和 Future
有什么关系?
Callable
和 Future
都是用于多线程编程中异步任务管理的重要接口,它们通常一起使用来实现任务的提交、执行和结果的获取。Callable
和 Future
之间的关系可以通过 FutureTask
来进一步理解。
1. Callable
接口
Callable
是一个可以执行并返回结果的任务接口。与 Runnable
不同的是,Runnable
无法返回结果,而 Callable
在执行完成后能够返回一个结果或者抛出异常。
Callable
的特点:Callable
接口的call()
方法可以返回一个值,或者抛出异常。call()
方法支持抛出异常,这与Runnable.run()
方法不同。Callable
通常被用作长时间运行的任务或需要计算结果的任务。
public interface Callable<V> {
V call() throws Exception;
}
2. Future
接口
Future
接口表示异步计算的结果。它提供了一些方法来检查任务的执行状态、取消任务和获取结果。
Future
的主要功能:cancel(boolean mayInterruptIfRunning)
:取消任务。isCancelled()
:检查任务是否被取消。isDone()
:检查任务是否执行完成。get()
:获取任务的执行结果,阻塞直到任务完成。get(long timeout, TimeUnit unit)
:在指定时间内获取任务结果,如果超时则抛出异常。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3. FutureTask
类
FutureTask
是 Future
接口的一个实现类,既可以执行任务,又可以获取任务的执行结果。它实现了 Runnable
和 Future
接口,能够在执行时管理任务的生命周期。FutureTask
主要用于包装 Callable
或 Runnable
任务,使其既能通过 Executor
提交,也能用于获取任务的结果。
FutureTask
的作用:FutureTask
封装了Callable
或Runnable
任务,允许任务的执行并返回结果。- 它实现了
Runnable
接口,可以被线程池执行。 - 它实现了
Future
接口,可以用于查询任务的状态、取消任务和获取结果。
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
// 使用Callable任务构造
}
public FutureTask(Runnable runnable, V result) {
// 使用Runnable任务构造
}
@Override
public void run() {
// 执行任务
}
@Override
public V get() throws InterruptedException, ExecutionException {
// 获取任务结果
}
}
4. ExecutorService.submit()
和 FutureTask
在 ExecutorService.submit()
方法中,提交的任务可能是 Callable
或 Runnable
,返回值是 Future
类型。而实际返回的 Future
对象,通常是 FutureTask
类的实例。通过 FutureTask
,我们能够异步执行任务,并且在任务完成时获取结果。
ExecutorService executor = Executors.newCachedThreadPool();
Callable<Integer> task = () -> {
// 执行任务并返回结果
return 123;
};
Future<Integer> future = executor.submit(task); // 返回 FutureTask 实例
// 获取结果
Integer result = future.get();
System.out.println("Task result: " + result);
总结
Callable
:定义了任务需要执行的具体逻辑,支持返回结果和抛出异常。Future
:提供任务执行的结果、状态检查、取消功能等。它不能执行任务,而是用来获取任务的执行结果。FutureTask
:实现了Future
和Runnable
接口,它是Future
的实现类,可以执行Callable
任务并在任务完成后获取结果。ExecutorService.submit()
返回的就是一个FutureTask
实例。
通过 FutureTask
,可以将 Callable
任务包装成 Runnable
任务,并通过 Future
接口管理任务的执行,获得结果或处理异常。
CompletableFuture
类有什么用?
CompletableFuture
是 Java 8 引入的一个类,用于解决 Future
的局限性,并且提供了更为强大和灵活的异步编程能力。相比于 Future
,CompletableFuture
不仅支持基本的异步任务执行,还引入了异步任务的组合和流式处理,使得异步编程更加灵活和可扩展。
1. CompletableFuture
和 Future
的区别
Future
的限制:Future
的get()
方法是阻塞调用,这会导致主线程在等待结果时停滞不前。- 不支持异步任务的组合和处理,无法链式调用多个异步任务。
CompletableFuture
的优势:CompletableFuture
不仅实现了Future
接口,还实现了CompletionStage
接口,这使得它支持异步任务的组合和链式调用。- 它支持回调处理和任务的自动完成,能够有效避免阻塞,提供更丰富的异步编程能力。
2. CompletableFuture
的关键特性
- 支持异步任务组合:通过链式调用,可以轻松地将多个异步任务串联起来,按需组合和处理。
- 不需要手动调用
get()
:可以通过回调函数(如thenApply
、thenAccept
等)在任务完成后自动处理结果,避免了阻塞式的get()
调用。 - 支持并行计算:多个异步任务可以并行执行,并且可以组合成一个大任务。
- 异常处理:提供了处理异步任务异常的能力,如
exceptionally
方法。
3. CompletableFuture
的常用方法
supplyAsync()
:异步执行一个返回值的任务。runAsync()
:异步执行一个没有返回值的任务。thenApply()
:在任务完成后对结果进行处理,返回一个新的CompletableFuture
。thenAccept()
:在任务完成后处理结果,但不返回新值。thenCombine()
:组合两个异步任务的结果,生成一个新的结果。exceptionally()
:处理异步任务发生异常时的情况。
4. 使用示例
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 异步任务 1
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task 1 running in: " + Thread.currentThread().getName());
return 100;
});
// 异步任务 2
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task 2 running in: " + Thread.currentThread().getName());
return 200;
});
// 组合异步任务
CompletableFuture<Integer> result = future1.thenCombine(future2, (res1, res2) -> {
System.out.println("Combining results in: " + Thread.currentThread().getName());
return res1 + res2;
});
// 处理结果
result.thenAccept(res -> System.out.println("Final result: " + res));
// 等待异步任务完成
result.join();
}
}
5. 执行流程解析
future1
和future2
是两个独立的异步任务,它们会在不同的线程中并行执行。thenCombine()
用于将两个异步任务的结果合并。只有在future1
和future2
都执行完毕后,thenCombine
才会执行。thenAccept()
用于处理最终的合并结果,输出最终的结果。join()
用来确保主线程等待异步任务完成。
6. 异常处理
CompletableFuture
提供了优雅的异常处理机制,通过 exceptionally()
可以捕获并处理异步任务中发生的异常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong");
}).exceptionally(ex -> {
System.out.println("Exception occurred: " + ex.getMessage());
return 0; // Provide default value in case of error
});
future.thenAccept(result -> System.out.println("Result: " + result));
总结
CompletableFuture
使得异步编程变得更加灵活,提供了比Future
更强大的功能,如任务链式调用、异步任务组合、异常处理等。CompletionStage
接口 赋予了CompletableFuture
这种流式编程的能力,使得多个异步任务可以像流水线一样处理。- 函数式编程支持 使得异步任务的编排和组合变得更加简洁和优雅。
通过 CompletableFuture
,你可以更高效地处理并发任务,并将多个异步操作组合成一个完整的任务流程,极大提升了多线程编程的可读性和可维护性。
任务依赖编排设计:T3 依赖 T1 和 T2 完成后执行
这个任务编排场景可以通过 CompletableFuture
来轻松实现。通过 allOf()
方法,我们可以等待多个任务完成后,再执行依赖的任务。
设计思路:
- T1 和 T2 并行执行:首先创建两个独立的任务
T1
和T2
,它们各自执行自己的操作。 - 等待 T1 和 T2 完成:使用
CompletableFuture.allOf()
方法来将这两个任务组合起来,确保 T1 和 T2 都执行完毕后再触发后续任务。 - T3 执行:在 T1 和 T2 都完成后,通过
thenRunAsync()
方法执行 T3。
代码实现:
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.CompletableFuture;
public class TaskDependencyExample {
public static void main(String[] args) {
// T1
CompletableFuture<Void> futureT1 = CompletableFuture.runAsync(() -> {
System.out.println("T1 is executing. Current time:" + DateUtil.now());
// 模拟耗时操作
ThreadUtil.sleep(1000);
});
// T2
CompletableFuture<Void> futureT2 = CompletableFuture.runAsync(() -> {
System.out.println("T2 is executing. Current time:" + DateUtil.now());
ThreadUtil.sleep(1000);
});
// 使用 allOf() 方法合并 T1 和 T2,等待它们都完成
CompletableFuture<Void> bothCompleted = CompletableFuture.allOf(futureT1, futureT2);
// 当 T1 和 T2 都完成后,执行 T3
bothCompleted.thenRunAsync(() -> System.out.println("T3 is executing after T1 and T2 have completed. Current time:" + DateUtil.now()));
// 等待所有任务完成,验证效果
ThreadUtil.sleep(3000);
}
}
关键点解析:
runAsync()
:用于创建不返回结果的异步任务,适用于任务没有返回值的场景。allOf()
:这是一个静态方法,它接受多个CompletableFuture
对象,并返回一个新的CompletableFuture
,该CompletableFuture
只有在所有传入的CompletableFuture
完成时才完成。thenRunAsync()
:在所有任务完成后执行T3
,且thenRunAsync()
也支持异步执行,可以指定线程池进行执行。
执行过程:
T1
和T2
将并行执行,模拟一个耗时操作(通过ThreadUtil.sleep()
模拟)。- 当
T1
和T2
都完成后,thenRunAsync()
会被触发,开始执行T3
。 - 主线程通过
ThreadUtil.sleep()
等待所有任务完成,以便观察最终结果。
适用场景:
- 任务依赖关系:当任务 A 和任务 B 完成之后,需要执行任务 C 的场景。
- 并行执行:T1 和 T2 可以并行运行,确保并行执行时的依赖性逻辑。
这种设计使得异步任务的依赖关系变得清晰,并且能够高效地管理多个并行任务的执行顺序。
使用 CompletableFuture
处理异常
CompletableFuture
提供了多种方法来处理任务中的异常,确保异常不被吞噬或丢失。下面是几种常见的异常处理方式:
1. whenComplete()
方法
whenComplete()
方法允许你在任务完成时执行回调,并且可以处理任务成功或失败的情况。该方法返回一个新的 CompletableFuture
,用于继续执行后续任务。
- 优点:可以处理任务执行后的异常。
- 缺点:无法重新抛出异常或继续传播。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Task failed");
return 1;
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Exception occurred: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
2. exceptionally()
方法
exceptionally()
方法用于在异常发生时处理异常,并可以返回一个默认值,防止任务失败时直接终止。它不会将异常传播到后续的任务,但可以为失败的任务提供一个替代结果。
- 优点:可以在任务失败时提供备用结果。
- 缺点:无法让异常继续传播给后续任务。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Task failed");
return 1;
});
future.exceptionally(ex -> {
System.out.println("Exception occurred: " + ex.getMessage());
return -1; // 返回默认值
}).thenAccept(result -> System.out.println("Result: " + result));
3. handle()
方法
handle()
方法不仅可以处理正常结果,还可以处理异常。它允许你根据任务执行结果(正常或异常)返回一个新的结果,这样你可以更精细地控制异常的处理。
- 优点:能够同时处理正常结果和异常,并返回一个新的结果。
- 缺点:处理逻辑会比较复杂。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Task failed");
return 1;
});
future.handle((result, ex) -> {
if (ex != null) {
System.out.println("Exception occurred: " + ex.getMessage());
return -1; // 处理异常并返回默认值
}
return result;
}).thenAccept(result -> System.out.println("Result: " + result));
4. allOf()
组合多个任务
当你有多个异步任务时,可以使用 CompletableFuture.allOf()
来组合多个任务,并统一处理所有任务的异常。这种方式可以避免在多个任务中重复编写异常处理代码。
- 优点:适用于多个任务的组合,能够统一处理异常。
- 缺点:所有任务都必须完成后,才能执行异常处理。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Task failed");
return 2;
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
allOf.thenRun(() -> {
try {
future1.get();
future2.get();
} catch (Exception e) {
System.out.println("Exception occurred in one of the tasks: " + e.getMessage());
}
});
5. anyOf()
处理部分任务失败
如果你有多个异步任务,并希望只要一个任务成功就继续执行后续任务,anyOf()
方法非常适合这种场景。你可以统一处理一个任务失败的异常,并继续处理其他任务。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Task failed");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 2;
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
anyOf.thenAccept(result -> {
System.out.println("First completed task result: " + result);
}).exceptionally(ex -> {
System.out.println("Exception occurred: " + ex.getMessage());
return null;
});
小结
whenComplete()
:适用于无论任务成功还是失败,都需要执行回调的场景。exceptionally()
:适用于任务失败时提供备用值的场景。handle()
:适用于处理任务的正常结果和异常,并返回新的结果的场景。allOf()
:适用于多个任务并行时统一处理异常。anyOf()
:适用于任意任务完成后继续执行后续操作,并处理任务失败异常。
根据不同的应用场景选择合适的异常处理方法,以保证异步任务的健壮性和可控性。
为什么要为 CompletableFuture
自定义线程池?
在使用 CompletableFuture
时,默认情况下,它会使用 ForkJoinPool.commonPool()
作为线程池。如果不特别指定线程池,所有的异步任务会共享这个全局线程池。虽然 ForkJoinPool
在并行计算时具有较高的效率,但在以下几种场景中,使用自定义线程池更加合理:
1. 避免资源竞争和线程饥饿
ForkJoinPool.commonPool()
是一个全局共享的线程池,当系统中有多个依赖 CompletableFuture
的组件(如框架、库或多个应用程序模块)时,所有任务都会争夺这个共享线程池的资源。如果大量任务同时提交,线程池可能会耗尽,导致任务无法及时执行,严重时会导致线程饥饿、系统性能下降或死锁。
自定义线程池可以将不同的任务分配到不同的线程池中,避免资源争夺,从而确保任务能够高效执行。
2. 任务特性隔离
不同的任务可能有不同的执行特性。例如,有些任务可能是 I/O 密集型的,而其他任务可能是 CPU 密集型的。如果所有的任务都共享一个线程池,可能会导致某一类任务占用过多资源,影响其他任务的执行。
通过自定义线程池,可以根据任务的特性(如 CPU 密集型、I/O 密集型)配置合适的线程池大小、队列类型等,从而优化性能和资源利用率。
3. 线程池的灵活控制
默认的 ForkJoinPool
提供了一些基本的配置选项,但它无法满足某些复杂的需求。自定义线程池可以帮助你:
- 控制线程池的核心和最大线程数。
- 设置队列类型,如
LinkedBlockingQueue
或SynchronousQueue
,以满足任务处理的需求。 - 配置线程池的空闲时间、最大队列长度等参数,从而保证线程池的高效性。
4. 异常处理
ForkJoinPool
使用的是默认的 ThreadFactory
,该工厂无法处理线程中的异常。自定义线程池可以提供一个更有针对性的 ThreadFactory
,通过重写 uncaughtException
方法处理线程中的未捕获异常。这可以避免任务执行中的异常被吞噬,导致潜在的问题难以发现和调试。
5. 更好的可控性
通过自定义线程池,你可以为特定任务配置不同的执行器,以确保某些高优先级任务或低优先级任务得到合适的资源分配,从而使得任务的执行更加有序和高效。
如何为 CompletableFuture
设置自定义线程池?
通过 CompletableFuture.runAsync()
和 CompletableFuture.supplyAsync()
方法,你可以为每个异步任务指定一个自定义的线程池。以下是设置自定义线程池的示例代码:
// 创建一个自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程的存活时间
new LinkedBlockingQueue<Runnable>(100) // 队列大小
);
// 使用自定义线程池执行任务
CompletableFuture.runAsync(() -> {
System.out.println("Task is executing.");
}, executor);
// 使用自定义线程池执行有返回值的任务
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 42; // 执行某个耗时操作
}, executor);
// 处理返回结果
future.thenAccept(result -> {
System.out.println("Result: " + result);
});
小结
自定义线程池可以为 CompletableFuture
提供更高的灵活性和控制能力,避免了共享全局线程池带来的性能问题。通过控制线程池的大小、队列类型、异常处理等,可以更好地优化任务的执行,并确保系统的高效性和健壮性。
三、AQS
关于 AQS 源码的详细分析,可以看看这一篇文章:AQS 详解。
AQS 是什么?
AQS
(AbstractQueuedSynchronizer
)是 Java 并发包中的一个核心类,从 JDK 1.5 开始引入。它是用于构建同步器的基础框架。通过 AQS
,Java 提供了一些常见的同步工具,如 ReentrantLock
、Semaphore
、CountDownLatch
等,开发者可以在这些工具的基础上,复用和扩展同步原理,而不需要重新实现底层的线程管理机制。
AQS 的作用
AQS
通过提供一个标准的框架,简化了同步器的开发。它负责管理线程的排队、资源的获取与释放,并提供了以下核心功能:
- 线程排队:
AQS
通过一个队列来管理等待的线程,确保多个线程的执行顺序。 - 共享/独占锁的支持:它支持基于不同策略(共享锁或独占锁)进行同步。通过继承
AQS
,可以实现各种同步器(如锁、信号量等)。 - 状态管理:
AQS
维护一个整数型的状态值,线程通过改变该状态来控制同步器的行为。 - 锁的获取和释放:
AQS
提供了获取锁和释放锁的通用流程,子类只需实现特定的获取和释放逻辑。
AQS 的工作原理
AQS
内部通过一个 FIFO 队列 管理线程,队列中的线程根据锁的状态决定其是否可以执行。具体的工作流程如下:
获取锁:
- 当线程请求锁时,
AQS
会检查当前的锁状态。如果锁没有被占用,线程会成功获取锁并更新状态。 - 如果锁已经被占用,线程会被放入等待队列,直到锁被释放。
- 当线程请求锁时,
释放锁:
- 当线程完成任务并释放锁时,
AQS
会尝试唤醒等待队列中的下一个线程,使其有机会获取锁。
- 当线程完成任务并释放锁时,
队列管理:
- 等待的线程是按队列顺序排队的。
AQS
会通过一个双向队列来管理这些线程,保证公平性(如果同步器是公平的),或者不公平地唤醒线程。
- 等待的线程是按队列顺序排队的。
状态管理:
AQS
通过一个状态值(state
)来控制同步器的状态。具体同步器可以根据需要定义状态的不同值来代表不同的状态,例如锁的占用情况或信号量的可用资源数。
AQS 主要的实现类
ReentrantLock
:支持可重入的互斥锁,允许同一个线程多次获取锁。Semaphore
:信号量控制并发访问的数量,允许一定数量的线程访问某个资源。CountDownLatch
:倒计时器,允许一个或多个线程等待,直到某些操作完成。CyclicBarrier
:一个同步屏障,用于使多个线程等待,直到所有线程都达到屏障点。
AQS 的核心方法
AQS
提供了一些核心方法来实现同步器的功能,开发者在子类中可以通过重写这些方法来实现特定的同步行为。主要的核心方法包括:
acquire(int arg)
:尝试获取锁或资源。release(int arg)
:释放锁或资源。tryAcquire(int arg)
:尝试获取锁,如果获取成功,返回true
。tryRelease(int arg)
:尝试释放锁,如果成功释放,返回true
。hasQueuedThreads()
:判断是否有线程在等待队列中。getState()
:获取当前同步器的状态值。
AQS 和具体同步器的关系
AQS
提供了一个模板方法模式,开发者通过继承 AQS
并实现其抽象方法,来构建具体的同步器。例如:
- 在
ReentrantLock
中,AQS
管理了锁的状态,tryAcquire
和tryRelease
方法实现了锁的获取和释放逻辑。 - 在
CountDownLatch
中,AQS
管理了倒计时状态,countDown
和await
方法分别用于倒计时和等待。
总结
AQS
是一个强大的工具,它简化了同步器的实现,解决了线程同步中的复杂性问题。通过它提供的通用框架,开发者可以更容易地实现和定制自己的同步器。
AQS 的原理是什么?
AQS
(AbstractQueuedSynchronizer)的核心原理是通过 CLH 锁(Craig, Landin, and Hagersten Locks)的改进实现线程的同步与排队。它提供了一个通用的框架来管理多个线程的资源竞争,特别适用于实现多种同步工具,比如 可重入锁、信号量、倒计时器等。
1. CLH 锁及其优化
- CLH 锁 是一种基于单链表的自旋锁。在传统的自旋锁中,线程通过不断检查锁的状态来决定是否获取锁,而这种方式会导致过多的 CPU 占用。
- CLH 锁的改进:CLH 锁在自旋锁基础上做了优化,使用了一个单向链表来维护线程的排队。当线程请求锁时,会将其添加到队列中,并通过自旋不断检查前一个节点的状态,直到前一个线程释放锁,当前线程才能获取锁。
2. AQS 中的 CLH 变体队列
AQS
在 CLH 锁的基础上进一步改进,使用了 双向队列 和 自旋 + 阻塞 的方式。具体来说:
- 自旋 + 阻塞:线程会先尝试自旋获取锁,若自旋失败,再进入阻塞状态。这样既能保持高效的性能,又避免了过多的 CPU 占用。
- 双向队列:
AQS
中的线程队列使用了双向链表。每个线程(节点)包含:thread
:指向当前线程的引用。waitStatus
:表示线程的状态,如等待、已取消等。prev
:指向前驱节点。next
:指向后继节点。
双向队列使得线程在阻塞状态下,能够在释放锁时通过 next
指针唤醒后继线程,确保线程的顺序执行。
3. AQS 中的资源管理(state)
AQS
使用一个 state
变量来表示同步状态,通常是一个 int
类型的变量,表示锁的状态。该变量的状态值会影响资源的分配。
state
通过volatile
关键字修饰,确保线程之间的可见性。state
变量可以通过三个方法来操作:getState()
:获取当前的同步状态。setState(int newState)
:设置同步状态。compareAndSetState(int expect, int update)
:原子操作,通过 CAS(Compare-And-Swap)来更新状态。
这些方法通过 unsafe.compareAndSwapInt
操作确保线程安全,避免了同步锁的开销。
4. AQS 资源分配和线程排队
在 AQS 中,线程的获取和释放资源的过程由 CLH 变体队列来管理。线程通过 自旋 或 阻塞 的方式排队,直到它们能够获取到资源。
- 当线程尝试获取资源时,如果当前资源空闲,则立即获取。如果资源被占用,则将线程加入到等待队列中。
- 线程一旦获取资源成功,
state
值会发生变化,同时队列中的线程会被唤醒并继续尝试获取资源。 - 线程释放资源时,
state
值会更新,并唤醒排队的线程。
5. ReentrantLock
和 CountDownLatch
示例
ReentrantLock
:ReentrantLock
使用 AQS 的state
来表示锁的状态。当state
为 0 时表示未锁定,线程通过调用tryAcquire()
来尝试获取锁。如果state
大于 0,其他线程将被排队等待。CountDownLatch
:在CountDownLatch
中,state
初始值为计数值 N,表示等待的子任务数量。每当一个子任务完成时,通过countDown()
方法将state
减 1。当state
值减到 0 时,主线程将被唤醒,继续执行。
总结
AQS
的核心原理是通过 CLH 锁的变体队列 和 状态管理 机制,来实现线程的同步和资源的排队。它通过 自旋 + 阻塞 的方式高效地管理线程的等待和唤醒,确保线程的顺序执行。同时,通过 state
变量和 CAS 操作来控制同步器的状态,使得实现同步工具变得更加灵活和高效。
Semaphore 有什么用?
Semaphore
(信号量)是 Java 中一种用于控制并发访问资源数量的同步工具类。与 synchronized
和 ReentrantLock
的“互斥锁”机制不同,Semaphore
用来控制同时访问某个特定资源的线程数量,可以实现对多个线程并发访问资源的管理。
1. 基本概念
Semaphore
维护了一个许可集,每个许可表示一个可用的资源。当线程请求访问一个共享资源时,它会尝试获取一个许可:
- 如果有可用许可,线程就获取一个许可并继续执行。
- 如果没有可用许可,线程就会被阻塞,直到有许可可用。
当线程完成资源的使用后,它会释放所持有的许可,其他阻塞的线程才能获得许可并继续执行。
// 创建一个信号量,初始许可数为 5
final Semaphore semaphore = new Semaphore(5);
// 获取一个许可
semaphore.acquire();
// 释放一个许可
semaphore.release();
2. Semaphore 的两种模式
Semaphore
提供了两种工作模式:公平模式和非公平模式。
- 公平模式:线程按调用
acquire()
方法的顺序来获取许可,遵循 FIFO(先进先出)规则。 - 非公平模式:线程并不按顺序获取许可,后调用
acquire()
的线程可能会抢先获取许可。
构造方法如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- 默认情况下,
Semaphore
是非公平模式的。如果需要公平模式,可以通过第二个构造方法设置。
3. 应用场景
Semaphore
适用于资源数量有限、需要限制并发访问的场景。典型的应用场景包括:
- 限制并发请求数:控制同时并发的线程数,例如限制一个服务的最大并发数。
- 资源池管理:控制对连接池、线程池、数据库连接等资源的并发访问。
- 限流:限制流量,比如在 API 请求中控制请求的并发数,防止服务端过载。
例如,在一个 API 服务中,可以限制同时只能有 5 个请求处理:
final Semaphore semaphore = new Semaphore(5); // 限制最大并发数为 5
// 每个线程执行前先获取许可
semaphore.acquire();
// 执行任务
// 执行完后释放许可
semaphore.release();
4. 如何工作
acquire()
:当调用acquire()
方法时,线程尝试获取一个许可。如果许可可用,线程就会成功获取。如果许可不可用,线程将被阻塞,直到有许可释放。release()
:释放一个许可。如果有其他线程被阻塞等待许可,它们中的一个将会被唤醒。
5. 与锁的对比
Semaphore
控制的是资源访问的线程数量,而不是单一资源的互斥访问。- 如果许可数设置为 1,
Semaphore
就变成了排他锁,相当于ReentrantLock
或synchronized
。
总结
Semaphore
是一个非常有用的并发工具类,适用于对资源进行数量控制的场景。它支持多线程并发访问控制,可以确保在指定的并发数量限制内访问资源。可以用于流量控制、数据库连接池、线程池管理等场景,提供了一种简单而有效的并发控制方式。
Semaphore 的原理是什么?
Semaphore
基于 AQS(AbstractQueuedSynchronizer)实现,主要通过 共享锁 模式控制并发资源的访问。它的核心思想是通过维护一个许可证计数器 state
来决定允许多少个线程同时访问共享资源,超过许可数量的线程会被阻塞。具体实现中,state
表示可用的许可证数量,线程通过获取和释放许可证来控制并发访问。
1. 获取许可证(acquire)
当调用 acquire()
方法时,线程会尝试获取一个许可证。AQS 提供了 acquireShared()
方法来在共享模式下获取许可证:
- 成功获取:如果
state >= 0
,表示有足够的许可证可用,线程就会成功获取一个许可证,并通过 CAS 操作将state
减 1。 - 未能获取:如果
state < 0
,表示许可证不足,当前线程会被挂起并加入阻塞队列,直到许可证被释放。
具体实现如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,失败则挂起线程
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在 acquireShared()
中:
tryAcquireShared()
会尝试获取共享锁。如果许可证数大于等于请求数,它就会减少许可证数,并返回>= 0
。- 如果失败(即许可证不足),则会调用
doAcquireSharedInterruptibly()
,将当前线程加入等待队列并挂起,等待许可证的释放。
2. 释放许可证(release)
当调用 release()
方法时,线程会释放一个许可证,许可证的数量增加。AQS 提供了 releaseShared()
方法来在共享模式下释放许可证:
- 释放成功:当许可证被成功释放时,
state
值加 1。 - 唤醒阻塞线程:释放许可证后,如果有其他线程被阻塞在等待队列中,它们会被唤醒并重新尝试获取许可证。
具体实现如下:
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在 releaseShared()
中:
tryReleaseShared()
会尝试更新state
(即许可证数量),如果释放成功,它会调用doReleaseShared()
来唤醒队列中的一个线程。
3. 内部实现细节
state
用于表示可用的许可证数量,state >= 0
表示有可用的资源,state < 0
表示需要等待。- CAS 操作:AQS 中的
state
使用CAS
(Compare and Swap)操作来确保线程安全,避免竞争条件。 - 队列管理:当线程请求资源失败时,它会被加入到等待队列中,并会被阻塞。等待线程通过
CLH
锁队列(或其变体)来管理线程的顺序,保证先请求的线程能先被唤醒。
总结
Semaphore
通过共享锁模式实现了对资源的并发控制,它不仅支持并发数的限制,还能控制线程访问顺序。- 通过
acquire()
和release()
方法,线程可以在有限的许可数下进行并发访问,超出许可数的线程会被挂起并等待许可。 - 内部通过 AQS 的
state
和阻塞队列来协调线程的竞争与调度。
CountDownLatch 有什么用?
CountDownLatch
是一个同步工具类,它用于控制一个或多个线程等待其他线程完成一项任务之后再继续执行。通常情况下,CountDownLatch
用于等待某些操作完成(例如并行任务完成),直到计数器变为零,所有等待的线程才能继续执行。
- 工作原理:
CountDownLatch
会持有一个计数器,初始值通过构造函数传入。每当一个线程完成任务时,它会调用countDown()
方法将计数器减 1。等待线程调用await()
方法,直到计数器的值减至 0 时,所有等待的线程才会被唤醒并继续执行。
使用场景
- 等待多个线程执行完:例如,你有多个线程执行并行任务,在主线程中等待这些任务完成,只有当所有线程都完成时才继续后续工作。
- 实现启动前的同步:主线程可以在
CountDownLatch
上调用await()
等待多个子线程完成一些初始化任务,然后再继续主线程的执行。
示例代码
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 CountDownLatch,初始计数器为 3
CountDownLatch latch = new CountDownLatch(3);
// 创建并启动 3 个线程,每个线程执行一个任务
for (int i = 0; i < 3; i++) {
new Thread(new Task(latch), "Thread-" + i).start();
}
// 主线程等待,直到计数器变为 0
latch.await();
// 所有任务完成后,主线程继续执行
System.out.println("All tasks completed. Main thread proceeding...");
}
}
class Task implements Runnable {
private final CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// 模拟任务执行
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " completed.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 每个线程完成后调用 countDown() 减少计数器
latch.countDown();
}
}
}
主要方法
await()
:使当前线程等待,直到计数器的值为 0。如果计数器的值已为 0,线程立即返回。可以抛出InterruptedException
异常。countDown()
:每当某个线程完成任务时调用,计数器的值减 1。如果计数器的值为 0,则会唤醒所有等待的线程。构造方法:
CountDownLatch(int count)
初始化计数器的值,count
是计数器的初始值,通常为你希望等待的线程数。
注意事项
CountDownLatch
是一次性的,计数器值一旦减为 0 后,不能重新设置或重用。如果再次调用await()
方法,会立即返回。- 它适用于控制一组线程等待其他线程完成任务的场景,不适合用于需要重复计数器的场景(例如
CyclicBarrier
更适合这种情况)。
总结
CountDownLatch
是一个非常实用的同步工具类,特别适用于等待多个线程完成某些操作后再执行后续任务的场景。它通过简单的计数器和线程同步机制,让并发编程变得更加清晰和易于管理。
CountDownLatch 的原理是什么?
CountDownLatch
是基于 AQS (AbstractQueuedSynchronizer) 实现的,它利用 AQS 提供的共享锁机制来控制线程的同步。CountDownLatch
的核心原理是通过一个计数器(state
)来控制线程的阻塞与唤醒。其基本的同步控制是依赖 state
的值变化来协调多个线程。
工作流程:
初始化:在创建
CountDownLatch
时,传入一个初始计数值count
,这个值通常代表需要等待的线程数量。state
的值初始化为count
。await()
方法:- 当线程调用
await()
方法时,state
会被检查。如果state
为 0,说明任务已经完成,线程将立即返回并继续执行后续代码。 - 如果
state
大于 0,说明还有线程未完成,调用await()
的线程会被阻塞。此时,线程会进入 AQS 的等待队列,直到state
的值减少到 0。
- 当线程调用
countDown()
方法:- 每当一个线程完成任务后,它会调用
countDown()
方法来减少state
的值。countDown()
实际上是通过tryReleaseShared()
方法来执行的,通过 CAS 操作确保原子性地将state
减 1。 - 当
state
值减为 0 时,所有等待在await()
方法上的线程都会被唤醒。
- 每当一个线程完成任务后,它会调用
AQS 操作:
CountDownLatch
的state
是通过AQS
的state
字段进行管理的,这个字段会在多个线程之间同步更新。countDown()
方法内部调用了tryReleaseShared()
来更新state
,如果state
值减少到 0,AQS
会唤醒所有等待的线程。await()
方法内部调用tryAcquireShared()
来获取许可,如果state
不为 0,线程就会被阻塞直到state
为 0。
代码示意:
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建 CountDownLatch,初始值为 3
CountDownLatch latch = new CountDownLatch(3);
// 启动三个线程模拟任务
for (int i = 0; i < 3; i++) {
new Thread(new Task(latch)).start();
}
// 主线程等待,直到计数器变为 0
latch.await();
System.out.println("All tasks are finished, proceeding with main task.");
}
}
class Task implements Runnable {
private final CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// 模拟执行任务
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " completed.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 完成任务后调用 countDown,减小计数器
latch.countDown();
}
}
}
关键点:
state
:CountDownLatch
的内部状态,通过state
来控制线程的等待与唤醒。- CAS 操作:
countDown()
方法通过 CAS 操作来原子地减少state
的值。 - 等待队列:调用
await()
的线程会被挂起,直到state
为 0 时唤醒。
总结:
CountDownLatch
是通过 AQS 提供的底层同步机制来实现线程的等待与通知机制的。它通过一个计数器 state
来控制多个线程的协调,线程调用 await()
等待计数器变为 0,而其他线程则通过 countDown()
来减少计数器值,最终达到同步的目的。
用过 CountDownLatch 么?什么场景下用的?
在项目中,确实有用过 CountDownLatch
,它的应用场景主要是在并发任务中,尤其是当多个并发线程需要等待其他线程的任务完成后再执行下一步操作的场景。以下是一个实际的场景:
场景描述:
我们有 6 个文件需要读取并处理。每个文件的处理任务是独立的,没有顺序依赖,但是我们需要等这 6 个文件的处理结果全部准备好后,才能进行汇总统计并返回给用户。
为了实现这个需求,我们使用了一个固定大小的线程池来并发执行文件的读取任务。每个线程处理一个文件,处理完一个文件后,就调用 CountDownLatch
的 countDown()
方法,减少计数器的值。当所有线程都完成工作时,CountDownLatch
的计数器变为 0,主线程可以继续执行接下来的汇总操作。
使用 CountDownLatch 的代码示例:
public class CountDownLatchExample1 {
private static final int threadCount = 6; // 需要处理的文件数量
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
// 模拟处理6个文件
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
// 模拟处理文件
System.out.println("Processing file " + threadnum);
// ...... 文件处理业务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 完成一个任务,计数器减1
countDownLatch.countDown();
}
});
}
// 主线程等待所有文件处理完毕
countDownLatch.await();
threadPool.shutdown();
System.out.println("All files processed, finishing task.");
}
}
改进的建议:
尽管 CountDownLatch
在这种场景下工作良好,但随着任务数量的增多,代码可读性和维护性可能变差。可以通过以下方式进行改进:
- 使用
CompletableFuture
:CompletableFuture
提供了更高层次的抽象,支持异步、并行、串行等多种操作,并且可以通过allOf()
或join()
等方法方便地等待所有异步任务完成。
使用 CompletableFuture 改进后的代码:
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class CompletableFutureExample {
public static void main(String[] args) {
// 模拟多个文件路径
List<String> filePaths = List.of("file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt", "file6.txt");
// 使用 CompletableFuture 并发处理文件
List<CompletableFuture<Void>> futures = filePaths.stream()
.map(filePath -> CompletableFuture.runAsync(() -> processFile(filePath)))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join(); // 阻塞主线程,直到所有任务完成
System.out.println("All files processed.");
}
private static void processFile(String filePath) {
// 模拟文件处理
System.out.println("Processing " + filePath);
try {
Thread.sleep(1000); // 假设处理文件需要时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
改进的优点:
- 可读性更高:
CompletableFuture
提供了更简洁的 API,可以直接将多个异步任务组合起来,并等待其完成。 - 灵活性:可以轻松地增加任务,调整并发策略,支持更多的异步操作(例如串行、并行、异常处理等)。
- 线程池管理:可以通过
CompletableFuture.supplyAsync()
或runAsync()
来指定自定义的线程池,从而避免全局线程池的资源竞争问题。
总结:
CountDownLatch
是一个非常合适的工具,尤其在多个线程需要等待其他线程完成任务后才能继续的场景中。但随着需求的复杂化(比如增加更多异步操作和流式处理),CompletableFuture
提供了更为灵活、强大的功能,是值得考虑的替代方案。
CyclicBarrier 有什么用?
CyclicBarrier
和 CountDownLatch
有类似的功能,都是用来实现多线程间的同步,确保多个线程达到某个同步点后再继续执行。然而,CyclicBarrier
的最大特点是它是 可重复使用的,即一个线程组可以反复多次等待和触发同步点。而 CountDownLatch
只适用于一次性任务,当计数器到达零后,不能重置和重复使用。
核心特性:
- 可重复使用:
CyclicBarrier
可以在一个线程组的任务完成后,复位并继续等待下一轮的任务同步,适用于多轮任务同步。 - 线程阻塞与释放:线程调用
await()
方法时会被阻塞,直到所有线程都到达屏障点才会同时继续执行。
适用场景:
- 并行任务的阶段性同步:比如多个线程执行某些阶段性任务,当所有线程都执行完某一阶段后,可以再统一进入下一个阶段。
- 多轮协作任务:适用于需要多轮协作的任务,例如多次计算后再汇总结果。
示例代码:
import java.util.concurrent.*;
public class CyclicBarrierExample {
private static final int THREAD_COUNT = 5; // 线程数
public static void main(String[] args) throws InterruptedException {
// 创建一个 CyclicBarrier,指定线程数量和最后一个线程到达时执行的操作
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("所有线程到达屏障,开始执行后续任务...");
});
// 创建并启动多个线程
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
executor.submit(() -> {
try {
System.out.println("线程 " + threadNum + " 正在处理...");
// 模拟线程工作
Thread.sleep((long) (Math.random() * 1000));
// 等待其他线程
System.out.println("线程 " + threadNum + " 到达屏障");
barrier.await(); // 等待所有线程到达屏障
System.out.println("线程 " + threadNum + " 继续执行后续任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
输出:
线程 0 正在处理...
线程 1 正在处理...
线程 2 正在处理...
线程 3 正在处理...
线程 4 正在处理...
线程 0 到达屏障
线程 1 到达屏障
线程 4 到达屏障
线程 2 到达屏障
线程 3 到达屏障
所有线程到达屏障,开始执行后续任务...
线程 0 继续执行后续任务
线程 1 继续执行后续任务
线程 2 继续执行后续任务
线程 3 继续执行后续任务
线程 4 继续执行后续任务
核心流程:
- 每个线程调用
await()
方法,直到所有线程都到达屏障。 CyclicBarrier
会在所有线程都调用await()
后,释放所有线程继续执行。- 在释放后,
CyclicBarrier
允许重用,继续等待下一轮线程到达。
总结:
CyclicBarrier
比 CountDownLatch
强大之处在于它的 可复用性,非常适合需要多轮同步的场景。如果你希望在多个线程执行不同阶段的任务时,确保每个阶段完成后才能进入下一阶段,CyclicBarrier
是非常合适的选择。
CyclicBarrier 的原理是什么?
CyclicBarrier
是基于 ReentrantLock
和 Condition
来实现的,其核心思想是通过一个计数器 count
来协调多个线程在某个同步点(屏障)上等待,直到所有线程都到达屏障时,才继续执行。
CyclicBarrier
的基本原理是:
计数器:
CyclicBarrier
内部有一个计数器count
,它的初始值是屏障需要等待的线程数(parties
)。每个线程调用await()
时会减少count
的值,当count
减到 0 时,屏障打开,所有线程才能继续执行。等待与唤醒:线程通过调用
await()
方法来到达屏障。每次一个线程到达屏障后,count
会减 1,直到count
为 0,所有线程才会同时继续执行。如果在屏障期间有线程被中断,或者发生异常,则会抛出BrokenBarrierException
,并导致所有线程被唤醒。可复用性:当所有线程到达屏障时,
CyclicBarrier
会重置计数器count
为初始值,准备迎接下一轮的屏障同步。
关键代码分析
构造方法:
CyclicBarrier
初始化时接受两个参数:parties
和barrierAction
。parties
表示需要同步的线程数,barrierAction
是在所有线程到达屏障时执行的可选操作。public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
await()
方法:
每个线程调用await()
方法来等待其他线程到达屏障。await()
内部调用了dowait(false, 0L)
方法,该方法负责线程的等待、计数器的递减以及屏障的打开。public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
dowait()
方法:
这是CyclicBarrier
的核心方法。它负责:- 获取锁,确保线程的安全同步。
- 检查当前的
generation
是否已经“破裂”,即屏障是否已经被破坏。 - 递减计数器
count
,如果count == 0
,执行屏障动作barrierCommand
并重置计数器。 - 如果还有线程未到达屏障,当前线程将被阻塞,直到所有线程到达。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); // Reset barrier for next cycle return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
生成新一代屏障:
一旦count
减到 0,表示所有线程都已到达屏障,CyclicBarrier
会执行nextGeneration()
方法,重置计数器为初始值,并准备进入下一轮的同步。private void nextGeneration() { generation = new Generation(); count = parties; trip.signalAll(); // Wake up all threads waiting on the barrier }
总结
CyclicBarrier
的实现依赖于 ReentrantLock
和 Condition
来确保线程的同步和阻塞。在每次同步结束时,计数器 count
被重置为初始值,以便下一轮的同步操作。屏障的状态在每轮同步结束后都会被复位,使得 CyclicBarrier
能够在多轮任务中继续使用。这使得 CyclicBarrier
成为多线程协作和同步的重要工具,尤其适用于需要阶段性协作的场景。
四、虚拟线程
虚拟线程在 Java 21 正式发布,这是一项重量级的更新。
虽然目前面试中问的不多,但还是建议大家去简单了解一下,具体可以阅读这篇文章:虚拟线程极简入门 。重点搞清楚虚拟线程和平台线程的关系以及虚拟线程的优势即可。
参考
- 《深入理解 Java 虚拟机》
- 《实战 Java 高并发程序设计》
- Java 线程池的实现原理及其在业务中的最佳实践:阿里云开发者:https://mp.weixin.qq.com/s/icrrxEsbABBvEU0Gym7D5Q
- 带你了解下 SynchronousQueue(并发队列专题):https://juejin.cn/post/7031196740128768037
- 阻塞队列 — DelayedWorkQueue 源码分析:https://zhuanlan.zhihu.com/p/310621485
- Java 多线程(三)——FutureTask/CompletableFuture:https://www.cnblogs.com/iwehdio/p/14285282.html
- Java 并发之 AQS 详解:https://www.cnblogs.com/waterystone/p/4920797.html
- Java 并发包基石-AQS 详解:https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html