DelayQueue 源码分析
约 1482 字大约 5 分钟
DelayQueue 源码分析
1. 概述
DelayQueue
是 Java 中的一个无界阻塞队列实现,继承自 AbstractQueue
,用于存储实现了 Delayed
接口的元素。队列中的元素在到期之前无法被取出,只有到期的元素才能从队列中获取。DelayQueue
通常用于需要定时任务调度或延时处理的场景,例如缓存超时处理和任务调度。
DelayQueue
的底层是一个 基于二叉堆的优先级队列,类似于 PriorityQueue
。它利用元素的 getDelay
方法计算剩余延时时间,按照到期时间的顺序进行排序。
2. 核心特点
- 基于优先级堆:队列内部使用优先级堆来维护元素,最先到期的元素在堆顶。
- 线程安全:通过
ReentrantLock
实现线程安全。 - 阻塞操作:
- 当没有到期的元素时,
take
方法会阻塞,直到有元素到期。 - 当队列为空时,
take
方法也会阻塞,直到有新元素插入并到期。
- 当没有到期的元素时,
3. 主要成员变量
private final transient ReentrantLock lock = new ReentrantLock(); // 可重入锁
private final PriorityQueue<E> q = new PriorityQueue<>(); // 基于堆的优先级队列
private final Condition available = lock.newCondition(); // 条件变量,用于阻塞线程
lock
:确保队列的所有操作都是线程安全的。q
:内部的优先级队列,用于存储和排序元素。available
:用于在没有到期元素时阻塞线程。
4. 核心方法分析
4.1 插入操作:offer
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 插入到优先级队列中
if (q.peek() == e) // 如果插入的元素是堆顶元素
available.signal(); // 通知等待线程
return true;
} finally {
lock.unlock();
}
}
步骤:
- 加锁,确保线程安全。
- 将元素插入到优先级队列
q
中。 - 如果插入的元素是堆顶元素(即最早到期的元素),通知等待线程可能有新的到期元素。
- 解锁。
性能:由于使用了优先级堆,
offer
操作的时间复杂度为 O(log n)。
4.2 出队操作:take
take
方法会阻塞线程,直到有元素到期为止。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); // 查看堆顶元素
if (first == null)
available.await(); // 如果队列为空,等待
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll(); // 如果元素到期,移除并返回堆顶元素
available.awaitNanos(delay); // 等待元素到期
}
}
} finally {
lock.unlock();
}
}
步骤:
- 获取队列的堆顶元素。
- 如果堆顶元素为空(队列为空),调用
available.await()
阻塞当前线程,等待有新元素插入。 - 如果堆顶元素的延迟时间已到(
delay <= 0
),移除并返回堆顶元素。 - 如果堆顶元素未到期,调用
available.awaitNanos(delay)
等待剩余的延迟时间。 - 线程被唤醒后,重新检查堆顶元素是否到期,重复上述过程。
阻塞特性:如果队列为空或所有元素均未到期,线程会被挂起。
4.3 检查和移除操作:poll
poll
方法是非阻塞的,如果没有到期的元素则直接返回 null
。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek(); // 查看堆顶元素
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null; // 如果堆顶元素未到期,返回 null
else
return q.poll(); // 如果到期,移除并返回堆顶元素
} finally {
lock.unlock();
}
}
步骤:
- 检查堆顶元素是否存在且到期。
- 如果堆顶元素未到期或队列为空,返回
null
。 - 如果堆顶元素到期,移除并返回该元素。
性能:
poll
操作的时间复杂度为 O(1)(检查堆顶元素)或 O(log n)(移除堆顶元素)。
4.4 清空操作:clear
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
- 功能:清空队列中的所有元素。
- 线程安全:通过加锁操作确保线程安全。
5. 内部机制
5.1 延迟机制
DelayQueue
的核心延迟机制依赖于元素的 getDelay
方法。每个插入队列的元素必须实现 Delayed
接口,其定义如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
getDelay
:返回剩余的延迟时间,单位由参数unit
决定。compareTo
:用于按照剩余延迟时间对元素排序,堆顶元素是最早到期的元素。
5.2 优先级队列
内部优先级队列 PriorityQueue
按照堆的方式存储元素,堆顶始终是最小的元素(最早到期的元素)。通过堆的插入和删除操作维持队列的顺序。
6. 性能分析
插入操作:
- 时间复杂度为 O(log n),因为每次插入都需要调整堆的顺序。
删除操作:
- 时间复杂度为 O(log n),移除堆顶元素后需要调整堆。
读取操作:
- 非阻塞方法(如
poll
)的时间复杂度为 O(1)(如果堆顶元素未到期)或 O(log n)(如果移除堆顶元素)。 - 阻塞方法(如
take
)会阻塞线程,直到有元素到期。
- 非阻塞方法(如
空间复杂度:
- 由于
DelayQueue
是无界队列,因此其内存占用取决于插入的元素数量和每个元素的大小。
- 由于
7. 适用场景
- 定时任务调度:延迟执行任务,最常见的使用场景是实现定时任务调度器。
- 缓存过期管理:维护有超时机制的缓存数据。
- 消息延迟处理:在一定时间后处理消息或事件。
总结
DelayQueue
是一个线程安全的基于堆的延迟队列,适合于延时任务和定时调度场景。通过优先级队列和阻塞机制,DelayQueue
能够高效地处理到期的任务,并确保多线程环境下的安全性。它的设计强调延迟排序和线程间的协调,是实现定时器和延时任务的理想选择。