AQS(AbstractQueuedSynchronizer)详解
1. AQS 概述
AQS
(AbstractQueuedSynchronizer)是 Java 提供的一个框架,用于构建自定义的同步器。它位于 java.util.concurrent
包下,设计目标是为了简化多线程同步器的开发过程,如互斥锁、读写锁、信号量等。AQS
提供了一个模板,供开发者基于它实现各种锁和同步器。
AbstractQueuedSynchronizer
是一个抽象类,提供了同步器的基础实现,简化了线程同步控制的实现。它通过一个 FIFO 队列(即阻塞队列)来管理线程的竞争,使得线程能够在某些条件满足时通过队列的方式顺序执行。
AQS
的核心思想是基于 CAS(Compare-And-Swap) 操作来保证并发控制,而同步状态的管理则通过一个 int
类型的状态字段来实现,通常这个状态字段表示同步器的状态(如锁的占用状态、许可的数量等)。
2. AQS 核心概念
2.1 同步状态(State)
AQS 使用一个 int
类型的字段来维护同步器的状态,这个字段的值可以表示不同的同步状态。例如,0
表示未被占用或未获得锁,1
表示已被占用或锁已被获得等。这个字段由子类根据具体的同步需求来设计。
2.2 阻塞队列(等待队列)
AQS 内部维护着一个 FIFO 队列(AbstractQueuedSynchronizer$Node
),线程在获取同步状态失败时,会被加入到队列中等待。线程的加入和唤醒是通过队列来协调的。
2.3 CAS 操作
AQS 的所有同步控制基于 CAS 操作。比如,当一个线程要获取锁时,首先会使用 CAS 尝试修改同步状态,如果修改成功,则该线程获取同步状态;如果失败,则该线程会进入队列,等待同步状态的改变。
3. AQS 主要方法
3.1 acquire(int arg)
和 release(int arg)
这两个方法是 AQS
中非常重要的两个方法,分别用于获取和释放同步状态。
acquire(int arg)
:尝试获取同步状态。如果成功,返回;如果失败,则将当前线程加入到队列中,直到获取到同步状态。release(int arg)
:释放同步状态。如果当前线程持有同步状态,释放该状态,并唤醒其他等待线程。
3.2 tryAcquire(int arg)
和 tryRelease(int arg)
这两个方法是 模板方法,需要由子类实现,主要用于自定义获取和释放同步状态的逻辑。
tryAcquire(int arg)
:子类需要实现该方法来定义获取同步状态的逻辑,通常是通过 CAS 来设置同步状态。tryRelease(int arg)
:子类需要实现该方法来定义释放同步状态的逻辑,通常是通过 CAS 操作将同步状态重置为初始状态。
3.3 addWaiter(Node mode)
该方法用于将当前线程添加到等待队列中。Node
代表了一个等待的线程节点。每个节点包含了线程的引用,以及一些状态标志,标记着线程在队列中的位置和状态。
3.4 isHeldExclusively()
该方法用于判断同步状态是否被当前线程独占。通常用于排他性同步器(如 ReentrantLock
)来判断是否是当前线程持有锁。
4. AQS 核心类和方法
4.1 Node
类
Node
是 AQS 内部维护的一个类,代表一个等待的线程节点。每个节点存储着当前线程的引用、线程状态(如是否等待)以及前后节点的指针等信息。Node
类主要有两个重要字段:
waitStatus
:表示当前节点的状态(等待、取消、唤醒等)。next
:指向下一个节点,用于形成双向链表。
4.2 锁的实现
AQS 提供了可以扩展的 tryAcquire
和 tryRelease
方法,子类可以根据需要实现自定义的锁。Java 标准库中的许多同步工具(如 ReentrantLock
、CountDownLatch
、Semaphore
等)都基于 AQS 来实现。
ReentrantLock
示例
ReentrantLock
是一个典型的基于 AQS 实现的互斥锁,其核心实现方式如下:
tryAcquire(int arg)
:尝试获取锁,如果锁已被当前线程持有(重入),则直接增加获取次数,否则尝试通过 CAS 操作获取锁。tryRelease(int arg)
:释放锁,如果锁的获取次数减至 0,则唤醒其他线程。
public class ReentrantLock extends AbstractQueuedSynchronizer implements Lock {
// 判断当前线程是否持有锁
public boolean isHeldExclusively() {
return getState() != 0;
}
// 获取锁
public void lock() {
acquire(1);
}
// 尝试获取锁
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int state = getState();
if (state == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextState = state + acquires;
if (nextState < 0) throw new Error("Maximum lock count exceeded");
setState(nextState);
return true;
}
return false;
}
// 释放锁
public void unlock() {
release(1);
}
// 释放锁
protected boolean tryRelease(int releases) {
int state = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
boolean free = false;
if (state == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(state);
return free;
}
}
5. AQS 实现的同步器
AQS 可以用于实现多种同步器。以下是几个常见的基于 AQS 实现的同步器:
5.1 ReentrantLock
如上所述,ReentrantLock
是一种可重入的排他锁,允许当前线程在已持有锁的情况下再次获取锁。
5.2 CountDownLatch
CountDownLatch
是一种同步工具,它允许一个或多个线程等待直到其他线程完成一系列操作。它通常用于并行化程序中的等待操作。
5.3 Semaphore
Semaphore
是一种计数信号量,允许多个线程访问有限资源。Semaphore
通过维护一个计数器来控制对共享资源的访问数量。
5.4 CyclicBarrier
CyclicBarrier
是一种同步器,允许一组线程在到达某个公共屏障点之前阻塞。与 CountDownLatch
不同,CyclicBarrier
允许重复使用。
5.5 ReadWriteLock
ReadWriteLock
是一种分离读和写锁的锁机制,允许多个线程并发读取资源,但写入时必须独占资源。ReentrantReadWriteLock
是一个基于 AQS 的实现。
总结
- AQS 提供了一个框架,使得开发者可以方便地实现各种类型的同步器(如互斥锁、读写锁、信号量等)。
- 它基于 CAS 操作 和 FIFO 队列 实现了线程同步,并且通过简化线程管理、状态管理和队列管理,极大地降低了同步器的实现复杂度。
- 许多常见的并发工具类(如
ReentrantLock
、CountDownLatch
、Semaphore
等)都基于 AQS 来实现。
通过理解和使用 AQS,开发者可以更加高效地构建自己的同步器,满足不同的并发控制需求。