ArrayBlockingQueue 源码分析
约 1347 字大约 4 分钟
ArrayBlockingQueue 源码分析
1. 概述
ArrayBlockingQueue
是 Java 中的一个阻塞队列实现,它基于数组实现,并提供了一个固定大小的容量。该队列在多线程环境下提供了线程安全的 put
和 take
操作,用于实现生产者-消费者模型。它的主要特点是通过数组来存储元素,并且支持阻塞操作,既可以在队列为空时等待,也可以在队列满时等待。
ArrayBlockingQueue
是一个 有界队列,它的容量在创建时就被固定,并且提供了如下操作:
- 阻塞的入队和出队操作:
put
和take
,当队列满时,put
会阻塞,直到有空间;当队列为空时,take
会阻塞,直到有元素。 - 非阻塞的入队和出队操作:
offer
和poll
,它们不会阻塞,返回操作的成功与否。
2. 主要成员变量
private final Object[] items;
private int takeIndex;
private int putIndex;
private int count;
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
items
:存储队列元素的数组,队列的元素存储在该数组中。takeIndex
:从队列中获取元素时的索引。putIndex
:插入元素时的索引。count
:当前队列中的元素数量。lock
:ReentrantLock
锁,用于确保线程安全。notEmpty
:当队列为空时,消费者线程会被挂起,直到有元素可供消费。notFull
:当队列已满时,生产者线程会被挂起,直到有空间可以插入元素。
3. 构造方法
public ArrayBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}
- 构造方法:
capacity
是队列的最大容量,必须大于零。- 队列内部通过一个数组
items
来存储元素,数组的大小由capacity
决定。 - 使用
ReentrantLock
来提供线程安全,notEmpty
和notFull
是两个条件变量,用于实现阻塞操作。
4. 核心方法分析
4.1 入队操作:put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == items.length) // 队列满,阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
- 步骤:
- 获取
lock
锁,保证线程安全。 - 如果队列已满,当前线程会被阻塞,直到队列有空余空间(即
count < items.length
)。 - 如果队列不满,调用
enqueue(e)
将元素插入队列。 - 释放锁。
- 获取
- 内部方法
enqueue
:
private void enqueue(E e) {
items[putIndex] = e; // 将元素放入数组
if (++putIndex == items.length) putIndex = 0; // 如果到达数组末尾,重置 putIndex
++count;
notEmpty.signal(); // 通知消费者线程队列不为空
}
- 步骤:
- 将元素插入队列,并更新
putIndex
和count
。 - 如果
putIndex
到达队列的末尾,重置为 0,形成循环队列。 - 调用
notEmpty.signal()
唤醒等待的消费者线程。
- 将元素插入队列,并更新
4.2 出队操作:take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == 0) // 队列为空,阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
步骤:
- 获取
lock
锁,保证线程安全。 - 如果队列为空,当前线程会被阻塞,直到队列有元素可供消费(即
count > 0
)。 - 如果队列不为空,调用
dequeue()
从队列中取出元素。 - 释放锁。
- 获取
内部方法
dequeue
:
private E dequeue() {
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 获取队列头部元素
items[takeIndex] = null; // 清空该元素
if (++takeIndex == items.length) takeIndex = 0; // 如果到达数组末尾,重置 takeIndex
--count;
notFull.signal(); // 通知生产者线程队列不满
return x;
}
- 步骤:
- 获取队列头部元素,更新
takeIndex
和count
。 - 如果
takeIndex
到达队列的末尾,重置为 0,形成循环队列。 - 调用
notFull.signal()
唤醒等待的生产者线程。
- 获取队列头部元素,更新
4.3 非阻塞操作:offer
和 poll
offer(E e)
:如果队列没有满,插入元素并返回true
;如果队列已满,返回false
。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) // 队列已满,返回 false
return false;
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
:尝试从队列中取出元素,如果队列为空则等待指定时间。如果超时仍未取到元素,返回null
。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0) {
if (!notEmpty.await(timeout, unit)) // 等待超时
return null;
}
return dequeue();
} finally {
lock.unlock();
}
}
5. 性能特性
读操作:
take
和put
方法会在队列为空或已满时阻塞线程,采用的是基于ReentrantLock
和条件变量的机制。这些操作的时间复杂度是 O(1),但需要考虑锁的竞争。写操作:
offer
是非阻塞的,如果队列满,直接返回false
;put
是阻塞的,直到队列有空间。它们都涉及到锁和条件变量,因此写操作的性能受到锁竞争的影响。适用场景:
- 适用于生产者-消费者模型等需要固定大小缓冲区的场景。
- 多线程环境下需要安全的队列操作时,可以使用
ArrayBlockingQueue
。
总结
ArrayBlockingQueue
提供了线程安全的阻塞队列实现,使用数组作为底层数据结构,并通过 ReentrantLock
和条件变量 (notEmpty
、notFull
) 来保证线程安全和高效的阻塞操作。它适用于那些需要处理大量生产和消费任务的场景,特别是当队列容量已知且固定时。在性能上,读操作非常高效,但写操作(如 put
和 take
)可能会受到锁的竞争影响。