PriorityBlockingQueue
是无界的,但我需要以某种方式绑定它。实现这一目标的最佳方法是什么?
有关信息,有界PriorityBlockingQueue
将用于ThreadPoolExecutor
.
注意:如果发生这种情况,我不想抛出异常,我想将对象放入队列中,然后根据其优先级值将其剪切。有什么好的方法来做这个切东西吗?
PriorityBlockingQueue
是无界的,但我需要以某种方式绑定它。实现这一目标的最佳方法是什么?
有关信息,有界PriorityBlockingQueue
将用于ThreadPoolExecutor
.
注意:如果发生这种情况,我不想抛出异常,我想将对象放入队列中,然后根据其优先级值将其剪切。有什么好的方法来做这个切东西吗?
我实际上不会子类化它。虽然我现在无法整理示例代码,但我建议使用装饰器模式的一个版本。
创建一个新类并实现您感兴趣的类实现的接口:PriorityBlockingQueue。我发现这个类使用了以下接口:
Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>
在类的构造函数中,接受 aPriorityBlockingQueue
作为构造函数参数。
然后通过PriorityblockingQueue
. 添加使其有界所需的任何代码。这是装饰器模式的一个相当标准的实现。
Google Collections/Guava库中有一个实现: MinMaxPriorityQueue。
最小-最大优先级队列可以配置最大大小。如果是这样,每次队列的大小超过该值时,队列都会根据其比较器(可能是刚刚添加的元素)自动删除其最大元素。这与传统的有界队列不同,后者在满时阻塞或拒绝新元素。
在我的脑海中,我将它子类化并覆盖 put 方法以强制执行此操作。如果它超过抛出异常或做任何看起来合适的事情。
就像是:
public class LimitedPBQ extends PriorityBlockingQueue {
private int maxItems;
public LimitedPBQ(int maxItems){
this.maxItems = maxItems;
}
@Override
public boolean offer(Object e) {
boolean success = super.offer(e);
if(!success){
return false;
} else if (this.size()>maxItems){
// Need to drop last item in queue
// The array is not guaranteed to be in order,
// so you should sort it to be sure, even though Sun's Java 6
// version will return it in order
this.remove(this.toArray()[this.size()-1]);
}
return true;
}
}
编辑: add 和 put 调用提供,所以覆盖它应该就足够了
编辑 2:如果超过 maxItems,现在应该删除最后一个元素。不过,可能有一种更优雅的方法。
根据 Frank V 的建议实施 BoundedPriorityBlockingQueue 后,我意识到它并没有达到我想要的效果。主要问题是我必须插入队列的项目可能比队列中已经存在的所有项目具有更高的优先级。因此,我真正想要的是一个'pivot'方法,如果我将一个对象放入队列中,当队列已满时,我想取回最低优先级的对象,而不是阻塞。
为了充实 Frank V 的建议,我使用了以下片段......
public class BoundedPriorityBlockingQueue<E>
implements
Serializable,
Iterable<E>,
Collection<E>,
BlockingQueue<E>,
Queue<E>,
InstrumentedQueue
{
... 私有最终 ReentrantLock 锁;// = 新的 ReentrantLock(); 私人最终条件未满;
final private int capacity;
final private PriorityBlockingQueue<E> queue;
public BoundedPriorityBlockingQueue(int capacity)
throws IllegalArgumentException,
NoSuchFieldException,
IllegalAccessException
{
if (capacity < 1) throw
new IllegalArgumentException("capacity must be greater than zero");
this.capacity = capacity;
this.queue = new PriorityBlockingQueue<E>();
// gaining access to private field
Field reqField;
try {
reqField = PriorityBlockingQueue.class.getDeclaredField("lock");
reqField.setAccessible(true);
this.lock = (ReentrantLock)reqField.get(ReentrantLock.class);
this.notFull = this.lock.newCondition();
} catch (SecurityException ex) {
ex.printStackTrace();
throw ex;
} catch (NoSuchFieldException ex) {
ex.printStackTrace();
throw ex;
} catch (IllegalAccessException ex) {
ex.printStackTrace();
throw ex;
}
...
@Override
public boolean offer(E e) {
this.lock.lock();
try {
while (this.size() == this.capacity)
notFull.await();
boolean success = this.queue.offer(e);
return success;
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
return false;
} finally {
this.lock.unlock();
}
}
...
这也有一些工具,所以我可以检查队列的有效性。我仍在研究“PivotPriorityBlockingQueue”,如果有人感兴趣,我可以发布它。
There is an implementation in the ConcurrencyUtils repo.
如果您要执行的 Runnables 的顺序不严格(因为:即使存在更高优先级的任务,也可能会执行一些优先级较低的任务),那么我建议以下内容,归结为定期切割 PriorityQueue缩小尺寸:
if (queue.size() > MIN_RETAIN * 2){
ArrayList<T> toRetain = new ArrayList<T>(MIN_RETAIN);
queue.drainTo(toRetain, MIN_RETAIN);
queue.clear();
for (T t : toRetain){
queue.offer(t);
}
}
如果顺序需要严格,这显然会失败,因为耗尽会导致片刻,低优先级任务将使用并发访问从队列中检索。
优点是,这是线程安全的,并且可能会尽可能快地使用优先级队列设计。
到目前为止,没有一个答案具有以下所有属性:
BlockingQueue
接口。Unfortunately, there is no BlockingQueue
implementation in the standard Java library. You will either need to find an implementation or implement something yourself. Implementing a BlockingQueue
will require some knowledge on proper locking.
Here's what I suggest: Have a look at https://gist.github.com/JensRantil/30f812dd237039257a3d and use it as a template to implement your own wrapper around a SortedSet
. Basically, all the locking is there and there are multiple unit tests (that will need some tweaking).
查看 Google Collections API 中的ForwardingQueue。对于阻塞语义,您可以使用Semaphore。
这里还有另一个实现
它似乎可以满足您的要求:
BoundedPriorityQueue 实现了一个具有元素数量上限的优先级队列。如果队列未满,则始终添加添加的元素。如果队列已满且添加的元素大于队列中的最小元素,则删除最小元素并添加新元素。如果队列已满且添加的元素不大于队列中的最小元素,则不添加新元素。