一个无界的阻塞队列,使用相同的排队规则 PriorityQueue 并且提供阻塞的操作. 因为这个队列逻辑上是误解的,尝试添加操作可能会失败,由于资环耗尽了(比如OOM).
这个类不接受null元素. 一个优先级队列依赖于自然序并且不保证 non-comparable(不支持比较的元素) 的元素顺序.
这个类和他的迭代器实现了 Collection 和 Iterator 接口所有可选的方法,这个迭代器提供了 iterator() 和 spliterator() , 不保证遍历元素的顺序.
如果你需要排序的遍历,可以使用 Arrays.sort(pq.toArray()) . 另外,方法 drainTo 可以用来移除一些元素,并且把他们放到另外一个集合中.
这个类的操作,不保证相同优先级的元素的顺序. 如果你需要强制一个顺序,你可以定义定制化的类或者比较器,使用第二个key来打破第一个key相同的情况.
举个例子,这里有一个类提供了 FIFO 顺序去比较元素。
class FIFOEntry>
implements Comparable> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
public E getEntry() { return entry; }
public int compareTo(FIFOEntry other) {
// 首先调用`CompareTo`来获取优先级
int res = entry.compareTo(other.entry);
// 如果第一个优先级一样, 就根据seqNum再给定一个优先级.
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
实现了 CompareTo ,首先使用原始类的 CompareTo ,如果优先级相等,就是用内部自定义的 seqNum 来比较优先级.
public class PriorityBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
// 实际保存数据的数组
private transient Object[] queue;
// 元素数量
private transient int size;
// 比较器,定义了元素的优先级
private transient Comparator<? super E> comparator;
// 锁
private final ReentrantLock lock = new ReentrantLock();
// 不为空的等待条件
private final Condition notEmpty = lock.newCondition();
// 锁
private transient volatile int allocationSpinLock;
// 用于帮助序列化的一个类,没啥用
private PriorityQueue q;
public PriorityBlockingQueue() {
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
public PriorityBlockingQueue(Collection<? extends E> c) {
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
Object[] es = c.toArray();
int n = es.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (es.getClass() != Object[].class)
es = Arrays.copyOf(es, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (Object e : es)
if (e == null)
throw new NullPointerException();
this.queue = ensureNonEmpty(es);
this.size = n;
if (heapify)
实现了四个构造方法,前三个都是对初始容量及比较器的赋值. 第四个构造函数支持将给定集合中的元素初始化到队列中.
public boolean add(E e) {
return offer(e);
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 加锁
final ReentrantLock lock = this.lock;
int n, cap;
Object[] es;
// 扩容
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
// 根据是否有特定的比较器,将当前元素上浮到正确的优先级位置.
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
siftUpUsingComparator(n, e, es, cmp);
// 数量+1,通知不为空的等待线程
size = n + 1;
} finally {
return true;
public void put(E e) {
offer(e); // never need to block
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
add, offer, put, offer(time,unit) 四个方法,本质上都是调用的同一个 offer ,为啥呢?
因为这个优先级队列,本质上是无界的,也就是说,没有 队列满了 的情况,因此前面的等待条件,只有 notEmpty 而没有和其他队列一样的 notFull 。
private static void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
// 遍历
while (k > 0) {
// 父节点
int parent = (k - 1) >>> 1;
Object e = es[parent];
// 父节点和当前节点对比
if (key.compareTo((T) e) >= 0)
es[k] = e;
k = parent;
// 找到的位置给新的节点
es[k] = key;
// 和上面的方法一样,只不过比较器是给定的,不是用元素本身的CompareTo。
private static void siftUpUsingComparator(
int k, T x, Object[] es, Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (cmp.compare(x, (T) e) >= 0)
es[k] = e;
k = parent;
es[k] = x;
因为队列中的元素,其实是一个平衡的二叉堆,因此在给定的元素,寻找优先级所在的位置时, 使用类似于堆的上浮操作即可.
// 如果为空,返回null
public E poll() {
final ReentrantLock lock = this.lock;
try {
return dequeue();
} finally {
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
E result;
try {
while ( (result = dequeue()) == null)
} finally {
return result;
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
return result;
队列的几个出队方法,核心都是调用 dequeue() 方法,只是在获取元素为空时,处理策略不一致.
private E dequeue() {
// assert lock.isHeldByCurrentThread();
final Object[] es;
final E result;
// 获取数组第一个,也就是堆顶的元素
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
// 最后一个元素
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
// 将他放在堆顶,然后下沉,使堆符合优先级
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
siftDownUsingComparator(0, x, es, n, cmp);
return result;
private static void siftDownComparable(int k, T x, Object[] es, int n) {
// assert n > 0;
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
// 堆顶元素的孩子节点
int child = (k << 1) + 1; // assume left child is least
Object c = es[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
if (key.compareTo((T) c) <= 0)
es[k] = c;
k = child;
es[k] = key;
将给定节点与右边子节点进行比较,如果不符合优先级,交换位置. 递归执行.
一个带有优先级的阻塞队列. 支持使用元素本身的 CompareTo 以及给定比较器 Comparator .
优先级的实现,使用堆. 因此内部保存元素的载体是一个数组.
由于设计是无界的队列,因此入队方法永远不会阻塞,只会逐渐撑爆内存. put 方法不会阻塞. 出队方法像其他阻塞队列一样,会阻塞.
对数组的读写使用 ReentrantLock 来保证线程安全性.
阻塞操作使用 Condition 来实现阻塞等待与唤醒.
